Merge pull request #142 from jayeeliu/main

小红书支持通过博主ID采集笔记和评论
This commit is contained in:
relakkes 2024-03-02 23:30:31 +08:00 committed by GitHub
commit 7002061d70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 244 additions and 2 deletions

View File

@ -49,3 +49,9 @@ class AbstractStore(ABC):
@abstractmethod @abstractmethod
async def store_comment(self, comment_item: Dict): async def store_comment(self, comment_item: Dict):
pass pass
# TODO support all platform
# only xhs is supported, so @abstractmethod is commented
# @abstractmethod
async def store_creator(self, creator: Dict):
pass

View File

@ -3,6 +3,7 @@ PLATFORM = "xhs"
KEYWORDS = "python,golang" KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
COOKIES = "" COOKIES = ""
SORT_TYPE="popularity_descending" # 具体值参见media_platform.xxx.field下的枚举值展示只支持小红书
CRAWLER_TYPE = "search" CRAWLER_TYPE = "search"
# 是否开启 IP 代理 # 是否开启 IP 代理
@ -70,3 +71,11 @@ WEIBO_SPECIFIED_ID_LIST = [
"4982041758140155", "4982041758140155",
# ........................ # ........................
] ]
# 指定小红书创作者ID列表
XHS_CREATOR_ID_LIST = [
"59d8cb33de5fb4696bf17217",
"61b87386000000001000b18b",
"5e8558100000000001005bc5",
# ........................
]

View File

@ -36,8 +36,8 @@ async def main():
choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM) choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)', parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE) choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
parser.add_argument('--type', type=str, help='crawler type (search | detail)', parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)',
choices=["search", "detail"], default=config.CRAWLER_TYPE) choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)
# init db # init db
if config.SAVE_DATA_OPTION == "db": if config.SAVE_DATA_OPTION == "db":

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import json import json
import re
from typing import Callable, Dict, List, Optional from typing import Callable, Dict, List, Optional
from urllib.parse import urlencode from urllib.parse import urlencode
@ -73,11 +74,18 @@ class XHSClient:
Returns: Returns:
""" """
# return response.text
return_response = kwargs.pop('return_response', False)
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request( response = await client.request(
method, url, timeout=self.timeout, method, url, timeout=self.timeout,
**kwargs **kwargs
) )
if return_response:
return response.text
data: Dict = response.json() data: Dict = response.json()
if data["success"]: if data["success"]:
return data.get("data", data.get("success", {})) return data.get("data", data.get("success", {}))
@ -178,6 +186,56 @@ class XHSClient:
} }
return await self.post(uri, data) return await self.post(uri, data)
async def get_creator_info_and_notes(self, creator: str) -> Dict:
"""
获取博主的信息和第一页的笔记
Args:
creator: 博主ID
Returns:
{"creator":{}, "notes":[]}
"""
path = '/user/profile/'+creator
content = await self.request(method="GET", url=f"https://www.xiaohongshu.com{path}", return_response=True)
match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', content, re.M)
if match == None:
return {}
info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
if info == None:
return {}
return {
'creator': info.get('user').get('userPageData'),
'notes': info.get('user').get('notes')[0],
'cursor': info.get('user').get('noteQueries')[0].get('cursor'),
'has_more_notes': info.get('user').get('noteQueries')[0].get('hasMore')
}
async def get_notes_by_creator(
self, creator: str,
cursor: str,
page_size: int = 30
) -> Dict:
"""
获取博主的笔记
Args:
creator: 博主ID
cursor: 上一页最后一条笔记的ID
page_size: 分页数据长度
Returns:
"""
uri = "/api/sns/web/v1/user_posted"
data = {
"user_id": creator,
"cursor": cursor,
"num": page_size,
"image_formats": "jpg,webp,avif"
}
return await self.get(uri, data)
async def get_note_by_id(self, note_id: str) -> Dict: async def get_note_by_id(self, note_id: str) -> Dict:
""" """
获取笔记详情API 获取笔记详情API

View File

@ -16,6 +16,7 @@ from var import crawler_type_var
from .client import XHSClient from .client import XHSClient
from .exception import DataFetchError from .exception import DataFetchError
from .field import SearchSortType
from .login import XHSLogin from .login import XHSLogin
@ -84,6 +85,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
elif self.crawler_type == "detail": elif self.crawler_type == "detail":
# Get the information and comments of the specified post # Get the information and comments of the specified post
await self.get_specified_notes() await self.get_specified_notes()
elif self.crawler_type == "creator":
# Get creator's information and their notes and comments
await self.get_creators_and_notes()
else: else:
pass pass
@ -101,6 +105,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
notes_res = await self.xhs_client.get_note_by_keyword( notes_res = await self.xhs_client.get_note_by_keyword(
keyword=keyword, keyword=keyword,
page=page, page=page,
sort=SearchSortType(config.SORT_TYPE) if config.SORT_TYPE!='' else SearchSortType.GENERAL,
) )
utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}") utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
@ -118,6 +123,67 @@ class XiaoHongShuCrawler(AbstractCrawler):
utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}") utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
await self.batch_get_note_comments(note_id_list) await self.batch_get_note_comments(note_id_list)
async def get_creators_and_notes(self) -> None:
"""Get creator's notes and retrieve their comment information."""
utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
xhs_limit_count = 30
for creator in config.XHS_CREATOR_ID_LIST:
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Current creator: {creator}")
page = 0
cursor = ''
has_more_notes = False
while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
note_id_list: List[str] = []
if page == 0:
# get creator info and notes
creator_and_notes_info = await self.xhs_client.get_creator_info_and_notes(creator)
if creator_and_notes_info == None or not creator_and_notes_info:
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator notes error")
continue
notes_res = creator_and_notes_info.get('notes')
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator and notes:{notes_res}")
cursor = creator_and_notes_info.get('cursor')
has_more_notes = creator_and_notes_info.get('has_more_notes')
# save creator info
await xhs_store.save_creator(creator, creator_and_notes_info.get('creator'))
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] save creator info:{creator_and_notes_info.get('creator')}")
else:
# get notes
notes = await self.xhs_client.get_notes_by_creator(creator, cursor)
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get notes res:{notes_res}")
if notes == None or not notes:
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes error")
continue
cursor = notes.get('cursor')
has_more_notes = notes.get('has_more_notes')
notes_res = notes.get('notes')
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes res:{notes_res}")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_note_detail(post_item.get('id'), semaphore)
for post_item in notes_res
]
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_store.update_xhs_note(note_detail)
note_id_list.append(note_detail.get('note_id'))
page += 1
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Note details: {note_details}")
await self.batch_get_note_comments(note_id_list)
if not has_more_notes:
break
async def get_specified_notes(self): async def get_specified_notes(self):
"""Get the information and comments of the specified post""" """Get the information and comments of the specified post"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)

View File

@ -31,12 +31,20 @@ async def update_xhs_note(note_item: Dict):
user_info = note_item.get("user", {}) user_info = note_item.get("user", {})
interact_info = note_item.get("interact_info", {}) interact_info = note_item.get("interact_info", {})
image_list: List[Dict] = note_item.get("image_list", []) image_list: List[Dict] = note_item.get("image_list", [])
tag_list: List[Dict] = note_item.get("tag_list", [])
video_url = ''
if note_item.get('type') == 'video':
videos = note_item.get('video').get('media').get('stream').get('h264')
if type(videos).__name__ == 'list':
video_url = ','.join([ v.get('master_url') for v in videos])
local_db_item = { local_db_item = {
"note_id": note_item.get("note_id"), "note_id": note_item.get("note_id"),
"type": note_item.get("type"), "type": note_item.get("type"),
"title": note_item.get("title") or note_item.get("desc", "")[:255], "title": note_item.get("title") or note_item.get("desc", "")[:255],
"desc": note_item.get("desc", ""), "desc": note_item.get("desc", ""),
"video_url": video_url,
"time": note_item.get("time"), "time": note_item.get("time"),
"last_update_time": note_item.get("last_update_time", 0), "last_update_time": note_item.get("last_update_time", 0),
"user_id": user_info.get("user_id"), "user_id": user_info.get("user_id"),
@ -48,6 +56,7 @@ async def update_xhs_note(note_item: Dict):
"share_count": interact_info.get("share_count"), "share_count": interact_info.get("share_count"),
"ip_location": note_item.get("ip_location", ""), "ip_location": note_item.get("ip_location", ""),
"image_list": ','.join([img.get('url', '') for img in image_list]), "image_list": ','.join([img.get('url', '') for img in image_list]),
"tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type')=='topic']),
"last_modify_ts": utils.get_current_timestamp(), "last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}" "note_url": f"https://www.xiaohongshu.com/explore/{note_id}"
} }
@ -77,3 +86,32 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict):
} }
utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}") utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}")
await XhsStoreFactory.create_store().store_comment(local_db_item) await XhsStoreFactory.create_store().store_comment(local_db_item)
async def save_creator(user_id: str, creator: Dict):
user_info = creator.get('basicInfo', {})
follows = 0
fans = 0
interaction = 0
for i in creator.get('interactions'):
if i.get('type') == 'follows':
follows = i.get('count')
elif i.get('type') == 'fans':
fans = i.get('count')
elif i.get('type') == 'interaction':
interaction = i.get('count')
local_db_item = {
'user_id': user_id,
'nickname': user_info.get('nickname'),
'gender': '' if user_info.get('gender') == 1 else '' ,
'avatar': user_info.get('images'),
'desc': user_info.get('desc'),
'ip_location': user_info.get('ip_location'),
'follows': follows,
'fans': fans,
'interaction': interaction,
'tag_list': json.dumps({tag.get('tagType'):tag.get('name') for tag in creator.get('tags')}),
}
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
await XhsStoreFactory.create_store().store_creator(local_db_item)

View File

@ -25,6 +25,7 @@ class XHSNote(XhsBaseModel):
type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)") type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)")
title = fields.CharField(null=True, max_length=255, description="笔记标题") title = fields.CharField(null=True, max_length=255, description="笔记标题")
desc = fields.TextField(null=True, description="笔记描述") desc = fields.TextField(null=True, description="笔记描述")
video_url = fields.TextField(null=True, description="视频地址")
time = fields.BigIntField(description="笔记发布时间戳", index=True) time = fields.BigIntField(description="笔记发布时间戳", index=True)
last_update_time = fields.BigIntField(description="笔记最后更新时间戳") last_update_time = fields.BigIntField(description="笔记最后更新时间戳")
liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数") liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数")
@ -32,6 +33,7 @@ class XHSNote(XhsBaseModel):
comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数") comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数")
share_count = fields.CharField(null=True, max_length=16, description="笔记分享数") share_count = fields.CharField(null=True, max_length=16, description="笔记分享数")
image_list = fields.TextField(null=True, description="笔记封面图片列表") image_list = fields.TextField(null=True, description="笔记封面图片列表")
tag_list = fields.TextField(null=True, description="标签列表")
note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL") note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL")
class Meta: class Meta:
@ -55,3 +57,19 @@ class XHSNoteComment(XhsBaseModel):
def __str__(self): def __str__(self):
return f"{self.comment_id} - {self.content}" return f"{self.comment_id} - {self.content}"
class XhsCreator(XhsBaseModel):
desc = fields.TextField(null=True, description="用户描述")
gender = fields.CharField(null=True, max_length=1, description="性别")
follows = fields.CharField(null=True, max_length=16, description="关注数")
fans = fields.CharField(null=True, max_length=16, description="粉丝数")
interaction = fields.CharField(null=True, max_length=16, description="获赞和收藏数")
# follows = fields.IntField(description="关注数")
# fans = fields.IntField(description="粉丝数")
# interaction = fields.IntField(description="获赞和收藏数")
tag_list = fields.TextField(null=True, description="标签列表") # json字符串
class Meta:
table = "xhs_creator"
table_description = "小红书博主"

View File

@ -72,6 +72,17 @@ class XhsCsvStoreImplement(AbstractStore):
""" """
await self.save_data_to_csv(save_item=comment_item, store_type="comments") await self.save_data_to_csv(save_item=comment_item, store_type="comments")
async def store_creator(self, creator: Dict):
"""
Xiaohongshu content CSV storage implementation
Args:
creator: creator dict
Returns:
"""
await self.save_data_to_csv(save_item=creator, store_type="creator")
class XhsDbStoreImplement(AbstractStore): class XhsDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict): async def store_content(self, content_item: Dict):
@ -121,6 +132,31 @@ class XhsDbStoreImplement(AbstractStore):
comment_pydantic.model_validate(comment_data) comment_pydantic.model_validate(comment_data)
await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
async def store_creator(self, creator: Dict):
"""
Xiaohongshu content DB storage implementation
Args:
creator: creator dict
Returns:
"""
from .xhs_store_db_types import XhsCreator
user_id = creator.get("user_id")
if not await XhsCreator.filter(user_id=user_id).first():
creator["add_ts"] = utils.get_current_timestamp()
creator["last_modify_ts"] = creator["add_ts"]
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticCreate", exclude=('id',))
creator_data = creator_pydantic(**creator)
creator_pydantic.model_validate(creator_data)
await XhsCreator.create(**creator_data.model_dump())
else:
creator["last_modify_ts"] = utils.get_current_timestamp()
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticUpdate", exclude=('id', 'add_ts',))
creator_data = creator_pydantic(**creator)
creator_pydantic.model_validate(creator_data)
await XhsCreator.filter(user_id=user_id).update(**creator_data.model_dump())
class XhsJsonStoreImplement(AbstractStore): class XhsJsonStoreImplement(AbstractStore):
json_store_path: str = "data/xhs" json_store_path: str = "data/xhs"
@ -181,3 +217,14 @@ class XhsJsonStoreImplement(AbstractStore):
""" """
await self.save_data_to_json(comment_item, "comments") await self.save_data_to_json(comment_item, "comments")
async def store_creator(self, creator: Dict):
"""
Xiaohongshu content JSON storage implementation
Args:
creator: creator dict
Returns:
"""
await self.save_data_to_json(creator, "creator")