diff --git a/config/base_config.py b/config/base_config.py
index de5567e..d6be4ed 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -1,42 +1,44 @@
-# Desc: base config
+# Base config
 PLATFORM = "xhs"
 KEYWORDS = "python,golang"
 LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
-COOKIES = ""  # login by cookie, if login_type is cookie, you must set this value
+COOKIES = ""
 CRAWLER_TYPE = "search"
 
-# enable ip proxy
+# Enable IP proxy
 ENABLE_IP_PROXY = False
 
-# retry_interval
+# Retry interval
 RETRY_INTERVAL = 60 * 30  # 30 minutes
 
 # playwright headless
 HEADLESS = True
 
-# save login state
+# Whether to save the login state
 SAVE_LOGIN_STATE = True
 
-# save user data dir
+# Directory for the cached browser user data
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
 
-# crawler max notes count
+# Max number of videos/posts to crawl
 CRAWLER_MAX_NOTES_COUNT = 20
 
-# max concurrency num
+# Max concurrency for crawler tasks
 MAX_CONCURRENCY_NUM = 10
 
-# xhs specified note id list
+# Xiaohongshu note ID list to crawl
 XHS_SPECIFIED_ID_LIST = [
     "6422c2750000000027000d88",
     "64ca1b73000000000b028dd2",
     "630d5b85000000001203ab41",
+# ........................
 ]
 
-# douyin specified note id list
+# Douyin video ID list to crawl
 DY_SPECIFIED_ID_LIST = [
     "7280854932641664319",
     "7202432992642387233"
+# ........................
 ]
\ No newline at end of file
diff --git a/config/db_config.py b/config/db_config.py
index 270edd3..1e9d267 100644
--- a/config/db_config.py
+++ b/config/db_config.py
@@ -9,4 +9,4 @@ RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") # your relation db pas
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
 
 # save data to database option
-IS_SAVED_DATABASED = False # if you want to save data to database, set True
+IS_SAVED_DATABASED = True # if you want to save data to database, set True
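
Note on the db_config.py hunk: flipping IS_SAVED_DATABASED to True makes database persistence the default for every run, so the MySQL instance in RELATION_DB_URL must be reachable before the crawler starts. The password comes from the environment with an insecure fallback; a minimal sketch of that override behavior (values are the defaults from db_config.py, not recommendations):

    import os

    # unset, RELATION_DB_PWD falls back to the insecure default "123456";
    # export RELATION_DB_PWD before launching to point at a real database
    pwd = os.getenv("RELATION_DB_PWD", "123456")
    url = f"mysql://root:{pwd}@localhost:3306/media_crawler"
    print(url)
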
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
index 2c9f3c3..34f08ef 100644
--- a/media_platform/kuaishou/client.py
+++ b/media_platform/kuaishou/client.py
@@ -1,16 +1,16 @@
 # -*- coding: utf-8 -*-
 import asyncio
 import json
+from typing import Any, Callable, Dict, Optional
 from urllib.parse import urlencode
-from typing import Dict, Optional
 
 import httpx
 from playwright.async_api import BrowserContext, Page
 
 from tools import utils
 
-from .graphql import KuaiShouGraphQL
 from .exception import DataFetchError, IPBlockError
+from .graphql import KuaiShouGraphQL
 
 
 class KuaiShouClient:
@@ -31,7 +31,7 @@ class KuaiShouClient:
         self.cookie_dict = cookie_dict
         self.graphql = KuaiShouGraphQL()
 
-    async def request(self, method, url, **kwargs) -> Dict:
+    async def request(self, method, url, **kwargs) -> Any:
         async with httpx.AsyncClient(proxies=self.proxies) as client:
             response = await client.request(
                 method, url, timeout=self.timeout,
@@ -89,7 +89,6 @@ class KuaiShouClient:
         }
         return await self.post("", post_data)
 
-
     async def get_video_info(self, photo_id: str) -> Dict:
         """
         Kuaishou web video detail api
@@ -105,3 +104,71 @@ class KuaiShouClient:
             "query": self.graphql.get("video_detail")
         }
         return await self.post("", post_data)
+
+    async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict:
+        """get video comments
+        :param photo_id: photo id you want to fetch
+        :param pcursor: pcursor from the previous page, defaults to ""
+        :return:
+        """
+        post_data = {
+            "operationName": "commentListQuery",
+            "variables": {
+                "photoId": photo_id,
+                "pcursor": pcursor
+            },
+            "query": self.graphql.get("comment_list")
+        }
+        return await self.post("", post_data)
+
+    async def get_video_sub_comments(
+            self, note_id: str,
+            root_comment_id: str,
+            num: int = 30, cursor: str = ""
+    ):
+        """
+        get note sub comments
+        :param note_id: note id you want to fetch
+        :param root_comment_id: parent comment id
+        :param num: recommended 30; if num is greater than 30, only 30 comments are returned
+        :param cursor: cursor from the previous page, defaults to ""
+        :return: {"has_more": true, "cursor": "6422442d000000000700dcdb", "comments": [], "user_id": "63273a77000000002303cc9b", "time": 1681566542930}
+        """
+        # NOTE: this endpoint and its params appear to be copied from the xiaohongshu client; see the TODO below
+        uri = "/api/sns/web/v2/comment/sub/page"
+        params = {
+            "note_id": note_id,
+            "root_comment_id": root_comment_id,
+            "num": num,
+            "cursor": cursor,
+        }
+        return await self.get(uri, params)
+
+    async def get_video_all_comments(self, photo_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False,
+                                     callback: Optional[Callable] = None):
+        """
+        get all video comments, including sub comments
+        :param photo_id: photo id you want to fetch
+        :param crawl_interval: seconds to sleep between comment pages
+        :param is_fetch_sub_comments: whether to fetch sub comments as well
+        :param callback: optional coroutine invoked with each page of root comments
+        :return:
+        """
+
+        result = []
+        pcursor = ""
+        while pcursor != "no_more":
+            comments_res = await self.get_video_comments(photo_id, pcursor)
+            vision_comment_list = comments_res.get("visionCommentList", {})
+            pcursor = vision_comment_list.get("pcursor", "")
+            comments = vision_comment_list.get("rootComments", [])
+
+            if callback:  # invoke the callback with this page of comments
+                await callback(photo_id, comments)
+
+            await asyncio.sleep(crawl_interval)
+            if not is_fetch_sub_comments:
+                result.extend(comments)
+                continue
+            # TODO: handle fetching sub comments
+        return result
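
A minimal usage sketch of the new comment API (not part of the patch; ks_client and the photo id are hypothetical stand-ins for a logged-in KuaiShouClient and a real video):

    import asyncio

    async def print_comments(photo_id: str, comments: list):
        # the callback receives one page of rootComments per request
        print(f"{photo_id}: got {len(comments)} root comments")

    async def crawl_comments(ks_client):
        # pages through commentListQuery until the API returns pcursor == "no_more",
        # sleeping crawl_interval seconds between pages
        comments = await ks_client.get_video_all_comments(
            photo_id="3xabcdefg1234567",  # hypothetical photo id
            crawl_interval=1.0,
            callback=print_comments,
        )
        print(f"total root comments: {len(comments)}")
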
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
index a2efd0f..2fd91ea 100644
--- a/media_platform/kuaishou/core.py
+++ b/media_platform/kuaishou/core.py
@@ -1,5 +1,8 @@
 import asyncio
 import os
+import random
+import time
+from asyncio import Task
 from typing import Dict, List, Optional, Tuple
 
 from playwright.async_api import (BrowserContext, BrowserType, Page,
@@ -8,13 +11,13 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
 import config
 from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
-from tools import utils
-from var import crawler_type_var
 from models import kuaishou
+from tools import utils
+from var import comment_tasks_var, crawler_type_var
 
 from .client import KuaiShouClient
-from .login import KuaishouLogin
 from .exception import DataFetchError
+from .login import KuaishouLogin
 
 
 class KuaishouCrawler(AbstractCrawler):
@@ -109,9 +112,6 @@ class KuaishouCrawler(AbstractCrawler):
     async def get_specified_notes(self):
         pass
 
-    async def batch_get_video_comments(self, video_id_list: List[str]):
-        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
-
     async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
         """Get video detail task"""
         async with semaphore:
@@ -126,6 +126,49 @@ class KuaishouCrawler(AbstractCrawler):
             utils.logger.error(f"have not fund note detail video_id:{video_id}, err: {ex}")
             return None
 
+    async def batch_get_video_comments(self, video_id_list: List[str]):
+        """
+        batch get video comments
+        :param video_id_list: list of video ids to fetch comments for
+        :return:
+        """
+        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        for video_id in video_id_list:
+            task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
+            task_list.append(task)
+
+        comment_tasks_var.set(task_list)
+        await asyncio.gather(*task_list)
+
+    async def get_comments(self, video_id: str, semaphore: asyncio.Semaphore):
+        """
+        get comments for a video id
+        :param video_id:
+        :param semaphore:
+        :return:
+        """
+        async with semaphore:
+            try:
+                await self.ks_client.get_video_all_comments(
+                    photo_id=video_id,
+                    crawl_interval=random.random(),
+                    callback=kuaishou.batch_update_ks_video_comments
+                )
+            except DataFetchError as ex:
+                utils.logger.error(f"Get video_id: {video_id} comment error: {ex}")
+            except Exception as e:
+                utils.logger.error(f"may have been blocked, err: {e}")
+                # use time.sleep to block the main coroutine instead of asyncio.sleep, and cancel the running comment tasks
+                # kuaishou may have blocked our requests, so take a nap and then refresh the login cookies
+                current_running_tasks = comment_tasks_var.get()
+                for task in current_running_tasks:
+                    task.cancel()
+                time.sleep(20)
+                await self.context_page.goto(f"{self.index_url}?isHome=1")
+                await self.ks_client.update_cookies(browser_context=self.browser_context)
+
     def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
         """Create proxy info for playwright and httpx"""
         # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888
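
The fan-out in batch_get_video_comments is a standard bounded-concurrency pattern: one named task per video id, throttled by a shared semaphore and awaited with gather. A self-contained sketch with a sleep standing in for the real crawl:

    import asyncio
    import random

    async def fetch_one(video_id: str, sem: asyncio.Semaphore):
        async with sem:
            await asyncio.sleep(random.random())  # stand-in for get_video_all_comments
            return video_id

    async def fetch_all(video_ids):
        sem = asyncio.Semaphore(10)  # config.MAX_CONCURRENCY_NUM in the patch
        tasks = [asyncio.create_task(fetch_one(v, sem), name=v) for v in video_ids]
        # with the default return_exceptions=False, a CancelledError from any
        # task cancelled inside get_comments propagates out of this gather
        return await asyncio.gather(*tasks)

    print(asyncio.run(fetch_all(["7280854932641664319", "7202432992642387233"])))

Note that time.sleep(20) in the except branch blocks the whole event loop on purpose: no other comment task can make requests while the page reloads and the cookies are refreshed.
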
"video_type": video_item.get("type"), - "title": photo_info.get("caption", ""), - "desc": photo_info.get("caption", ""), + "video_type": str(video_item.get("type")), + "title": photo_info.get("caption", "")[:500], + "desc": photo_info.get("caption", "")[:500], "create_time": photo_info.get("timestamp"), "user_id": user_info.get("id"), "nickname": user_info.get("name"), "avatar": user_info.get("headerUrl", ""), - "liked_count": photo_info.get("realLikeCount"), - "viewd_count": photo_info.get("viewCount"), + "liked_count": str(photo_info.get("realLikeCount")), + "viewd_count": str(photo_info.get("viewCount")), "last_modify_ts": utils.get_current_timestamp(), "video_url": f"https://www.kuaishou.com/short-video/{video_id}", "video_cover_url": photo_info.get("coverUrl", ""), @@ -82,12 +82,12 @@ async def update_kuaishou_video(video_item: Dict): if config.IS_SAVED_DATABASED: if not await KuaishouVideo.filter(video_id=video_id).exists(): local_db_item["add_ts"] = utils.get_current_timestamp() - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',)) + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',)) kuaishou_data = kuaishou_video_pydantic(**local_db_item) kuaishou_video_pydantic.model_validate(kuaishou_data) await KuaishouVideo.create(**kuaishou_data.model_dump()) else: - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate', + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate', exclude=('id', 'add_ts')) kuaishou_data = kuaishou_video_pydantic(**local_db_item) kuaishou_video_pydantic.model_validate(kuaishou_data) @@ -111,28 +111,16 @@ async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]): async def update_ks_video_comment(video_id: str, comment_item: Dict): - comment_video_id = comment_item.get("video_id") - if video_id != comment_video_id: - print(f"comment_video_id: {comment_video_id} != video_id: {video_id}") - return - user_info = comment_item.get("user", {}) - comment_id = comment_item.get("cid") - avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get( - "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {} + comment_id = comment_item.get("commentId") local_db_item = { "comment_id": comment_id, - "create_time": comment_item.get("create_time"), - "ip_location": comment_item.get("ip_label", ""), + "create_time": comment_item.get("timestamp"), "video_id": video_id, - "content": comment_item.get("text"), - "user_id": user_info.get("uid"), - "sec_uid": user_info.get("sec_uid"), - "short_user_id": user_info.get("short_id"), - "user_unique_id": user_info.get("unique_id"), - "user_signature": user_info.get("signature"), - "nickname": user_info.get("nickname"), - "avatar": avatar_info.get("url_list", [""])[0], - "sub_comment_count": comment_item.get("reply_comment_total", 0), + "content": comment_item.get("content"), + "user_id": comment_item.get("authorId"), + "nickname": comment_item.get("authorName"), + "avatar": comment_item.get("headurl"), + "sub_comment_count": str(comment_item.get("subCommentCount", 0)), "last_modify_ts": utils.get_current_timestamp(), } print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}") diff --git a/var.py b/var.py index e1b6368..e929f8e 100644 --- a/var.py +++ b/var.py @@ -1,4 +1,7 @@ +from asyncio.tasks import Task from contextvars import ContextVar +from typing 
diff --git a/models/__init__.py b/models/__init__.py
index d491f01..cb9970b 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,3 +1,3 @@
 from .douyin import *
+from .kuaishou import *
 from .xiaohongshu import *
-
diff --git a/models/kuaishou.py b/models/kuaishou.py
index 00b4c97..70a9c4c 100644
--- a/models/kuaishou.py
+++ b/models/kuaishou.py
@@ -30,7 +30,7 @@ class KuaishouVideo(KuaishouBaseModel):
     desc = fields.TextField(null=True, description="视频描述")
     create_time = fields.BigIntField(description="视频发布时间戳", index=True)
     liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    video_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
+    viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
     video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
     video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
     video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
@@ -64,15 +64,15 @@ async def update_kuaishou_video(video_item: Dict):
     user_info = video_item.get("author", {})
     local_db_item = {
         "video_id": video_id,
-        "video_type": video_item.get("type"),
-        "title": photo_info.get("caption", ""),
-        "desc": photo_info.get("caption", ""),
+        "video_type": str(video_item.get("type")),
+        "title": photo_info.get("caption", "")[:500],
+        "desc": photo_info.get("caption", "")[:500],
         "create_time": photo_info.get("timestamp"),
         "user_id": user_info.get("id"),
         "nickname": user_info.get("name"),
         "avatar": user_info.get("headerUrl", ""),
-        "liked_count": photo_info.get("realLikeCount"),
-        "viewd_count": photo_info.get("viewCount"),
+        "liked_count": str(photo_info.get("realLikeCount")),
+        "viewd_count": str(photo_info.get("viewCount")),
         "last_modify_ts": utils.get_current_timestamp(),
         "video_url": f"https://www.kuaishou.com/short-video/{video_id}",
         "video_cover_url": photo_info.get("coverUrl", ""),
@@ -82,12 +82,12 @@ async def update_kuaishou_video(video_item: Dict):
     if config.IS_SAVED_DATABASED:
         if not await KuaishouVideo.filter(video_id=video_id).exists():
             local_db_item["add_ts"] = utils.get_current_timestamp()
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',))
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
             kuaishou_data = kuaishou_video_pydantic(**local_db_item)
             kuaishou_video_pydantic.model_validate(kuaishou_data)
             await KuaishouVideo.create(**kuaishou_data.model_dump())
         else:
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate',
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
                                                              exclude=('id', 'add_ts'))
             kuaishou_data = kuaishou_video_pydantic(**local_db_item)
             kuaishou_video_pydantic.model_validate(kuaishou_data)
@@ -111,28 +111,16 @@ async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
 
 
 async def update_ks_video_comment(video_id: str, comment_item: Dict):
-    comment_video_id = comment_item.get("video_id")
-    if video_id != comment_video_id:
-        print(f"comment_video_id: {comment_video_id} != video_id: {video_id}")
-        return
-    user_info = comment_item.get("user", {})
-    comment_id = comment_item.get("cid")
-    avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
-        "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
+    comment_id = comment_item.get("commentId")
     local_db_item = {
         "comment_id": comment_id,
-        "create_time": comment_item.get("create_time"),
-        "ip_location": comment_item.get("ip_label", ""),
+        "create_time": comment_item.get("timestamp"),
         "video_id": video_id,
-        "content": comment_item.get("text"),
-        "user_id": user_info.get("uid"),
-        "sec_uid": user_info.get("sec_uid"),
-        "short_user_id": user_info.get("short_id"),
-        "user_unique_id": user_info.get("unique_id"),
-        "user_signature": user_info.get("signature"),
-        "nickname": user_info.get("nickname"),
-        "avatar": avatar_info.get("url_list", [""])[0],
-        "sub_comment_count": comment_item.get("reply_comment_total", 0),
+        "content": comment_item.get("content"),
+        "user_id": comment_item.get("authorId"),
+        "nickname": comment_item.get("authorName"),
+        "avatar": comment_item.get("headurl"),
+        "sub_comment_count": str(comment_item.get("subCommentCount", 0)),
         "last_modify_ts": utils.get_current_timestamp(),
     }
     print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}")
diff --git a/var.py b/var.py
index e1b6368..e929f8e 100644
--- a/var.py
+++ b/var.py
@@ -1,4 +1,7 @@
+from asyncio import Task
 from contextvars import ContextVar
+from typing import List
 
 request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
-crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
\ No newline at end of file
+crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
+comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
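
One caveat on the new comment_tasks_var: a ContextVar default is a single object shared by every context that has not called .set(), so the mutable [] default is only safe because batch_get_video_comments always sets a fresh task list before get_comments reads it. A small standalone sketch of the pitfall (hypothetical names):

    from contextvars import ContextVar, copy_context
    from typing import List

    tasks_var: ContextVar[List[str]] = ContextVar("tasks", default=[])

    def mutate_without_set():
        tasks_var.get().append("leaked")  # mutates the one shared default list

    copy_context().run(mutate_without_set)
    print(tasks_var.get())  # ['leaked']: visible in every other context too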