# xhs_crawler/integrate_xhs_crawler.py

import json
import os
from data.xhs.json.import_xhs_notes import connect_to_database, create_table, check_record_exists
# from check_downloads import check_media_files
from mysql.connector import Error
import time
import asyncio
import subprocess
from datetime import datetime
import random
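
# Overview: this script glues the xhs (Xiaohongshu) crawler to a MySQL store. It loads
# search keywords from ./data/xhs/json/sheet_notes/*.json, runs main.py once per keyword
# as an asyncio subprocess, reads the crawler's JSON output for the current day, and
# inserts any notes that are not already in the xhs_notes table.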

async def _run_crawler(keyword):
    """Async implementation that runs the crawler as a subprocess."""
    try:
        process = await asyncio.create_subprocess_exec(
            'python', 'main.py',
            '--platform', 'xhs',
            '--lt', 'qrcode',
            '--keywords', keyword,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=1024 * 1024
        )

        # Read an output stream and print complete lines as progress
        async def read_stream(stream):
            buffer = ""
            while True:
                chunk = await stream.read(8192)
                if not chunk:
                    break
                text = chunk.decode('utf-8', errors='ignore')
                buffer += text
                # Emit every complete line found in the buffer
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    if line.strip():
                        print(f"Crawler progress: {line.strip()}")

        # Consume stdout and stderr concurrently
        await asyncio.gather(
            read_stream(process.stdout),
            read_stream(process.stderr)
        )
        await process.wait()
        return process.returncode == 0
    except Exception as e:
        print(f"Crawler execution error: {str(e)}")
        return False
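
# Usage sketch for _run_crawler (assumption: main.py lives in the working directory and
# accepts the flags passed above); from synchronous code it can be driven with
#   asyncio.run(_run_crawler("<keyword>"))
# which is equivalent to the manual event-loop handling in search_xhs_notes below.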

def load_search_keywords():
    """Load search keywords from the sheet_notes folder."""
    keywords_dict = {}
    json_dir = './data/xhs/json/sheet_notes'
    for json_file in os.listdir(json_dir):
        if not json_file.endswith('.json'):
            continue
        sheet_name = os.path.splitext(json_file)[0]
        with open(os.path.join(json_dir, json_file), 'r', encoding='utf-8') as f:
            keywords = json.load(f)
            # Optionally resume 修护.json from its 12th element:
            # if sheet_name == '修护':
            #     keywords = keywords[11:]
            keywords_dict[sheet_name] = keywords
    return keywords_dict
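
# Expected input (inferred from the loop above): each sheet_notes/<sheet>.json file is a
# flat JSON array of keyword strings, e.g. ["修护", "保湿"], and the file name without
# the .json suffix becomes the sheet_name stored with each note.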

def insert_note_data(connection, data, sheet_name):
    """Insert note records into the database."""
    insert_query = """
    INSERT INTO xhs_notes (
        note_id, type, title, description, video_url, time,
        last_update_time, user_id, nickname, avatar,
        liked_count, collected_count, comment_count, share_count,
        ip_location, image_list, tag_list, last_modify_ts,
        note_url, source_keyword, sheet_name, download_flag
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
    )
    """
    try:
        cursor = connection.cursor()
        inserted_count = 0
        skipped_count = 0
        for item in data:
            note_id = item.get('note_id')
            # Skip records that already exist
            if check_record_exists(cursor, note_id):
                skipped_count += 1
                continue
            values = (
                note_id,
                item.get('type'),
                item.get('title'),
                item.get('desc'),
                item.get('video_url'),
                item.get('time'),
                item.get('last_update_time'),
                item.get('user_id'),
                item.get('nickname'),
                item.get('avatar'),
                item.get('liked_count'),
                item.get('collected_count'),
                item.get('comment_count'),
                item.get('share_count'),
                item.get('ip_location'),
                item.get('image_list'),
                item.get('tag_list'),
                item.get('last_modify_ts'),
                item.get('note_url'),
                item.get('source_keyword'),
                sheet_name,
                False  # download_flag defaults to False
            )
            cursor.execute(insert_query, values)
            inserted_count += 1
        connection.commit()
        print(f'Inserted {inserted_count} new records')
        print(f'Skipped {skipped_count} existing records')
    except Error as e:
        print(f'Error while inserting data: {e}')
        connection.rollback()
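
# Design note: duplicates are filtered in Python via check_record_exists before each
# INSERT. If note_id carries a UNIQUE key (not verified here), INSERT IGNORE or
# INSERT ... ON DUPLICATE KEY UPDATE would let MySQL enforce this in one round trip per row.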

def search_xhs_notes(keyword):
    """Search Xiaohongshu notes for a single keyword."""
    try:
        # Create a dedicated event loop for this crawler run
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Run the crawler
            success = loop.run_until_complete(_run_crawler(keyword))
            if not success:
                print(f"Crawler run failed: {keyword}")
                return None
            # Read the crawler output for today
            json_path = f'./data/xhs/json/search_contents_{datetime.now().strftime("%Y-%m-%d")}.json'
            if not os.path.exists(json_path):
                print(f"Crawler result file not found: {json_path}")
                return None
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Tag every record with the keyword it came from
            for item in data:
                item['source_keyword'] = keyword
            return data
        finally:
            loop.close()
    except Exception as e:
        print(f"Error during search: {str(e)}")
        return None
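
# Note (assumption about the crawler's output layout): if the crawler appends every run's
# results to a single search_contents_<date>.json per day, the loop above re-tags all
# records in that file with the current keyword; already-stored rows are still skipped
# at insert time by check_record_exists.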

def keyword_exists_in_db(connection, keyword):
    """Check whether a keyword has already been crawled into the database."""
    query = "SELECT COUNT(*) FROM xhs_notes WHERE source_keyword = %s"
    cursor = connection.cursor()
    cursor.execute(query, (keyword,))
    result = cursor.fetchone()
    return result[0] > 0
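
# Assumption: with many keywords this COUNT(*) runs often, so an index on
# xhs_notes.source_keyword keeps the lookup from scanning the whole table.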

def main():
    # Connect to the database
    connection = connect_to_database()
    if connection is None:
        return
    try:
        # Create the table if it does not exist yet
        create_table(connection)
        # Load search keywords
        keywords_dict = load_search_keywords()
        # Search every keyword of every sheet
        for sheet_name, keywords in keywords_dict.items():
            print(f'Processing keywords for {sheet_name}...')
            for keyword in keywords:
                # Skip keywords that are already in the database
                if keyword_exists_in_db(connection, keyword):
                    print(f'Keyword already exists, skipping: {keyword}')
                    continue
                print(f'Searching keyword: {keyword}')
                # Search Xiaohongshu notes
                search_results = search_xhs_notes(keyword)
                if search_results:
                    # Save the search results to the database
                    insert_note_data(connection, search_results, sheet_name)
                else:
                    print(f"No search results for: {keyword}")
                # Random 10-30 second delay to avoid requesting too fast
                time.sleep(random.uniform(10, 30))
            print(f'Finished keywords for {sheet_name}')
        # Download all media files
        # print('Downloading media files...')
        # check_media_files()
    except Exception as e:
        print(f'Error during processing: {e}')
    finally:
        if connection.is_connected():
            connection.close()
            print('Database connection closed')


if __name__ == "__main__":
    main()