feat: 小红书支持通过博主ID采集笔记和评论,小红书type=search时支持配置按哪种排序方式获取笔记数据,小红书笔记增加视频地址和标签字段
This commit is contained in:
parent
c09f9fef68
commit
61ba8c5cc7
@ -49,3 +49,9 @@ class AbstractStore(ABC):
|
||||
@abstractmethod
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
pass
|
||||
|
||||
# TODO support all platform
|
||||
# only xhs is supported, so @abstractmethod is commented
|
||||
# @abstractmethod
|
||||
async def store_creator(self, creator: Dict):
|
||||
pass
|
||||
|
@ -3,6 +3,7 @@ PLATFORM = "xhs"
|
||||
KEYWORDS = "python,golang"
|
||||
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
|
||||
COOKIES = ""
|
||||
SORT_TYPE="popularity_descending" # 具体值参见media_platform.xxx.field下的枚举值,展示只支持小红书
|
||||
CRAWLER_TYPE = "search"
|
||||
|
||||
# 是否开启 IP 代理
|
||||
@ -70,3 +71,11 @@ WEIBO_SPECIFIED_ID_LIST = [
|
||||
"4982041758140155",
|
||||
# ........................
|
||||
]
|
||||
|
||||
# 指定小红书创作者ID列表
|
||||
XHS_CREATOR_ID_LIST = [
|
||||
"59d8cb33de5fb4696bf17217",
|
||||
"61b87386000000001000b18b",
|
||||
"5e8558100000000001005bc5",
|
||||
# ........................
|
||||
]
|
4
main.py
4
main.py
@ -36,8 +36,8 @@ async def main():
|
||||
choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
|
||||
parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
|
||||
choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
|
||||
parser.add_argument('--type', type=str, help='crawler type (search | detail)',
|
||||
choices=["search", "detail"], default=config.CRAWLER_TYPE)
|
||||
parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)',
|
||||
choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)
|
||||
|
||||
# init db
|
||||
if config.SAVE_DATA_OPTION == "db":
|
||||
|
@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from typing import Callable, Dict, List, Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
@ -73,11 +74,18 @@ class XHSClient:
|
||||
Returns:
|
||||
|
||||
"""
|
||||
# return response.text
|
||||
return_response = kwargs.pop('return_response', False)
|
||||
|
||||
async with httpx.AsyncClient(proxies=self.proxies) as client:
|
||||
response = await client.request(
|
||||
method, url, timeout=self.timeout,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
if return_response:
|
||||
return response.text
|
||||
|
||||
data: Dict = response.json()
|
||||
if data["success"]:
|
||||
return data.get("data", data.get("success", {}))
|
||||
@ -178,6 +186,56 @@ class XHSClient:
|
||||
}
|
||||
return await self.post(uri, data)
|
||||
|
||||
async def get_creator_info_and_notes(self, creator: str) -> Dict:
|
||||
"""
|
||||
获取博主的信息和第一页的笔记
|
||||
Args:
|
||||
creator: 博主ID
|
||||
Returns:
|
||||
{"creator":{}, "notes":[]}
|
||||
"""
|
||||
path = '/user/profile/'+creator
|
||||
content = await self.request(method="GET", url=f"https://www.xiaohongshu.com{path}", return_response=True)
|
||||
match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', content, re.M)
|
||||
|
||||
if match == None:
|
||||
return {}
|
||||
|
||||
info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
|
||||
if info == None:
|
||||
return {}
|
||||
|
||||
return {
|
||||
'creator': info.get('user').get('userPageData'),
|
||||
'notes': info.get('user').get('notes')[0],
|
||||
'cursor': info.get('user').get('noteQueries')[0].get('cursor'),
|
||||
'has_more_notes': info.get('user').get('noteQueries')[0].get('hasMore')
|
||||
}
|
||||
|
||||
async def get_notes_by_creator(
|
||||
self, creator: str,
|
||||
cursor: str,
|
||||
page_size: int = 30
|
||||
) -> Dict:
|
||||
"""
|
||||
获取博主的笔记
|
||||
Args:
|
||||
creator: 博主ID
|
||||
cursor: 上一页最后一条笔记的ID
|
||||
page_size: 分页数据长度
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
uri = "/api/sns/web/v1/user_posted"
|
||||
data = {
|
||||
"user_id": creator,
|
||||
"cursor": cursor,
|
||||
"num": page_size,
|
||||
"image_formats": "jpg,webp,avif"
|
||||
}
|
||||
return await self.get(uri, data)
|
||||
|
||||
async def get_note_by_id(self, note_id: str) -> Dict:
|
||||
"""
|
||||
获取笔记详情API
|
||||
|
@ -16,6 +16,7 @@ from var import crawler_type_var
|
||||
|
||||
from .client import XHSClient
|
||||
from .exception import DataFetchError
|
||||
from .field import SearchSortType
|
||||
from .login import XHSLogin
|
||||
|
||||
|
||||
@ -84,6 +85,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
|
||||
elif self.crawler_type == "detail":
|
||||
# Get the information and comments of the specified post
|
||||
await self.get_specified_notes()
|
||||
elif self.crawler_type == "creator":
|
||||
# Get creator's information and their notes and comments
|
||||
await self.get_creators_and_notes()
|
||||
else:
|
||||
pass
|
||||
|
||||
@ -101,6 +105,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
|
||||
notes_res = await self.xhs_client.get_note_by_keyword(
|
||||
keyword=keyword,
|
||||
page=page,
|
||||
sort=SearchSortType(config.SORT_TYPE) if config.SORT_TYPE!='' else SearchSortType.GENERAL,
|
||||
)
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}")
|
||||
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
|
||||
@ -117,6 +122,67 @@ class XiaoHongShuCrawler(AbstractCrawler):
|
||||
page += 1
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
|
||||
await self.batch_get_note_comments(note_id_list)
|
||||
|
||||
async def get_creators_and_notes(self) -> None:
|
||||
"""Get creator's notes and retrieve their comment information."""
|
||||
utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
|
||||
xhs_limit_count = 30
|
||||
for creator in config.XHS_CREATOR_ID_LIST:
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Current creator: {creator}")
|
||||
page = 0
|
||||
cursor = ''
|
||||
has_more_notes = False
|
||||
while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
|
||||
note_id_list: List[str] = []
|
||||
|
||||
if page == 0:
|
||||
# get creator info and notes
|
||||
creator_and_notes_info = await self.xhs_client.get_creator_info_and_notes(creator)
|
||||
|
||||
if creator_and_notes_info == None or not creator_and_notes_info:
|
||||
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator notes error")
|
||||
continue
|
||||
|
||||
notes_res = creator_and_notes_info.get('notes')
|
||||
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator and notes:{notes_res}")
|
||||
|
||||
cursor = creator_and_notes_info.get('cursor')
|
||||
has_more_notes = creator_and_notes_info.get('has_more_notes')
|
||||
|
||||
# save creator info
|
||||
await xhs_store.save_creator(creator, creator_and_notes_info.get('creator'))
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] save creator info:{creator_and_notes_info.get('creator')}")
|
||||
else:
|
||||
# get notes
|
||||
notes = await self.xhs_client.get_notes_by_creator(creator, cursor)
|
||||
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get notes res:{notes_res}")
|
||||
|
||||
if notes == None or not notes:
|
||||
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes error")
|
||||
continue
|
||||
|
||||
cursor = notes.get('cursor')
|
||||
has_more_notes = notes.get('has_more_notes')
|
||||
notes_res = notes.get('notes')
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes res:{notes_res}")
|
||||
|
||||
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
|
||||
task_list = [
|
||||
self.get_note_detail(post_item.get('id'), semaphore)
|
||||
for post_item in notes_res
|
||||
]
|
||||
note_details = await asyncio.gather(*task_list)
|
||||
for note_detail in note_details:
|
||||
if note_detail is not None:
|
||||
await xhs_store.update_xhs_note(note_detail)
|
||||
note_id_list.append(note_detail.get('note_id'))
|
||||
page += 1
|
||||
|
||||
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Note details: {note_details}")
|
||||
await self.batch_get_note_comments(note_id_list)
|
||||
|
||||
if not has_more_notes:
|
||||
break
|
||||
|
||||
async def get_specified_notes(self):
|
||||
"""Get the information and comments of the specified post"""
|
||||
|
@ -31,12 +31,20 @@ async def update_xhs_note(note_item: Dict):
|
||||
user_info = note_item.get("user", {})
|
||||
interact_info = note_item.get("interact_info", {})
|
||||
image_list: List[Dict] = note_item.get("image_list", [])
|
||||
tag_list: List[Dict] = note_item.get("tag_list", [])
|
||||
|
||||
video_url = ''
|
||||
if note_item.get('type') == 'video':
|
||||
videos = note_item.get('video').get('media').get('stream').get('h264')
|
||||
if type(videos).__name__ == 'list':
|
||||
video_url = ','.join([ v.get('master_url') for v in videos])
|
||||
|
||||
local_db_item = {
|
||||
"note_id": note_item.get("note_id"),
|
||||
"type": note_item.get("type"),
|
||||
"title": note_item.get("title") or note_item.get("desc", "")[:255],
|
||||
"desc": note_item.get("desc", ""),
|
||||
"video_url": video_url,
|
||||
"time": note_item.get("time"),
|
||||
"last_update_time": note_item.get("last_update_time", 0),
|
||||
"user_id": user_info.get("user_id"),
|
||||
@ -48,6 +56,7 @@ async def update_xhs_note(note_item: Dict):
|
||||
"share_count": interact_info.get("share_count"),
|
||||
"ip_location": note_item.get("ip_location", ""),
|
||||
"image_list": ','.join([img.get('url', '') for img in image_list]),
|
||||
"tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type')=='topic']),
|
||||
"last_modify_ts": utils.get_current_timestamp(),
|
||||
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}"
|
||||
}
|
||||
@ -77,3 +86,32 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict):
|
||||
}
|
||||
utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}")
|
||||
await XhsStoreFactory.create_store().store_comment(local_db_item)
|
||||
|
||||
async def save_creator(user_id: str, creator: Dict):
|
||||
user_info = creator.get('basicInfo', {})
|
||||
|
||||
follows = 0
|
||||
fans = 0
|
||||
interaction = 0
|
||||
for i in creator.get('interactions'):
|
||||
if i.get('type') == 'follows':
|
||||
follows = i.get('count')
|
||||
elif i.get('type') == 'fans':
|
||||
fans = i.get('count')
|
||||
elif i.get('type') == 'interaction':
|
||||
interaction = i.get('count')
|
||||
|
||||
local_db_item = {
|
||||
'user_id': user_id,
|
||||
'nickname': user_info.get('nickname'),
|
||||
'gender': '女' if user_info.get('gender') == 1 else '男' ,
|
||||
'avatar': user_info.get('images'),
|
||||
'desc': user_info.get('desc'),
|
||||
'ip_location': user_info.get('ip_location'),
|
||||
'follows': follows,
|
||||
'fans': fans,
|
||||
'interaction': interaction,
|
||||
'tag_list': json.dumps({tag.get('tagType'):tag.get('name') for tag in creator.get('tags')}),
|
||||
}
|
||||
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
|
||||
await XhsStoreFactory.create_store().store_creator(local_db_item)
|
@ -25,6 +25,7 @@ class XHSNote(XhsBaseModel):
|
||||
type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)")
|
||||
title = fields.CharField(null=True, max_length=255, description="笔记标题")
|
||||
desc = fields.TextField(null=True, description="笔记描述")
|
||||
video_url = fields.TextField(null=True, description="视频地址")
|
||||
time = fields.BigIntField(description="笔记发布时间戳", index=True)
|
||||
last_update_time = fields.BigIntField(description="笔记最后更新时间戳")
|
||||
liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数")
|
||||
@ -32,6 +33,7 @@ class XHSNote(XhsBaseModel):
|
||||
comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数")
|
||||
share_count = fields.CharField(null=True, max_length=16, description="笔记分享数")
|
||||
image_list = fields.TextField(null=True, description="笔记封面图片列表")
|
||||
tag_list = fields.TextField(null=True, description="标签列表")
|
||||
note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL")
|
||||
|
||||
class Meta:
|
||||
@ -55,3 +57,19 @@ class XHSNoteComment(XhsBaseModel):
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.comment_id} - {self.content}"
|
||||
|
||||
|
||||
class XhsCreator(XhsBaseModel):
|
||||
desc = fields.TextField(null=True, description="用户描述")
|
||||
gender = fields.CharField(null=True, max_length=1, description="性别")
|
||||
follows = fields.CharField(null=True, max_length=16, description="关注数")
|
||||
fans = fields.CharField(null=True, max_length=16, description="粉丝数")
|
||||
interaction = fields.CharField(null=True, max_length=16, description="获赞和收藏数")
|
||||
# follows = fields.IntField(description="关注数")
|
||||
# fans = fields.IntField(description="粉丝数")
|
||||
# interaction = fields.IntField(description="获赞和收藏数")
|
||||
tag_list = fields.TextField(null=True, description="标签列表") # json字符串
|
||||
|
||||
class Meta:
|
||||
table = "xhs_creator"
|
||||
table_description = "小红书博主"
|
||||
|
@ -72,6 +72,17 @@ class XhsCsvStoreImplement(AbstractStore):
|
||||
"""
|
||||
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
|
||||
|
||||
async def store_creator(self, creator: Dict):
|
||||
"""
|
||||
Xiaohongshu content CSV storage implementation
|
||||
Args:
|
||||
creator: creator dict
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
await self.save_data_to_csv(save_item=creator, store_type="creator")
|
||||
|
||||
|
||||
class XhsDbStoreImplement(AbstractStore):
|
||||
async def store_content(self, content_item: Dict):
|
||||
@ -121,6 +132,31 @@ class XhsDbStoreImplement(AbstractStore):
|
||||
comment_pydantic.model_validate(comment_data)
|
||||
await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
|
||||
|
||||
async def store_creator(self, creator: Dict):
|
||||
"""
|
||||
Xiaohongshu content DB storage implementation
|
||||
Args:
|
||||
creator: creator dict
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
from .xhs_store_db_types import XhsCreator
|
||||
user_id = creator.get("user_id")
|
||||
if not await XhsCreator.filter(user_id=user_id).first():
|
||||
creator["add_ts"] = utils.get_current_timestamp()
|
||||
creator["last_modify_ts"] = creator["add_ts"]
|
||||
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticCreate", exclude=('id',))
|
||||
creator_data = creator_pydantic(**creator)
|
||||
creator_pydantic.model_validate(creator_data)
|
||||
await XhsCreator.create(**creator_data.model_dump())
|
||||
else:
|
||||
creator["last_modify_ts"] = utils.get_current_timestamp()
|
||||
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticUpdate", exclude=('id', 'add_ts',))
|
||||
creator_data = creator_pydantic(**creator)
|
||||
creator_pydantic.model_validate(creator_data)
|
||||
await XhsCreator.filter(user_id=user_id).update(**creator_data.model_dump())
|
||||
|
||||
|
||||
class XhsJsonStoreImplement(AbstractStore):
|
||||
json_store_path: str = "data/xhs"
|
||||
@ -181,3 +217,14 @@ class XhsJsonStoreImplement(AbstractStore):
|
||||
|
||||
"""
|
||||
await self.save_data_to_json(comment_item, "comments")
|
||||
|
||||
async def store_creator(self, creator: Dict):
|
||||
"""
|
||||
Xiaohongshu content JSON storage implementation
|
||||
Args:
|
||||
creator: creator dict
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
await self.save_data_to_json(creator, "creator")
|
Loading…
Reference in New Issue
Block a user