web_search/search_queries.py
2024-12-19 11:32:54 +08:00

82 lines
2.8 KiB
Python

import json
import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from databases import Database
import requests
import json
# 数据库连接配置
db_config = {
'user': 'root',
'password': 'zaq12wsx@9Xin',
'host': '183.11.229.79',
'port': 3316,
'database': 'gptDB',
'auth_plugin': 'mysql_native_password'
}
# 创建数据库 URL
DATABASE_URL = f"mysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?auth_plugin={db_config['auth_plugin']}"
database = Database(DATABASE_URL)
SERP_API_KEY = "8af097ae8b587bb0569425058e03e5ef33b4c7b8b1a505053764b62e7e4ab9d6"
queries = [
"小红书化妆品 面部护理",
"小红书化妆品 眼部护理",
"小红书化妆品 面部清洁",
"小红书化妆品 防晒及晒后护理",
"小红书化妆品 体毛处理",
"小红书化妆品 洗发护发",
"小红书化妆品 洗涤",
"小红书化妆品 面部清洁",
"小红书化妆品 精油",
"小红书化妆品 身体护理",
"小红书化妆品 头皮洗护",
"小红书化妆品 洗发护发",
"小红书化妆品 洗浴产品",
"小红书化妆品 婴童洗护",
"小红书化妆品 婴童护肤"
]
async def main():
await database.connect()
for query in queries:
for engine in ["google", "bing", "baidu"]:
params = {
"api_key": SERP_API_KEY,
"engine": engine,
"q": query
}
response = requests.get('https://serpapi.com/search', params=params)
search = response.json()
search_metadata = search.get('search_metadata', {})
if search_metadata.get('status') == 'Success':
json_endpoint = search_metadata.get('json_endpoint')
# 检查请求是否成功
response = requests.get(json_endpoint)
if response.status_code == 200:
# 解析JSON数据
data = response.json()
json_record_str = json.dumps(data) # 将JSON数据转换为字符串
# 保存到数据库
query_ = """
INSERT INTO web_search_results (engine, query, results, search_datetime, is_processed)
VALUES (:engine, :query, :results, :search_datetime, :is_processed)
"""
values = {
"engine": engine,
"query": query,
"results": json_record_str,
"search_datetime": datetime.datetime.now(),
"is_processed":0
}
await database.execute(query_, values)
await database.disconnect()
import asyncio
asyncio.run(main())