154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
import datetime
import json
import logging
import os
from typing import Dict, List, Any

from databases import Database
|
|
|
|
# Database connection settings.
#
# Each value can be overridden through the GPTDB_* environment variable named
# next to it, so credentials no longer have to live in source control.
#
# SECURITY NOTE(review): the literal fallbacks are the values that used to be
# hard-coded here. They are kept only so existing deployments keep working
# unchanged; rotate the exposed password, provide it via GPTDB_PASSWORD, and
# then delete the literal defaults.
db_config = {
    'user': os.environ.get('GPTDB_USER', 'root'),
    'password': os.environ.get('GPTDB_PASSWORD', 'zaq12wsx@9Xin'),
    'host': os.environ.get('GPTDB_HOST', '183.11.229.79'),
    'port': int(os.environ.get('GPTDB_PORT', '3316')),
    'database': os.environ.get('GPTDB_NAME', 'gptDB'),
    'auth_plugin': os.environ.get('GPTDB_AUTH_PLUGIN', 'mysql_native_password'),
}

# The URL is assembled exactly as before (values are inserted verbatim).
# NOTE(review): values containing URL-reserved characters may need
# percent-encoding depending on the installed `databases` version - confirm
# before supplying such values through the environment.
DATABASE_URL = (
    f"mysql://{db_config['user']}:{db_config['password']}"
    f"@{db_config['host']}:{db_config['port']}"
    f"/{db_config['database']}?auth_plugin={db_config['auth_plugin']}"
)

# Module-level handle used by SearchResultParser.process_search_results.
database = Database(DATABASE_URL)
|
|
|
|
class SearchResultParser:
    """Parse stored raw search responses and persist the normalized entries.

    The ``parse_*`` coroutines turn one decoded response into a flat list of
    entry dicts. ``process_search_results`` runs them over every row of
    ``web_search_results`` whose ``is_processed`` flag is 0 and writes the
    entries to ``parsed_search_results``.
    """

    def __init__(self):
        # Not read or written anywhere else in this class; kept so callers
        # that access the attribute keep working.
        self.parsed_results = []

    @staticmethod
    def _section_items(results: Any, key: str) -> List[Dict]:
        """Return the dict entries stored under ``results[key]`` as a list.

        A single dict is treated as a one-element list, non-dict members of a
        list are dropped, and a missing/``None``/unexpected section (or a
        ``results`` value that is not a dict) yields an empty list.
        """
        if not isinstance(results, dict):
            return []
        section = results.get(key)
        if isinstance(section, dict):
            return [section]
        if isinstance(section, list):
            return [entry for entry in section if isinstance(entry, dict)]
        return []

    @classmethod
    def _parse_organic(cls, results: Any) -> List[Dict]:
        """Build the ``'organic'`` entries shared by all three engines."""
        return [
            {
                'title': result.get('title', ''),
                'link': result.get('link', ''),
                'snippet': result.get('snippet', ''),
                'position': result.get('position', 0),
                'type': 'organic',
            }
            for result in cls._section_items(results, 'organic_results')
        ]

    async def parse_google_results(self, results: Dict) -> List[Dict]:
        """Parse a Google response.

        Returns the organic entries followed by one ``'answer_box'`` entry
        (keys ``title``, ``answer``, ``type``) per answer box found. The
        answer box may be a single dict or a list of dicts.
        """
        parsed = self._parse_organic(results)
        parsed.extend(
            {
                'title': answer.get('title', ''),
                'answer': answer.get('answer', ''),
                'type': 'answer_box',
            }
            for answer in self._section_items(results, 'answer_box')
        )
        return parsed

    async def parse_bing_results(self, results: Dict) -> List[Dict]:
        """Parse a Bing response; only organic results are extracted."""
        return self._parse_organic(results)

    async def parse_baidu_results(self, results: Dict) -> List[Dict]:
        """Parse a Baidu response.

        Returns the organic entries followed by one ``'answer_box'`` entry
        (keys ``answer``, ``snippet``, ``source``, ``type``) per answer box
        found. The answer box may be a list of dicts or a single dict.
        """
        parsed = self._parse_organic(results)
        parsed.extend(
            {
                'answer': answer.get('answer', ''),
                'snippet': answer.get('snippet', ''),
                'source': answer.get('source', ''),
                'type': 'answer_box',
            }
            for answer in self._section_items(results, 'answer_box')
        )
        return parsed

    async def process_search_results(self):
        """Parse every unprocessed row and store the resulting entries.

        For each row of ``web_search_results`` with ``is_processed = 0`` the
        JSON in ``results`` is decoded, parsed by the method matching the
        row's ``engine``, inserted into ``parsed_search_results`` and the row
        is flagged as processed. Inserts and the flag update for one row run
        in a single transaction, so a failure cannot leave partial entries
        that would be duplicated on the next run.

        Rows with an unknown engine or undecodable ``results`` are logged and
        left unprocessed instead of aborting the batch or reusing the
        previous row's parsed data. The connection is always closed, even
        when an error propagates.
        """
        log = logging.getLogger(__name__)
        parsers = {
            'google': self.parse_google_results,
            'bing': self.parse_bing_results,
            'baidu': self.parse_baidu_results,
        }

        select_unprocessed = """
            SELECT id, engine, query, results
            FROM web_search_results
            WHERE is_processed = 0
        """
        # NOTE(review): only title/link/snippet/result_type are persisted;
        # the 'answer', 'source' and 'position' values produced by the
        # parsers have no column in this statement and are therefore dropped.
        insert_parsed = """
            INSERT INTO parsed_search_results
            (original_search_id, engine, query, title, link, snippet, result_type, parsed_datetime)
            VALUES (:original_search_id, :engine, :query, :title, :link, :snippet, :result_type, :parsed_datetime)
        """
        mark_processed = """
            UPDATE web_search_results
            SET is_processed = 1
            WHERE id = :id
        """

        await database.connect()
        try:
            unprocessed_results = await database.fetch_all(select_unprocessed)

            for record in unprocessed_results:
                engine = record['engine']
                parse = parsers.get(engine)
                if parse is None:
                    log.warning(
                        "Skipping search result %s: unsupported engine %r",
                        record['id'], engine,
                    )
                    continue

                try:
                    results = json.loads(record['results'])
                except (TypeError, ValueError) as exc:
                    # TypeError: NULL/non-string column; ValueError: bad JSON.
                    log.warning(
                        "Skipping search result %s: cannot decode results (%s)",
                        record['id'], exc,
                    )
                    continue

                parsed_data = await parse(results)
                rows = [
                    {
                        'original_search_id': record['id'],
                        'engine': engine,
                        'query': record['query'],
                        'title': item.get('title', ''),
                        'link': item.get('link', ''),
                        'snippet': item.get('snippet', ''),
                        'result_type': item.get('type', ''),
                        'parsed_datetime': datetime.datetime.now(),
                    }
                    for item in parsed_data
                ]

                async with database.transaction():
                    if rows:
                        await database.execute_many(query=insert_parsed, values=rows)
                    await database.execute(mark_processed, {'id': record['id']})
        finally:
            await database.disconnect()
|
|
|
|
async def main():
    """Run one processing pass over the unprocessed search results."""
    await SearchResultParser().process_search_results()
|
|
|
|
if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    from asyncio import run

    run(main())