xhs_crawler/integrate_xhs_crawler.py
phezzan f74ad7926e
Some checks failed
Deploy VitePress site to Pages / build (push) Has been cancelled
Deploy VitePress site to Pages / Deploy (push) Has been cancelled
first commit
2024-12-17 16:14:10 +08:00

231 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
from data.xhs.json.import_xhs_notes import connect_to_database, create_table, check_record_exists
# from check_downloads import check_media_files
from mysql.connector import Error
import time
import asyncio
import subprocess
from datetime import datetime
import random
async def _run_crawler(keyword):
"""运行爬虫的异步实现"""
try:
process = await asyncio.create_subprocess_exec(
'python', 'main.py',
'--platform', 'xhs',
'--lt', 'qrcode',
'--keywords', keyword,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=1024*1024
)
# 读取输出流
async def read_stream(stream):
buffer = ""
while True:
chunk = await stream.read(8192)
if not chunk:
break
text = chunk.decode('utf-8', errors='ignore')
buffer += text
# 处理输出
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
if line.strip():
print(f"爬虫进度: {line.strip()}")
# 同时处理标准输出和错误输出
await asyncio.gather(
read_stream(process.stdout),
read_stream(process.stderr)
)
await process.wait()
return process.returncode == 0
except Exception as e:
print(f"爬虫执行错误: {str(e)}")
return False
def load_search_keywords():
"""从sheet_notes文件夹加载搜索关键词"""
keywords_dict = {}
json_dir = './data/xhs/json/sheet_notes'
for json_file in os.listdir(json_dir):
if not json_file.endswith('.json'):
continue
sheet_name = os.path.splitext(json_file)[0]
with open(os.path.join(json_dir, json_file), 'r', encoding='utf-8') as f:
keywords = json.load(f)
# 修护.json从第12个元素开始
# if sheet_name == '修护':
# keywords = keywords[11:]
keywords_dict[sheet_name] = keywords
return keywords_dict
def insert_note_data(connection, data, sheet_name):
"""插入笔记数据到数据库"""
insert_query = """
INSERT INTO xhs_notes (
note_id, type, title, description, video_url, time,
last_update_time, user_id, nickname, avatar,
liked_count, collected_count, comment_count, share_count,
ip_location, image_list, tag_list, last_modify_ts,
note_url, source_keyword, sheet_name, download_flag
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
"""
try:
cursor = connection.cursor()
inserted_count = 0
skipped_count = 0
for item in data:
note_id = item.get('note_id')
# 检查记录是否已存在
if check_record_exists(cursor, note_id):
skipped_count += 1
continue
values = (
note_id,
item.get('type'),
item.get('title'),
item.get('desc'),
item.get('video_url'),
item.get('time'),
item.get('last_update_time'),
item.get('user_id'),
item.get('nickname'),
item.get('avatar'),
item.get('liked_count'),
item.get('collected_count'),
item.get('comment_count'),
item.get('share_count'),
item.get('ip_location'),
item.get('image_list'),
item.get('tag_list'),
item.get('last_modify_ts'),
item.get('note_url'),
item.get('source_keyword'),
sheet_name,
False # download_flag 默认为False
)
cursor.execute(insert_query, values)
inserted_count += 1
connection.commit()
print(f'成功插入 {inserted_count} 条新数据')
print(f'跳过 {skipped_count} 条已存在的数据')
except Error as e:
print(f'插入数据时出错: {e}')
connection.rollback()
def search_xhs_notes(keyword):
"""搜索小红书笔记"""
try:
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 运行爬虫
success = loop.run_until_complete(_run_crawler(keyword))
if not success:
print(f"爬虫执行失败: {keyword}")
return None
# 读取爬虫结果
json_path = f'./data/xhs/json/search_contents_{datetime.now().strftime("%Y-%m-%d")}.json'
if not os.path.exists(json_path):
print(f"找不到爬虫结果文<EFBFBD><EFBFBD>: {json_path}")
return None
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 为每条记录添加来源关键词
for item in data:
item['source_keyword'] = keyword
return data
finally:
loop.close()
except Exception as e:
print(f"搜索过程发生错误: {str(e)}")
return None
def keyword_exists_in_db(connection, keyword):
"""检查关键词是否已存在于数据库中"""
query = "SELECT COUNT(*) FROM xhs_notes WHERE source_keyword = %s"
cursor = connection.cursor()
cursor.execute(query, (keyword,))
result = cursor.fetchone()
return result[0] > 0
def main():
# 连接数据库
connection = connect_to_database()
if connection is None:
return
try:
# 创建表格
create_table(connection)
# 加载搜索关键词
keywords_dict = load_search_keywords()
# 对每个sheet的关键词进行搜索
for sheet_name, keywords in keywords_dict.items():
print(f'开始处理 {sheet_name} 的关键词...')
for keyword in keywords:
# 检查关键词是否已存在
if keyword_exists_in_db(connection, keyword):
print(f'关键词已存在,跳过: {keyword}')
continue
print(f'搜索关键词: {keyword}')
# 搜索小红书笔记
search_results = search_xhs_notes(keyword)
if search_results:
# 将搜索结果保存到数据库
insert_note_data(connection, search_results, sheet_name)
else:
print(f"未获取到搜索结果: {keyword}")
# 添加延时避免请求过快随机延时10-30秒
time.sleep(random.uniform(10, 30))
print(f'{sheet_name} 的关键词处理完成')
# 下载所有媒体文件
# print('开始下载媒体文件...')
# check_media_files()
except Exception as e:
print(f'处理过程中出错: {e}')
finally:
if connection.is_connected():
connection.close()
print('数据库连接已关闭')
if __name__ == "__main__":
main()