"""Parse raw search-engine responses stored in MySQL into per-result rows.

Reads unprocessed rows from ``web_search_results``, normalizes the JSON
payload of each row according to its search engine, writes one row per parsed
item into ``parsed_search_results`` and flags the source row as processed.
"""

import asyncio
import datetime
import json
import logging
from typing import Any, Dict, List

from databases import Database

logger = logging.getLogger(__name__)

# Database connection settings.
# NOTE(security): credentials are hard-coded in source. They are left
# unchanged here to preserve behavior, but should be moved to environment
# variables or a secret store (and rotated) before this file is shared.
db_config = {
    'user': 'root',
    'password': 'zaq12wsx@9Xin',
    'host': '183.11.229.79',
    'port': 3316,
    'database': 'gptDB',
    'auth_plugin': 'mysql_native_password'
}

DATABASE_URL = f"mysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?auth_plugin={db_config['auth_plugin']}"

database = Database(DATABASE_URL)

_SELECT_UNPROCESSED = """
    SELECT id, engine, query, results
    FROM web_search_results
    WHERE is_processed = 0
"""

_INSERT_PARSED = """
    INSERT INTO parsed_search_results
    (original_search_id, engine, query, title, link, snippet,
     result_type, parsed_datetime)
    VALUES
    (:original_search_id, :engine, :query, :title, :link, :snippet,
     :result_type, :parsed_datetime)
"""

_MARK_PROCESSED = """
    UPDATE web_search_results
    SET is_processed = 1
    WHERE id = :id
"""


class SearchResultParser:
    """Normalize stored search-engine responses and persist the parsed items."""

    def __init__(self):
        self.parsed_results = []

    @staticmethod
    def _parse_organic(results: Dict) -> List[Dict]:
        """Return the normalized ``organic_results`` entries of *results*.

        A missing or ``None`` ``organic_results`` value yields an empty list.
        """
        return [
            {
                'title': entry.get('title', ''),
                'link': entry.get('link', ''),
                'snippet': entry.get('snippet', ''),
                'position': entry.get('position', 0),
                'type': 'organic',
            }
            for entry in results.get('organic_results') or []
        ]

    async def parse_google_results(self, results: Dict) -> List[Dict]:
        """Parse a Google search response.

        Args:
            results: Decoded JSON response.

        Returns:
            The organic results followed by one ``answer_box`` item when the
            response carries an ``answer_box`` object.
        """
        parsed = self._parse_organic(results)

        # Google's answer_box is handled as a single object; anything else
        # (e.g. null) is ignored instead of raising.
        answer = results.get('answer_box')
        if isinstance(answer, dict):
            parsed.append({
                'title': answer.get('title', ''),
                'answer': answer.get('answer', ''),
                'type': 'answer_box',
            })

        return parsed

    async def parse_bing_results(self, results: Dict) -> List[Dict]:
        """Parse a Bing search response.

        Args:
            results: Decoded JSON response.

        Returns:
            The normalized organic results.
        """
        return self._parse_organic(results)

    async def parse_baidu_results(self, results: Dict) -> List[Dict]:
        """Parse a Baidu search response.

        Args:
            results: Decoded JSON response.

        Returns:
            The organic results followed by one ``answer_box`` item per entry
            of the response's ``answer_box``.
        """
        parsed = self._parse_organic(results)

        # Baidu's answer_box is handled as a list of entries; a single object
        # is accepted as a one-element list rather than iterating its keys.
        answers = results.get('answer_box') or []
        if isinstance(answers, dict):
            answers = [answers]
        for answer in answers:
            parsed.append({
                'answer': answer.get('answer', ''),
                'snippet': answer.get('snippet', ''),
                'source': answer.get('source', ''),
                'type': 'answer_box',
            })

        return parsed

    async def process_search_results(self):
        """Parse every unprocessed row of ``web_search_results``.

        For each row the JSON payload is parsed with the parser matching its
        ``engine``; the parsed items are inserted into
        ``parsed_search_results`` and the row is flagged ``is_processed = 1``
        inside a single transaction, so a failure cannot leave partially
        written items behind.

        Rows with an unsupported engine or an undecodable payload are logged
        and left unprocessed. The connection is always closed, even when an
        error occurs.

        Note that only ``title``, ``link``, ``snippet`` and ``type`` of each
        parsed item are stored; other keys (``position``, ``answer``,
        ``source``) are not written by the insert statement.
        """
        parsers = {
            'google': self.parse_google_results,
            'bing': self.parse_bing_results,
            'baidu': self.parse_baidu_results,
        }

        await database.connect()
        try:
            unprocessed_results = await database.fetch_all(_SELECT_UNPROCESSED)

            for record in unprocessed_results:
                record_id = record['id']
                engine = record['engine']

                # Pick the parser for this engine; never fall through with
                # the previous record's data.
                parse = parsers.get(engine)
                if parse is None:
                    logger.warning(
                        "Skipping search result %s: unsupported engine %r",
                        record_id, engine,
                    )
                    continue

                try:
                    results = json.loads(record['results'])
                except (TypeError, ValueError):
                    # TypeError: NULL payload; ValueError: malformed JSON.
                    logger.exception(
                        "Skipping search result %s: invalid JSON payload",
                        record_id,
                    )
                    continue
                if not isinstance(results, dict):
                    logger.warning(
                        "Skipping search result %s: payload is not a JSON object",
                        record_id,
                    )
                    continue

                parsed_data = await parse(results)

                parsed_at = datetime.datetime.now()
                rows = [
                    {
                        'original_search_id': record_id,
                        'engine': engine,
                        'query': record['query'],
                        'title': item.get('title', ''),
                        'link': item.get('link', ''),
                        'snippet': item.get('snippet', ''),
                        'result_type': item.get('type', ''),
                        'parsed_datetime': parsed_at,
                    }
                    for item in parsed_data
                ]

                # Store the parsed items and mark the source row as processed
                # atomically.
                async with database.transaction():
                    if rows:
                        await database.execute_many(
                            query=_INSERT_PARSED, values=rows
                        )
                    await database.execute(
                        query=_MARK_PROCESSED, values={'id': record_id}
                    )
        finally:
            await database.disconnect()


async def main():
    """Run one pass over all unprocessed search results."""
    parser = SearchResultParser()
    await parser.process_search_results()


if __name__ == "__main__":
    asyncio.run(main())