web_search/search_plus.py

90 lines
2.7 KiB
Python
Raw Permalink Normal View History

2024-12-19 03:32:54 +00:00
import datetime
import json
import os

import requests
from databases import Database
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Database connection settings.
# Every value can be overridden through the environment variable named next to
# it; when a variable is unset the previous hard-coded value is used, so
# existing deployments keep working unchanged.
# NOTE(review): the fallback credentials below are committed to source control.
# Rotate them and remove the fallbacks once the environment provides them.
db_config = {
    'user': os.environ.get('SEARCH_DB_USER', 'root'),
    'password': os.environ.get('SEARCH_DB_PASSWORD', 'zaq12wsx@9Xin'),
    'host': os.environ.get('SEARCH_DB_HOST', '183.11.229.79'),
    'port': int(os.environ.get('SEARCH_DB_PORT', '3316')),
    'database': os.environ.get('SEARCH_DB_NAME', 'gptDB'),
    'auth_plugin': os.environ.get('SEARCH_DB_AUTH_PLUGIN', 'mysql_native_password'),
}

# Build the database URL from the settings above.
# NOTE(review): the values are interpolated without URL-escaping, so a user
# name or password containing characters such as '/', '?' or '#' would
# produce a malformed URL — confirm how the driver parses it before changing.
DATABASE_URL = (
    f"mysql://{db_config['user']}:{db_config['password']}"
    f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    f"?auth_plugin={db_config['auth_plugin']}"
)
database = Database(DATABASE_URL)

# SerpApi key sent with every search request; overridable via SERP_API_KEY.
SERP_API_KEY = os.environ.get(
    "SERP_API_KEY",
    "8af097ae8b587bb0569425058e03e5ef33b4c7b8b1a505053764b62e7e4ab9d6",
)
def _fetch_engine_results(engine, query, timeout):
    """Return one engine's SerpApi results as a JSON string, or None if unavailable."""
    params = {
        "api_key": SERP_API_KEY,
        "engine": engine,
        "q": query,
    }
    response = requests.get('https://serpapi.com/search', params=params, timeout=timeout)
    search_metadata = response.json().get('search_metadata') or {}
    if search_metadata.get('status') != 'Success':
        return None

    # The stored record is taken from the endpoint SerpApi reports for this
    # search; without that URL there is nothing to fetch.
    json_endpoint = search_metadata.get('json_endpoint')
    if not json_endpoint:
        return None

    response = requests.get(json_endpoint, timeout=timeout)
    if response.status_code != 200:
        return None
    return json.dumps(response.json())


async def search_and_save(query, *, engines=("google", "bing", "baidu"), timeout=30):
    """
    Run one query against each search engine and store every successful result.

    For each engine the query is sent to SerpApi; when the search reports
    status 'Success', the JSON document at its 'json_endpoint' is downloaded,
    serialised, and inserted into the web_search_results table with
    is_processed set to 0.

    Args:
        query (str): The search query.
        engines: Names of the SerpApi engines to query, in order.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        list: The JSON strings that were saved, one per successful engine.

    Raises:
        requests.RequestException: If an HTTP request fails or times out.
        ValueError: If a response body is not valid JSON.

    The database connection is always closed, even when an error is raised.
    """
    insert_sql = """
        INSERT INTO web_search_results (engine, query, results, search_datetime, is_processed)
        VALUES (:engine, :query, :results, :search_datetime, :is_processed)
    """
    search_results = []
    await database.connect()
    try:
        for engine in engines:
            # NOTE(review): requests is synchronous, so the event loop is
            # blocked while each request is in flight.
            json_record_str = _fetch_engine_results(engine, query, timeout)
            if json_record_str is None:
                continue

            # Save to the database.
            values = {
                "engine": engine,
                "query": query,
                "results": json_record_str,
                "search_datetime": datetime.datetime.now(),
                "is_processed": 0,
            }
            await database.execute(insert_sql, values)
            search_results.append(json_record_str)
    finally:
        # Release the connection even if a request or insert failed.
        await database.disconnect()
    return search_results
async def main(queries):
    """
    Run a batch of search queries, one after another.

    Args:
        queries (list): The queries to search for; each one is passed to
            search_and_save in order.
    """
    pending = iter(queries)
    for term in pending:
        await search_and_save(term)
if __name__ == "__main__":
    import asyncio

    # Sample queries used when the module is run directly as a script.
    example_queries = ["化妆品 面部护理", "化妆品 眼部护理", "化妆品 面部清洁"]
    asyncio.run(main(example_queries))