# -*- coding: utf-8 -*- import asyncio import json from typing import Any, Callable, Dict, List, Optional from urllib.parse import urlencode import httpx from playwright.async_api import BrowserContext, Page import config from base.base_crawler import AbstractApiClient from tools import utils from .exception import DataFetchError from .graphql import KuaiShouGraphQL class KuaiShouClient(AbstractApiClient): def __init__( self, timeout=10, proxies=None, *, headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], ): self.proxies = proxies self.timeout = timeout self.headers = headers self._host = "https://www.kuaishou.com/graphql" self.playwright_page = playwright_page self.cookie_dict = cookie_dict self.graphql = KuaiShouGraphQL() async def request(self, method, url, **kwargs) -> Any: async with httpx.AsyncClient(proxies=self.proxies) as client: response = await client.request( method, url, timeout=self.timeout, **kwargs ) data: Dict = response.json() if data.get("errors"): raise DataFetchError(data.get("errors", "unkonw error")) else: return data.get("data", {}) async def get(self, uri: str, params=None) -> Dict: final_uri = uri if isinstance(params, dict): final_uri = (f"{uri}?" f"{urlencode(params)}") return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) async def post(self, uri: str, data: dict) -> Dict: json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=self.headers) async def pong(self) -> bool: """get a note to check if login state is ok""" utils.logger.info("[KuaiShouClient.pong] Begin pong kuaishou...") ping_flag = False try: post_data = { "operationName": "visionProfileUserList", "variables": { "ftype": 1, }, "query": self.graphql.get("vision_profile_user_list") } res = await self.post("", post_data) if res.get("visionProfileUserList", {}).get("result") == 1: ping_flag = True except Exception as e: utils.logger.error(f"[KuaiShouClient.pong] Pong kuaishou failed: {e}, and try to login again...") ping_flag = False return ping_flag async def update_cookies(self, browser_context: BrowserContext): cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) self.headers["Cookie"] = cookie_str self.cookie_dict = cookie_dict async def search_info_by_keyword(self, keyword: str, pcursor: str): """ KuaiShou web search api :param keyword: search keyword :param pcursor: limite page curson :return: """ post_data = { "operationName": "visionSearchPhoto", "variables": { "keyword": keyword, "pcursor": pcursor, "page": "search" }, "query": self.graphql.get("search_query") } return await self.post("", post_data) async def get_video_info(self, photo_id: str) -> Dict: """ Kuaishou web video detail api :param photo_id: :return: """ post_data = { "operationName": "visionVideoDetail", "variables": { "photoId": photo_id, "page": "search" }, "query": self.graphql.get("video_detail") } return await self.post("", post_data) async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict: """get video comments :param photo_id: photo id you want to fetch :param pcursor: last you get pcursor, defaults to "" :return: """ post_data = { "operationName": "commentListQuery", "variables": { "photoId": photo_id, "pcursor": pcursor }, "query": self.graphql.get("comment_list") } return await self.post("", post_data) async def get_video_sub_comments( self, photo_id: str, rootCommentId: str, pcursor: str = "" ) -> Dict: """get video sub comments :param photo_id: photo id you want to fetch :param pcursor: last you get pcursor, defaults to "" :return: """ post_data = { "operationName": "visionSubCommentList", "variables": { "photoId": photo_id, "pcursor": pcursor, "rootCommentId": rootCommentId, }, "query": self.graphql.get("vision_sub_comment_list"), } return await self.post("", post_data) async def get_creator_profile(self, userId: str) -> Dict: post_data = { "operationName": "visionProfile", "variables": { "userId": userId }, "query": self.graphql.get("vision_profile"), } return await self.post("", post_data) async def get_video_by_creater(self, userId: str, pcursor: str = "") -> Dict: post_data = { "operationName": "visionProfilePhotoList", "variables": { "page": "profile", "pcursor": pcursor, "userId": userId }, "query": self.graphql.get("vision_profile_photo_list"), } return await self.post("", post_data) async def get_video_all_comments( self, photo_id: str, crawl_interval: float = 1.0, callback: Optional[Callable] = None, ): """ get video all comments include sub comments :param photo_id: :param crawl_interval: :param callback: :return: """ result = [] pcursor = "" while pcursor != "no_more": comments_res = await self.get_video_comments(photo_id, pcursor) vision_commen_list = comments_res.get("visionCommentList", {}) pcursor = vision_commen_list.get("pcursor", "") comments = vision_commen_list.get("rootComments", []) if callback: # 如果有回调函数,就执行回调函数 await callback(photo_id, comments) result.extend(comments) await asyncio.sleep(crawl_interval) sub_comments = await self.get_comments_all_sub_comments( comments, photo_id, crawl_interval, callback ) result.extend(sub_comments) return result async def get_comments_all_sub_comments( self, comments: List[Dict], photo_id, crawl_interval: float = 1.0, callback: Optional[Callable] = None, ) -> List[Dict]: """ 获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息 Args: comments: 评论列表 photo_id: 视频id crawl_interval: 爬取一次评论的延迟单位(秒) callback: 一次评论爬取结束后 Returns: """ if not config.ENABLE_GET_SUB_COMMENTS: utils.logger.info( f"[KuaiShouClient.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled" ) return [] result = [] for comment in comments: sub_comments = comment.get("subComments") if sub_comments and callback: await callback(photo_id, sub_comments) sub_comment_pcursor = comment.get("subCommentsPcursor") if sub_comment_pcursor == "no_more": continue root_comment_id = comment.get("commentId") sub_comment_pcursor = "" while sub_comment_pcursor != "no_more": comments_res = await self.get_video_sub_comments( photo_id, root_comment_id, sub_comment_pcursor ) sub_comment_pcursor = comments_res.get("pcursor", "no_more") comments = comments_res.get("subComments", []) if callback: await callback(photo_id, comments) await asyncio.sleep(crawl_interval) result.extend(comments) return result async def get_creator_info(self, user_id: str) -> Dict: """ eg: https://www.kuaishou.com/profile/3x4jtnbfter525a 快手用户主页 """ visionProfile = await self.get_creator_profile(user_id) return visionProfile.get("userProfile") async def get_all_videos_by_creator( self, user_id: str, crawl_interval: float = 1.0, callback: Optional[Callable] = None, ) -> List[Dict]: """ 获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息 Args: user_id: 用户ID crawl_interval: 爬取一次的延迟单位(秒) callback: 一次分页爬取结束后的更新回调函数 Returns: """ result = [] pcursor = "" while pcursor != "no_more": videos_res = await self.get_video_by_creater(user_id, pcursor) if not videos_res: utils.logger.error( f"[KuaiShouClient.get_all_videos_by_creator] The current creator may have been banned by ks, so they cannot access the data." ) break vision_profile_photo_list = videos_res.get("visionProfilePhotoList", {}) pcursor = vision_profile_photo_list.get("pcursor", "") videos = vision_profile_photo_list.get("feeds", []) utils.logger.info( f"[KuaiShouClient.get_all_videos_by_creator] got user_id:{user_id} videos len : {len(videos)}" ) if callback: await callback(videos) await asyncio.sleep(crawl_interval) result.extend(videos) return result