Merge pull request #509 from leantli/feat/xhs_comments_upgrade

feat: xhs comments add xsec_token
程序员阿江(Relakkes) authored 2024-12-03 18:34:56 +08:00, committed by GitHub
commit 9c7e1d499b
2 changed files with 61 additions and 17 deletions
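
In short: this change threads Xiaohongshu's xsec_token verification parameter through the whole comment-fetching chain. On the client, get_note_comments, get_note_sub_comments, get_note_all_comments, and get_comments_all_sub_comments all gain an xsec_token argument that is forwarded in the request params; in the crawler, every code path that collects note IDs now collects the matching xsec_token alongside and passes both into batch_get_note_comments.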


@@ -265,11 +265,14 @@ class XiaoHongShuClient(AbstractApiClient):
             )
         return dict()

-    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
+    async def get_note_comments(
+        self, note_id: str, xsec_token: str, cursor: str = ""
+    ) -> Dict:
         """
         API for fetching a note's top-level comments
         Args:
             note_id: note ID
+            xsec_token: verification token
             cursor: pagination cursor

         Returns:
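
For callers, the visible change is the new required argument. A minimal usage sketch, assuming an already-initialized XiaoHongShuClient named xhs_client (the variable name is illustrative):

# Illustrative sketch: fetch one page of top-level comments, passing the
# xsec_token that was captured together with the note_id.
async def fetch_first_comment_page(xhs_client, note_id: str, xsec_token: str):
    return await xhs_client.get_note_comments(
        note_id=note_id, xsec_token=xsec_token, cursor=""
    )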
@@ -281,17 +284,24 @@ class XiaoHongShuClient(AbstractApiClient):
             "cursor": cursor,
             "top_comment_id": "",
             "image_formats": "jpg,webp,avif",
+            "xsec_token": xsec_token,
         }
         return await self.get(uri, params)

     async def get_note_sub_comments(
-        self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""
+        self,
+        note_id: str,
+        root_comment_id: str,
+        xsec_token: str,
+        num: int = 10,
+        cursor: str = "",
     ):
         """
         API for fetching the replies under a given parent comment
         Args:
             note_id: ID of the note the replies belong to
             root_comment_id: root comment ID
+            xsec_token: verification token
             num: page size
             cursor: pagination cursor
@@ -304,12 +314,16 @@ class XiaoHongShuClient(AbstractApiClient):
             "root_comment_id": root_comment_id,
             "num": num,
             "cursor": cursor,
+            "image_formats": "jpg,webp,avif",
+            "top_comment_id": "",
+            "xsec_token": xsec_token,
         }
         return await self.get(uri, params)

     async def get_note_all_comments(
         self,
         note_id: str,
+        xsec_token: str,
         crawl_interval: float = 1.0,
         callback: Optional[Callable] = None,
         max_count: int = 10,
@@ -318,6 +332,7 @@ class XiaoHongShuClient(AbstractApiClient):
         Fetch all top-level comments of a note; this method keeps paging until every comment under the post has been collected
         Args:
             note_id: note ID
+            xsec_token: verification token
             crawl_interval: delay between crawls, in seconds
             callback: called after each comment crawl finishes
             max_count: maximum number of comments to crawl per note
@@ -328,7 +343,9 @@ class XiaoHongShuClient(AbstractApiClient):
         comments_has_more = True
         comments_cursor = ""
         while comments_has_more and len(result) < max_count:
-            comments_res = await self.get_note_comments(note_id, comments_cursor)
+            comments_res = await self.get_note_comments(
+                note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor
+            )
             comments_has_more = comments_res.get("has_more", False)
             comments_cursor = comments_res.get("cursor", "")
             if "comments" not in comments_res:
@@ -344,7 +361,10 @@ class XiaoHongShuClient(AbstractApiClient):
             await asyncio.sleep(crawl_interval)
             result.extend(comments)
             sub_comments = await self.get_comments_all_sub_comments(
-                comments, crawl_interval, callback
+                comments=comments,
+                xsec_token=xsec_token,
+                crawl_interval=crawl_interval,
+                callback=callback,
             )
             result.extend(sub_comments)
         return result
@@ -352,6 +372,7 @@ class XiaoHongShuClient(AbstractApiClient):
     async def get_comments_all_sub_comments(
         self,
         comments: List[Dict],
+        xsec_token: str,
         crawl_interval: float = 1.0,
         callback: Optional[Callable] = None,
     ) -> List[Dict]:
@@ -359,6 +380,7 @@ class XiaoHongShuClient(AbstractApiClient):
         Fetch all second-level comments under the given top-level comments; this method keeps paging until every reply has been collected
         Args:
             comments: list of top-level comments
+            xsec_token: verification token
             crawl_interval: delay between crawls, in seconds
             callback: called after each comment crawl finishes
@@ -387,7 +409,11 @@ class XiaoHongShuClient(AbstractApiClient):
             while sub_comment_has_more:
                 comments_res = await self.get_note_sub_comments(
-                    note_id, root_comment_id, 10, sub_comment_cursor
+                    note_id=note_id,
+                    root_comment_id=root_comment_id,
+                    xsec_token=xsec_token,
+                    num=10,
+                    cursor=sub_comment_cursor,
                 )
                 sub_comment_has_more = comments_res.get("has_more", False)
                 sub_comment_cursor = comments_res.get("cursor", "")
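
Taken together, the client-side changes mean every comment entry point now expects the token. A hedged end-to-end sketch, assuming an initialized client and a (note_id, xsec_token) pair captured from an earlier search or detail response:

# Illustrative only: xhs_client is an initialized XiaoHongShuClient, and the
# callback argument is a stand-in for a persistence hook such as
# xhs_store.batch_update_xhs_note_comments.
async def crawl_note_comments(xhs_client, note_id: str, xsec_token: str):
    return await xhs_client.get_note_all_comments(
        note_id=note_id,
        xsec_token=xsec_token,
        crawl_interval=1.0,
        callback=None,
        max_count=100,
    )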


@@ -135,7 +135,8 @@ class XiaoHongShuCrawler(AbstractCrawler):
                 utils.logger.info(
                     f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}"
                 )
-                note_id_list: List[str] = []
+                note_ids: List[str] = []
+                xsec_tokens: List[str] = []
                 notes_res = await self.xhs_client.get_note_by_keyword(
                     keyword=keyword,
                     search_id=search_id,
@@ -168,12 +169,13 @@ class XiaoHongShuCrawler(AbstractCrawler):
                     if note_detail:
                         await xhs_store.update_xhs_note(note_detail)
                         await self.get_notice_media(note_detail)
-                        note_id_list.append(note_detail.get("note_id"))
+                        note_ids.append(note_detail.get("note_id"))
+                        xsec_tokens.append(note_detail.get("xsec_token"))
                 page += 1
                 utils.logger.info(
                     f"[XiaoHongShuCrawler.search] Note details: {note_details}"
                 )
-                await self.batch_get_note_comments(note_id_list)
+                await self.batch_get_note_comments(note_ids, xsec_tokens)
             except DataFetchError:
                 utils.logger.error(
                     "[XiaoHongShuCrawler.search] Get note detail error"
@@ -200,8 +202,12 @@ class XiaoHongShuCrawler(AbstractCrawler):
             callback=self.fetch_creator_notes_detail,
         )

-        note_ids = [note_item.get("note_id") for note_item in all_notes_list]
-        await self.batch_get_note_comments(note_ids)
+        note_ids = []
+        xsec_tokens = []
+        for note_item in all_notes_list:
+            note_ids.append(note_item.get("note_id"))
+            xsec_tokens.append(note_item.get("xsec_token"))
+        await self.batch_get_note_comments(note_ids, xsec_tokens)

     async def fetch_creator_notes_detail(self, note_list: List[Dict]):
         """
@@ -245,12 +251,14 @@ class XiaoHongShuCrawler(AbstractCrawler):
             get_note_detail_task_list.append(crawler_task)

         need_get_comment_note_ids = []
+        xsec_tokens = []
         note_details = await asyncio.gather(*get_note_detail_task_list)
         for note_detail in note_details:
             if note_detail:
                 need_get_comment_note_ids.append(note_detail.get("note_id", ""))
+                xsec_tokens.append(note_detail.get("xsec_token", ""))
                 await xhs_store.update_xhs_note(note_detail)
-        await self.batch_get_note_comments(need_get_comment_note_ids)
+        await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens)

     async def get_note_detail_async_task(
         self,
@@ -291,8 +299,10 @@ class XiaoHongShuCrawler(AbstractCrawler):
                 )
                 if not note_detail_from_html:
                     # Fall back to the API if fetching the note detail from the web page failed
-                    note_detail_from_api: Optional[Dict] = await self.xhs_client.get_note_by_id(
-                        note_id, xsec_source, xsec_token
+                    note_detail_from_api: Optional[Dict] = (
+                        await self.xhs_client.get_note_by_id(
+                            note_id, xsec_source, xsec_token
+                        )
                     )
                 note_detail = note_detail_from_html or note_detail_from_api
                 if note_detail:
@@ -311,7 +321,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
             )
             return None

-    async def batch_get_note_comments(self, note_list: List[str]):
+    async def batch_get_note_comments(
+        self, note_list: List[str], xsec_tokens: List[str]
+    ):
         """Batch get note comments"""
         if not config.ENABLE_GET_COMMENTS:
             utils.logger.info(
@@ -324,14 +336,19 @@ class XiaoHongShuCrawler(AbstractCrawler):
         )
         semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
         task_list: List[Task] = []
-        for note_id in note_list:
+        for index, note_id in enumerate(note_list):
             task = asyncio.create_task(
-                self.get_comments(note_id, semaphore), name=note_id
+                self.get_comments(
+                    note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore
+                ),
+                name=note_id,
             )
             task_list.append(task)
         await asyncio.gather(*task_list)

-    async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore):
+    async def get_comments(
+        self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore
+    ):
         """Get note comments with keyword filtering and quantity limitation"""
         async with semaphore:
             utils.logger.info(
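
batch_get_note_comments fans the per-note crawls out as asyncio tasks, while the semaphore taken inside get_comments caps how many run at once. A self-contained sketch of that pattern (the names and the limit of 4 are illustrative; config.MAX_CONCURRENCY_NUM plays that role in the real code):

import asyncio

async def bounded_worker(name: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:  # at most N tasks proceed past this point at once
        await asyncio.sleep(0.1)  # stand-in for one note's comment crawl
        return name

async def main() -> None:
    semaphore = asyncio.Semaphore(4)  # illustrative concurrency limit
    tasks = [
        asyncio.create_task(bounded_worker(f"note-{i}", semaphore), name=f"note-{i}")
        for i in range(10)
    ]
    print(await asyncio.gather(*tasks))

asyncio.run(main())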
@@ -339,6 +356,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
             )
             await self.xhs_client.get_note_all_comments(
                 note_id=note_id,
+                xsec_token=xsec_token,
                 crawl_interval=random.random(),
                 callback=xhs_store.batch_update_xhs_note_comments,
                 max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,