From 95ca606938723d7fb36606e087a6ee2aa814490e Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Thu, 23 Nov 2023 23:13:54 +0800
Subject: [PATCH 1/5] feat: set up the kuaishou module file structure
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 media_platform/kuaishou/__init__.py  |  1 +
 media_platform/kuaishou/client.py    | 71 ++++++++++++++++++++++++++++
 media_platform/kuaishou/core.py      |  1 +
 media_platform/kuaishou/exception.py |  9 ++++
 media_platform/kuaishou/field.py     |  1 +
 media_platform/kuaishou/login.py     | 20 ++++++++
 6 files changed, 103 insertions(+)
 create mode 100644 media_platform/kuaishou/__init__.py
 create mode 100644 media_platform/kuaishou/client.py
 create mode 100644 media_platform/kuaishou/core.py
 create mode 100644 media_platform/kuaishou/exception.py
 create mode 100644 media_platform/kuaishou/field.py
 create mode 100644 media_platform/kuaishou/login.py

diff --git a/media_platform/kuaishou/__init__.py b/media_platform/kuaishou/__init__.py
new file mode 100644
index 0000000..7c68785
--- /dev/null
+++ b/media_platform/kuaishou/__init__.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
\ No newline at end of file
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
new file mode 100644
index 0000000..e89de5b
--- /dev/null
+++ b/media_platform/kuaishou/client.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+from typing import Dict
+import asyncio
+import json
+from typing import Dict, Optional
+
+import httpx
+from playwright.async_api import BrowserContext, Page
+
+from tools import utils
+
+from .exception import DataFetchError, IPBlockError
+
+
+class KuaishouClient:
+    def __init__(
+            self,
+            timeout=10,
+            proxies=None,
+            *,
+            headers: Dict[str, str],
+            playwright_page: Page,
+            cookie_dict: Dict[str, str],
+    ):
+        self.proxies = proxies
+        self.timeout = timeout
+        self.headers = headers
+        self._host = "https://edith.xiaohongshu.com"
+        self.playwright_page = playwright_page
+        self.cookie_dict = cookie_dict
+
+    async def _pre_headers(self, url: str, data=None):
+        pass
+
+    async def request(self, method, url, **kwargs) -> Dict:
+        async with httpx.AsyncClient(proxies=self.proxies) as client:
+            response = await client.request(
+                method, url, timeout=self.timeout,
+                **kwargs
+            )
+        data: Dict = response.json()
+        if data["success"]:
+            return data.get("data", data.get("success", {}))
+        else:
+            raise DataFetchError(data.get("msg", None))
+
+    async def get(self, uri: str, params=None) -> Dict:
+        final_uri = uri
+        if isinstance(params, dict):
+            final_uri = (f"{uri}?"
+                         f"{'&'.join([f'{k}={v}' for k, v in params.items()])}")
+        headers = await self._pre_headers(final_uri)
+        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
+
+    async def post(self, uri: str, data: dict) -> Dict:
+        headers = await self._pre_headers(uri, data)
+        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
+        return await self.request(method="POST", url=f"{self._host}{uri}",
+                                  data=json_str, headers=headers)
+
+    async def ping(self) -> bool:
+        """get a note to check if login state is ok"""
+        utils.logger.info("Begin to ping xhs...")
+        ping_flag = False
+        try:
+            pass
+        except Exception as e:
+            utils.logger.error(f"Ping xhs failed: {e}, and try to login again...")
+            ping_flag = False
+        return ping_flag
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
new file mode 100644
index 0000000..7c68785
--- /dev/null
+++ b/media_platform/kuaishou/core.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
\ No newline at end of file
diff --git a/media_platform/kuaishou/exception.py b/media_platform/kuaishou/exception.py
new file mode 100644
index 0000000..1a8642e
--- /dev/null
+++ b/media_platform/kuaishou/exception.py
@@ -0,0 +1,9 @@
+from httpx import RequestError
+
+
+class DataFetchError(RequestError):
+    """raised when something goes wrong while fetching data"""
+
+
+class IPBlockError(RequestError):
+    """raised when requests are so frequent that the server blocks our IP"""
diff --git a/media_platform/kuaishou/field.py b/media_platform/kuaishou/field.py
new file mode 100644
index 0000000..7c68785
--- /dev/null
+++ b/media_platform/kuaishou/field.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
\ No newline at end of file
diff --git a/media_platform/kuaishou/login.py b/media_platform/kuaishou/login.py
new file mode 100644
index 0000000..1564422
--- /dev/null
+++ b/media_platform/kuaishou/login.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+
+from base.base_crawler import AbstractLogin
+
+
+class KuaishouLogin(AbstractLogin):
+    def __init__(self):
+        pass
+
+    async def begin(self):
+        pass
+
+    async def login_by_qrcode(self):
+        pass
+
+    async def login_by_mobile(self):
+        pass
+
+    async def login_by_cookies(self):
+        pass

From f08b2ceb76763889c455183169f4cbf8cb0b225f Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Fri, 24 Nov 2023 00:04:33 +0800
Subject: [PATCH 2/5] feat: 1. add kuaishou to the command line options; 2.
 kuaishou playwright code done
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 main.py                             |   7 +-
 media_platform/kuaishou/__init__.py |   3 +-
 media_platform/kuaishou/client.py   |  18 ++--
 media_platform/kuaishou/core.py     | 151 +++++++++++++++++++++++++++-
 media_platform/kuaishou/login.py    |  26 ++++-
 5 files changed, 191 insertions(+), 14 deletions(-)

diff --git a/main.py b/main.py
index dd3dec8..81ac53a 100644
--- a/main.py
+++ b/main.py
@@ -6,6 +6,7 @@ import config
 import db
 from base import proxy_account_pool
 from media_platform.douyin import DouYinCrawler
+from media_platform.kuaishou import KuaishouCrawler
 from media_platform.xhs import XiaoHongShuCrawler
 
 
@@ -16,6 +17,8 @@ class CrawlerFactory:
             return XiaoHongShuCrawler()
         elif platform == "dy":
             return DouYinCrawler()
+        elif platform == "ks":
+            return KuaishouCrawler()
         else:
             raise ValueError("Invalid Media Platform Currently only supported xhs or dy ...")
 
@@ -23,8 +26,8 @@ async def main():
     # define command line params ...
parser = argparse.ArgumentParser(description='Media crawler program.') - parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy)', - choices=["xhs", "dy"], default=config.PLATFORM) + parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks)', + choices=["xhs", "dy", "ks"], default=config.PLATFORM) parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)', choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE) parser.add_argument('--type', type=str, help='crawler type (search | detail)', diff --git a/media_platform/kuaishou/__init__.py b/media_platform/kuaishou/__init__.py index 7c68785..de877e0 100644 --- a/media_platform/kuaishou/__init__.py +++ b/media_platform/kuaishou/__init__.py @@ -1 +1,2 @@ -# -*- coding: utf-8 -*- \ No newline at end of file +# -*- coding: utf-8 -*- +from .core import KuaishouCrawler \ No newline at end of file diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py index e89de5b..9346fd4 100644 --- a/media_platform/kuaishou/client.py +++ b/media_platform/kuaishou/client.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -from typing import Dict import asyncio import json +from urllib.parse import urlencode from typing import Dict, Optional import httpx @@ -12,7 +12,6 @@ from tools import utils from .exception import DataFetchError, IPBlockError - class KuaishouClient: def __init__( self, @@ -26,7 +25,7 @@ class KuaishouClient: self.proxies = proxies self.timeout = timeout self.headers = headers - self._host = "https://edith.xiaohongshu.com" + self._host = "https://www.kuaishou.com" self.playwright_page = playwright_page self.cookie_dict = cookie_dict @@ -49,7 +48,7 @@ class KuaishouClient: final_uri = uri if isinstance(params, dict): final_uri = (f"{uri}?" 
- f"{'&'.join([f'{k}={v}' for k, v in params.items()])}") + f"{urlencode(params)}") headers = await self._pre_headers(final_uri) return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) @@ -59,13 +58,18 @@ class KuaishouClient: return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=headers) - async def ping(self) -> bool: + async def pong(self) -> bool: """get a note to check if login state is ok""" - utils.logger.info("Begin to ping xhs...") + utils.logger.info("Begin pong kuaishou...") ping_flag = False try: pass except Exception as e: - utils.logger.error(f"Ping xhs failed: {e}, and try to login again...") + utils.logger.error(f"Pong kuaishou failed: {e}, and try to login again...") ping_flag = False return ping_flag + + async def update_cookies(self, browser_context: BrowserContext): + cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) + self.headers["Cookie"] = cookie_str + self.cookie_dict = cookie_dict \ No newline at end of file diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py index 7c68785..67d60a6 100644 --- a/media_platform/kuaishou/core.py +++ b/media_platform/kuaishou/core.py @@ -1 +1,150 @@ -# -*- coding: utf-8 -*- \ No newline at end of file +import asyncio +import os +from typing import Dict, List, Optional, Tuple + +from playwright.async_api import (BrowserContext, BrowserType, Page, + async_playwright) + +import config +from base.base_crawler import AbstractCrawler +from base.proxy_account_pool import AccountPool +from tools import utils +from var import crawler_type_var + +from .client import KuaishouClient +from .login import KuaishouLogin + + +class KuaishouCrawler(AbstractCrawler): + platform: str + login_type: str + crawler_type: str + context_page: Page + ks_client: KuaishouClient + account_pool: AccountPool + browser_context: BrowserContext + + def __init__(self): + self.index_url = "https://www.kuaishou.com" + self.user_agent = utils.get_user_agent() + + def init_config(self, platform: str, login_type: str, account_pool: AccountPool, crawler_type: str): + self.platform = platform + self.login_type = login_type + self.account_pool = account_pool + self.crawler_type = crawler_type + + async def start(self): + account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info() + async with async_playwright() as playwright: + # Launch a browser context. + chromium = playwright.chromium + self.browser_context = await self.launch_browser( + chromium, + playwright_proxy, + self.user_agent, + headless=config.HEADLESS + ) + # stealth.min.js is a js script to prevent the website from detecting the crawler. + await self.browser_context.add_init_script(path="libs/stealth.min.js") + self.context_page = await self.browser_context.new_page() + await self.context_page.goto(f"{self.index_url}?isHome=1") + + # Create a client to interact with the kuaishou website. + self.ks_client = await self.create_ks_client(httpx_proxy) + if not await self.ks_client.pong(): + login_obj = KuaishouLogin( + login_type=self.login_type, + login_phone=account_phone, + browser_context=self.browser_context, + context_page=self.context_page, + cookie_str=config.COOKIES + ) + await login_obj.begin() + await self.ks_client.update_cookies(browser_context=self.browser_context) + + crawler_type_var.set(self.crawler_type) + if self.crawler_type == "search": + # Search for notes and retrieve their comment information. 
+ await self.search() + elif self.crawler_type == "detail": + # Get the information and comments of the specified post + await self.get_specified_notes() + else: + pass + + utils.logger.info("Kuaishou Crawler finished ...") + + async def search(self): + await asyncio.Event().wait() + + + async def get_specified_notes(self): + pass + + def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]: + """Create proxy info for playwright and httpx""" + # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888 + phone, ip_proxy = self.account_pool.get_account() + if not config.ENABLE_IP_PROXY: + return phone, None, None + utils.logger.info("Begin proxy info for playwright and httpx ...") + playwright_proxy = { + "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}", + "username": config.IP_PROXY_USER, + "password": config.IP_PROXY_PASSWORD, + } + httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}" + return phone, playwright_proxy, httpx_proxy + + async def create_ks_client(self, httpx_proxy: Optional[str]) -> KuaishouClient: + """Create xhs client""" + utils.logger.info("Begin create kuaishou API client ...") + cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) + xhs_client_obj = KuaishouClient( + proxies=httpx_proxy, + headers={ + "User-Agent": self.user_agent, + "Cookie": cookie_str, + "Origin": self.index_url, + "Referer": self.index_url, + "Content-Type": "application/json;charset=UTF-8" + }, + playwright_page=self.context_page, + cookie_dict=cookie_dict, + ) + return xhs_client_obj + + async def launch_browser( + self, + chromium: BrowserType, + playwright_proxy: Optional[Dict], + user_agent: Optional[str], + headless: bool = True + ) -> BrowserContext: + """Launch browser and create browser context""" + utils.logger.info("Begin create browser context ...") + if config.SAVE_LOGIN_STATE: + user_data_dir = os.path.join(os.getcwd(), "browser_data", + config.USER_DATA_DIR % self.platform) # type: ignore + browser_context = await chromium.launch_persistent_context( + user_data_dir=user_data_dir, + accept_downloads=True, + headless=headless, + proxy=playwright_proxy, # type: ignore + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + else: + browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore + browser_context = await browser.new_context( + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + + async def close(self): + """Close browser context""" + await self.browser_context.close() + utils.logger.info("Browser context closed ...") \ No newline at end of file diff --git a/media_platform/kuaishou/login.py b/media_platform/kuaishou/login.py index 1564422..3552d80 100644 --- a/media_platform/kuaishou/login.py +++ b/media_platform/kuaishou/login.py @@ -1,11 +1,31 @@ -# -*- coding: utf-8 -*- +import asyncio +import functools +import sys +from typing import Optional +import redis +from playwright.async_api import BrowserContext, Page +from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt, + wait_fixed) + +import config from base.base_crawler import AbstractLogin +from tools import utils class KuaishouLogin(AbstractLogin): - def __init__(self): - pass + def __init__(self, + login_type: str, + browser_context: BrowserContext, + context_page: Page, + login_phone: Optional[str] = "", + cookie_str: str = "" + ): + self.login_type = login_type + 
self.browser_context = browser_context
+        self.context_page = context_page
+        self.login_phone = login_phone
+        self.cookie_str = cookie_str
 
     async def begin(self):
         pass

From 512192a93ef7b489fa8b127fe4edcc3378891040 Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Sat, 25 Nov 2023 00:02:33 +0800
Subject: [PATCH 3/5] feat: search API debugging completed
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 media_platform/kuaishou/client.py             |  45 +++++--
 media_platform/kuaishou/core.py               |  59 +++++++++-
 media_platform/kuaishou/graphql.py            |  22 ++++
 .../kuaishou/graphql/comment_list.graphql     |   0
 .../kuaishou/graphql/search_query.graphql     | 111 ++++++++++++++++++
 .../kuaishou/graphql/video_detail.graphql     |   0
 6 files changed, 222 insertions(+), 15 deletions(-)
 create mode 100644 media_platform/kuaishou/graphql.py
 create mode 100644 media_platform/kuaishou/graphql/comment_list.graphql
 create mode 100644 media_platform/kuaishou/graphql/search_query.graphql
 create mode 100644 media_platform/kuaishou/graphql/video_detail.graphql

diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
index 9346fd4..9cfab0a 100644
--- a/media_platform/kuaishou/client.py
+++ b/media_platform/kuaishou/client.py
@@ -9,10 +9,11 @@ from playwright.async_api import BrowserContext, Page
 
 from tools import utils
 
+from .graphql import KuaiShouGraphQL
 from .exception import DataFetchError, IPBlockError
 
 
-class KuaishouClient:
+class KuaiShouClient:
     def __init__(
             self,
             timeout=10,
@@ -25,12 +26,17 @@ class KuaiShouClient:
         self.proxies = proxies
         self.timeout = timeout
         self.headers = headers
-        self._host = "https://www.kuaishou.com"
+        self._host = "https://www.kuaishou.com/graphql"
         self.playwright_page = playwright_page
         self.cookie_dict = cookie_dict
+        self.graphql = KuaiShouGraphQL()
 
     async def _pre_headers(self, url: str, data=None):
-        pass
+        self.headers = {
+            "cookie":"kpf=PC_WEB; clientid=3; did=web_6e79b79fdeac627f5cf52f08cab4e6bd; kpn=KUAISHOU_VISION",
+            "content-type":"application/json"
+        }
+        return self.headers
 
     async def request(self, method, url, **kwargs) -> Dict:
         async with httpx.AsyncClient(proxies=self.proxies) as client:
@@ -39,10 +45,10 @@ class KuaiShouClient:
                 **kwargs
             )
             data: Dict = response.json()
-            if data["success"]:
-                return data.get("data", data.get("success", {}))
+            if data.get("errors"):
+                raise DataFetchError(data.get("errors", "unknown error"))
             else:
-                raise DataFetchError(data.get("msg", None))
+                return data.get("data", {})
 
     async def get(self, uri: str, params=None) -> Dict:
         final_uri = uri
@@ -53,10 +59,9 @@ class KuaiShouClient:
         return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
 
     async def post(self, uri: str, data: dict) -> Dict:
-        headers = await self._pre_headers(uri, data)
         json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
         return await self.request(method="POST", url=f"{self._host}{uri}",
-                                  data=json_str, headers=headers)
+                                  data=json_str, headers=self.headers)
 
     async def pong(self) -> bool:
         """get a note to check if login state is ok"""
@@ -72,4 +77,26 @@ class KuaiShouClient:
     async def update_cookies(self, browser_context: BrowserContext):
         cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
         self.headers["Cookie"] = cookie_str
-        self.cookie_dict = cookie_dict
\ No newline at end of file
+        self.cookie_dict = cookie_dict
+
+    async def search_info_by_keyword(self, keyword: str, pcursor: str):
+        """
+        KuaiShou Web Search API
+        :param keyword: search keyword
+        :param pcursor: cursor returned by the previous page
+        :return:
+        """
+        params = {
+            "operationName": "visionSearchPhoto",
+            "variables": {
+                "keyword": keyword,
+                "pcursor": pcursor,
+                "page": "search"
+            },
+            "query": self.graphql.get("search_query")
+        }
+        return await self.post("", params)
+
+
+    async def get_video_info(self, video_id: str) -> Dict:
+        pass
\ No newline at end of file
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
index 67d60a6..b7afbbc 100644
--- a/media_platform/kuaishou/core.py
+++ b/media_platform/kuaishou/core.py
@@ -11,8 +11,9 @@ from base.proxy_account_pool import AccountPool
 from tools import utils
 from var import crawler_type_var
 
-from .client import KuaishouClient
+from .client import KuaiShouClient
 from .login import KuaishouLogin
+from .exception import DataFetchError
 
 
 class KuaishouCrawler(AbstractCrawler):
@@ -20,7 +21,7 @@ class KuaishouCrawler(AbstractCrawler):
     login_type: str
     crawler_type: str
     context_page: Page
-    ks_client: KuaishouClient
+    ks_client: KuaiShouClient
     account_pool: AccountPool
     browser_context: BrowserContext
 
@@ -76,12 +77,58 @@ class KuaishouCrawler(AbstractCrawler):
         utils.logger.info("Kuaishou Crawler finished ...")
 
     async def search(self):
-        await asyncio.Event().wait()
+        utils.logger.info("Begin search kuaishou keywords")
+        ks_limit_count = 20  # kuaishou limit page fixed value
+        for keyword in config.KEYWORDS.split(","):
+            utils.logger.info(f"Current search keyword: {keyword}")
+            page = 1
+            while page * ks_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
+                video_id_list: List[str] = []
+                videos_res = await self.ks_client.search_info_by_keyword(
+                    keyword=keyword,
+                    pcursor=str(page),
+                )
+                if not videos_res:
+                    utils.logger.error(f"search info by keyword:{keyword} found no data")
+                    continue
+                vision_search_photo = videos_res.get("visionSearchPhoto")
+                utils.logger.info(f"videos_res:{videos_res}")
+                semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+                task_list = [
+                    self.get_video_detail(feed_item.get("photo", {}).get("id"), semaphore)
+                    for feed_item in vision_search_photo.get("feeds", {})
+                ]
+                video_details = await asyncio.gather(*task_list)
+                for video_detail in video_details:
+                    if video_detail is not None:
+                        video_id_list.append(video_detail.get("id"))
+                page += 1
+                utils.logger.info(f"Video details: {video_details}")
+                await self.batch_get_note_comments(video_id_list)
+                await asyncio.Event().wait()
 
 
     async def get_specified_notes(self):
         pass
 
+    async def batch_get_note_comments(self, video_id_list: List[str]):
+        pass
+
+    async def get_video_detail(self, ):
+        pass
+
+    async def get_video_detail(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
+        """Get note detail"""
+        async with semaphore:
+            try:
+                return await self.ks_client.get_video_info(video_id)
+            except DataFetchError as ex:
+                utils.logger.error(f"Get note detail error: {ex}")
+                return None
+            except KeyError as ex:
+                utils.logger.error(f"have not found note detail note_id:{note_id}, err: {ex}")
+                return None
 
     def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
         """Create proxy info for playwright and httpx"""
         # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888
@@ -97,11 +144,11 @@ class KuaishouCrawler(AbstractCrawler):
         httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}"
         return phone, playwright_proxy, httpx_proxy
 
-    async def create_ks_client(self, httpx_proxy: Optional[str]) -> KuaishouClient:
+
async def create_ks_client(self, httpx_proxy: Optional[str]) -> KuaiShouClient: """Create xhs client""" utils.logger.info("Begin create kuaishou API client ...") cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) - xhs_client_obj = KuaishouClient( + xhs_client_obj = KuaiShouClient( proxies=httpx_proxy, headers={ "User-Agent": self.user_agent, @@ -147,4 +194,4 @@ class KuaishouCrawler(AbstractCrawler): async def close(self): """Close browser context""" await self.browser_context.close() - utils.logger.info("Browser context closed ...") \ No newline at end of file + utils.logger.info("Browser context closed ...") diff --git a/media_platform/kuaishou/graphql.py b/media_platform/kuaishou/graphql.py new file mode 100644 index 0000000..c73e18d --- /dev/null +++ b/media_platform/kuaishou/graphql.py @@ -0,0 +1,22 @@ +# 快手的数据传输是基于GraphQL实现的 +# 这个类负责获取一些GraphQL的schema +import os + + +class KuaiShouGraphQL: + graphql_queries = {} + + def __init__(self): + self.graphql_dir = "media_platform/kuaishou/graphql/" + self.load_graphql_queries() + + def load_graphql_queries(self): + graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql"] + + for file in graphql_files: + with open(self.graphql_dir + file, mode="r") as f: + query_name = file.split(".")[0] + self.graphql_queries[query_name] = f.read() + + def get(self, query_name: str) -> str: + return self.graphql_queries.get(query_name, "Query not found") diff --git a/media_platform/kuaishou/graphql/comment_list.graphql b/media_platform/kuaishou/graphql/comment_list.graphql new file mode 100644 index 0000000..e69de29 diff --git a/media_platform/kuaishou/graphql/search_query.graphql b/media_platform/kuaishou/graphql/search_query.graphql new file mode 100644 index 0000000..cc3bd8f --- /dev/null +++ b/media_platform/kuaishou/graphql/search_query.graphql @@ -0,0 +1,111 @@ +fragment photoContent on PhotoEntity { + __typename + id + duration + caption + originCaption + likeCount + viewCount + commentCount + realLikeCount + coverUrl + photoUrl + photoH265Url + manifest + manifestH265 + videoResource + coverUrls { + url + __typename + } + timestamp + expTag + animatedCoverUrl + distance + videoRatio + liked + stereoType + profileUserTopPhoto + musicBlocked +} + +fragment recoPhotoFragment on recoPhotoEntity { + __typename + id + duration + caption + originCaption + likeCount + viewCount + commentCount + realLikeCount + coverUrl + photoUrl + photoH265Url + manifest + manifestH265 + videoResource + coverUrls { + url + __typename + } + timestamp + expTag + animatedCoverUrl + distance + videoRatio + liked + stereoType + profileUserTopPhoto + musicBlocked +} + +fragment feedContent on Feed { + type + author { + id + name + headerUrl + following + headerUrls { + url + __typename + } + __typename + } + photo { + ...photoContent + ...recoPhotoFragment + __typename + } + canAddComment + llsid + status + currentPcursor + tags { + type + name + __typename + } + __typename +} + +query visionSearchPhoto($keyword: String, $pcursor: String, $searchSessionId: String, $page: String, $webPageArea: String) { + visionSearchPhoto(keyword: $keyword, pcursor: $pcursor, searchSessionId: $searchSessionId, page: $page, webPageArea: $webPageArea) { + result + llsid + webPageArea + feeds { + ...feedContent + __typename + } + searchSessionId + pcursor + aladdinBanner { + imgUrl + link + __typename + } + __typename + } +} diff --git a/media_platform/kuaishou/graphql/video_detail.graphql 
b/media_platform/kuaishou/graphql/video_detail.graphql
new file mode 100644
index 0000000..e69de29

From bdf36ccb091c5099588dba8ef1987abe87a30dbb Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Sun, 26 Nov 2023 01:05:52 +0800
Subject: [PATCH 4/5] feat: kuaishou keyword search results saved to CSV
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 db.py                                         |   1 +
 media_platform/kuaishou/client.py             |  33 ++--
 media_platform/kuaishou/core.py               |  47 +++--
 .../kuaishou/graphql/video_detail.graphql     |  80 +++++++++
 media_platform/kuaishou/login.py              |  20 ++-
 models/kuaishou.py                            | 160 ++++++++++++++++++
 6 files changed, 300 insertions(+), 41 deletions(-)
 create mode 100644 models/kuaishou.py

diff --git a/db.py b/db.py
index f83205c..2d0863d 100644
--- a/db.py
+++ b/db.py
@@ -8,6 +8,7 @@ async def init_db(create_db: bool = False) -> None:
     await Tortoise.init(
         db_url=RELATION_DB_URL,
         modules={'models': ['models']},
+        # modules={'models': ['models.kuaishou']},  # only generate the kuaishou tables
        _create_db=create_db
     )
 
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
index 9cfab0a..2c9f3c3 100644
--- a/media_platform/kuaishou/client.py
+++ b/media_platform/kuaishou/client.py
@@ -31,13 +31,6 @@ class KuaiShouClient:
         self.cookie_dict = cookie_dict
         self.graphql = KuaiShouGraphQL()
 
-    async def _pre_headers(self, url: str, data=None):
-        self.headers = {
-            "cookie":"kpf=PC_WEB; clientid=3; did=web_6e79b79fdeac627f5cf52f08cab4e6bd; kpn=KUAISHOU_VISION",
-            "content-type":"application/json"
-        }
-        return self.headers
-
     async def request(self, method, url, **kwargs) -> Dict:
         async with httpx.AsyncClient(proxies=self.proxies) as client:
             response = await client.request(
@@ -55,8 +48,7 @@ class KuaiShouClient:
         final_uri = uri
         if isinstance(params, dict):
             final_uri = (f"{uri}?"
                         f"{urlencode(params)}")
-        headers = await self._pre_headers(final_uri)
-        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
+        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers)
 
     async def post(self, uri: str, data: dict) -> Dict:
         json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
@@ -81,12 +73,12 @@ class KuaiShouClient:
 
     async def search_info_by_keyword(self, keyword: str, pcursor: str):
         """
-        KuaiShou Web Search API
+        KuaiShou web search api
         :param keyword: search keyword
         :param pcursor: cursor returned by the previous page
         :return:
         """
-        params = {
+        post_data = {
             "operationName": "visionSearchPhoto",
             "variables": {
                 "keyword": keyword,
@@ -95,8 +87,21 @@ class KuaiShouClient:
             },
             "query": self.graphql.get("search_query")
         }
-        return await self.post("", params)
+        return await self.post("", post_data)
 
-
-    async def get_video_info(self, video_id: str) -> Dict:
-        pass
\ No newline at end of file
+    async def get_video_info(self, photo_id: str) -> Dict:
+        """
+        Kuaishou web video detail api
+        :param photo_id:
+        :return:
+        """
+        post_data = {
+            "operationName": "visionVideoDetail",
+            "variables": {
+                "photoId": photo_id,
+                "page": "search"
+            },
+            "query": self.graphql.get("video_detail")
+        }
+        return await self.post("", post_data)
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
index b7afbbc..a2efd0f 100644
--- a/media_platform/kuaishou/core.py
+++ b/media_platform/kuaishou/core.py
@@ -10,6 +10,7 @@ from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
 from tools import utils
 from var import crawler_type_var
+from models import kuaishou
 
 from .client import KuaiShouClient
 from .login import KuaishouLogin
@@ -91,42 +92,38 @@ class KuaishouCrawler(AbstractCrawler):
                 videos_res = await self.ks_client.search_info_by_keyword(
                     keyword=keyword,
                     pcursor=str(page),
                 )
                 if not videos_res:
                     utils.logger.error(f"search info by keyword:{keyword} found no data")
                     continue
-                vision_search_photo = videos_res.get("visionSearchPhoto")
-                utils.logger.info(f"videos_res:{videos_res}")
-                semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
-                task_list = [
-                    self.get_video_detail(feed_item.get("photo", {}).get("id"), semaphore)
-                    for feed_item in vision_search_photo.get("feeds", {})
-                ]
-                video_details = await asyncio.gather(*task_list)
-                for video_detail in video_details:
-                    if video_detail is not None:
-                        video_id_list.append(video_detail.get("id"))
-                page += 1
-                utils.logger.info(f"Video details: {video_details}")
-                await self.batch_get_note_comments(video_id_list)
-                await asyncio.Event().wait()
+                vision_search_photo: Dict = videos_res.get("visionSearchPhoto")
+                if vision_search_photo.get("result") != 1:
+                    utils.logger.error(f"search info by keyword:{keyword} found no data (result != 1)")
+                    continue
+
+                for video_detail in vision_search_photo.get("feeds"):
+                    video_id_list.append(video_detail.get("photo", {}).get("id"))
+                    await kuaishou.update_kuaishou_video(video_item=video_detail)
+
+                # batch fetch video comments
+                page += 1
+                await self.batch_get_video_comments(video_id_list)
 
     async def get_specified_notes(self):
         pass
 
-    async def batch_get_note_comments(self, video_id_list: List[str]):
-        pass
+    async def batch_get_video_comments(self, video_id_list: List[str]):
+        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
 
-    async def get_video_detail(self, ):
-        pass
-
-    async def get_video_detail(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
-        """Get note detail"""
+    async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
+        """Get video detail task"""
         async with semaphore:
             try:
-                return await self.ks_client.get_video_info(video_id)
+                result = await self.ks_client.get_video_info(video_id)
+                utils.logger.info(f"Get video_id:{video_id} info result: {result} ...")
+                return result
             except DataFetchError as ex:
                 utils.logger.error(f"Get video detail error: {ex}")
                 return None
             except KeyError as ex:
-                utils.logger.error(f"have not found note detail note_id:{note_id}, err: {ex}")
+                utils.logger.error(f"have not found video detail video_id:{video_id}, err: {ex}")
                 return None
 
     def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
diff --git a/media_platform/kuaishou/graphql/video_detail.graphql b/media_platform/kuaishou/graphql/video_detail.graphql
index e69de29..ffb5309 100644
--- a/media_platform/kuaishou/graphql/video_detail.graphql
+++ b/media_platform/kuaishou/graphql/video_detail.graphql
@@ -0,0 +1,80 @@
+query visionVideoDetail($photoId: String, $type: String, $page: String, $webPageArea: String) {
+  visionVideoDetail(photoId: $photoId, type: $type, page: $page, webPageArea: $webPageArea) {
+    status
+    type
+    author {
+      id
+      name
+      following
+      headerUrl
+      __typename
+    }
+    photo {
+      id
+      duration
+      caption
+      likeCount
+      realLikeCount
+      coverUrl
+      photoUrl
+      liked
+      timestamp
+      expTag
+      llsid
+      viewCount
+      videoRatio
+      stereoType
+      musicBlocked
+      manifest {
+        mediaType
+        businessType
+        version
+        adaptationSet {
+          id
+          duration
+          representation {
+            id
+            defaultSelect
+            backupUrl
+            codecs
+            url
+            height
+            width
+            avgBitrate
+            maxBitrate
+            m3u8Slice
+            qualityType
+            qualityLabel
+            frameRate
+            featureP2sp
+            hidden
+            disableAdaptive
+            __typename
+          }
+          __typename
+        }
+        __typename
+      }
+      manifestH265
+      photoH265Url
+      coronaCropManifest
+      coronaCropManifestH265
+      croppedPhotoH265Url
+      croppedPhotoUrl
+      videoResource
+      __typename
+    }
+    tags {
+      type
+      name
+      __typename
+    }
+    commentLimit {
+      canAddComment
+      __typename
+    }
+    llsid
+    danmakuSwitch
+    __typename
+  }
+}
diff --git a/media_platform/kuaishou/login.py b/media_platform/kuaishou/login.py
index 3552d80..178225b 100644
--- a/media_platform/kuaishou/login.py
+++ b/media_platform/kuaishou/login.py
@@ -28,7 +28,16 @@ class KuaishouLogin(AbstractLogin):
         self.cookie_str = cookie_str
 
     async def begin(self):
-        pass
+        """Start kuaishou login"""
+        utils.logger.info("Begin login kuaishou ...")
+        if self.login_type == "qrcode":
+            await self.login_by_qrcode()
+        elif self.login_type == "phone":
+            await self.login_by_mobile()
+        elif self.login_type == "cookie":
+            await self.login_by_cookies()
+        else:
+            raise ValueError("Invalid login type; currently only qrcode, phone, or cookie are supported ...")
 
     async def login_by_qrcode(self):
         pass
@@ -37,4 +46,11 @@ class KuaishouLogin(AbstractLogin):
         pass
 
     async def login_by_cookies(self):
-        pass
+        utils.logger.info("Begin login kuaishou by cookie ...")
+        for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
+            await self.browser_context.add_cookies([{
+                'name': key,
+                'value': value,
+                'domain': ".kuaishou.com",
+                'path': "/"
+            }])
diff --git a/models/kuaishou.py b/models/kuaishou.py
new file mode 100644
index 0000000..00b4c97
--- /dev/null
+++ b/models/kuaishou.py
@@ -0,0 +1,160 @@
+import csv
+import pathlib
+from typing import Dict, List
+
+from tortoise import fields
+from tortoise.contrib.pydantic import pydantic_model_creator
+from tortoise.models import Model
+
+import config
+from tools import utils
+from var import crawler_type_var + + +class KuaishouBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class KuaishouVideo(KuaishouBaseModel): + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + video_type = fields.CharField(max_length=16, description="视频类型") + title = fields.CharField(null=True, max_length=500, description="视频标题") + desc = fields.TextField(null=True, description="视频描述") + create_time = fields.BigIntField(description="视频发布时间戳", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数") + video_count = fields.CharField(null=True, max_length=16, description="视频浏览数量") + video_url = fields.CharField(null=True, max_length=512, description="视频详情URL") + video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL") + video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL") + + class Meta: + table = "kuaishou_video" + table_description = "快手视频" + + def __str__(self): + return f"{self.video_id} - {self.title}" + + +class KuaishouVideoComment(KuaishouBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "kuaishou_video_comment" + table_description = "快手视频评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" + + +async def update_kuaishou_video(video_item: Dict): + photo_info: Dict = video_item.get("photo", {}) + video_id = photo_info.get("id") + user_info = video_item.get("author", {}) + local_db_item = { + "video_id": video_id, + "video_type": video_item.get("type"), + "title": photo_info.get("caption", ""), + "desc": photo_info.get("caption", ""), + "create_time": photo_info.get("timestamp"), + "user_id": user_info.get("id"), + "nickname": user_info.get("name"), + "avatar": user_info.get("headerUrl", ""), + "liked_count": photo_info.get("realLikeCount"), + "viewd_count": photo_info.get("viewCount"), + "last_modify_ts": utils.get_current_timestamp(), + "video_url": f"https://www.kuaishou.com/short-video/{video_id}", + "video_cover_url": photo_info.get("coverUrl", ""), + "video_play_url": photo_info.get("photoUrl", ""), + } + print(f"Kuaishou video id:{video_id}, title:{local_db_item.get('title')}") + if config.IS_SAVED_DATABASED: + if not await KuaishouVideo.filter(video_id=video_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',)) + kuaishou_data = kuaishou_video_pydantic(**local_db_item) + kuaishou_video_pydantic.model_validate(kuaishou_data) + await KuaishouVideo.create(**kuaishou_data.model_dump()) + else: + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate', + exclude=('id', 'add_ts')) + kuaishou_data = 
kuaishou_video_pydantic(**local_db_item)
+            kuaishou_video_pydantic.model_validate(kuaishou_data)
+            await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
+    else:
+        # Below is a simple way to save it in CSV format.
+        pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
+        save_file_name = f"data/kuaishou/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv"
+        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())
+
+
+async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
+    if not comments:
+        return
+    for comment_item in comments:
+        await update_ks_video_comment(video_id, comment_item)
+
+
+async def update_ks_video_comment(video_id: str, comment_item: Dict):
+    comment_video_id = comment_item.get("video_id")
+    if video_id != comment_video_id:
+        print(f"comment_video_id: {comment_video_id} != video_id: {video_id}")
+        return
+    user_info = comment_item.get("user", {})
+    comment_id = comment_item.get("cid")
+    avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
+        "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
+    local_db_item = {
+        "comment_id": comment_id,
+        "create_time": comment_item.get("create_time"),
+        "ip_location": comment_item.get("ip_label", ""),
+        "video_id": video_id,
+        "content": comment_item.get("text"),
+        "user_id": user_info.get("uid"),
+        "sec_uid": user_info.get("sec_uid"),
+        "short_user_id": user_info.get("short_id"),
+        "user_unique_id": user_info.get("unique_id"),
+        "user_signature": user_info.get("signature"),
+        "nickname": user_info.get("nickname"),
+        "avatar": avatar_info.get("url_list", [""])[0],
+        "sub_comment_count": comment_item.get("reply_comment_total", 0),
+        "last_modify_ts": utils.get_current_timestamp(),
+    }
+    print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}")
+    if config.IS_SAVED_DATABASED:
+        if not await KuaishouVideoComment.filter(comment_id=comment_id).exists():
+            local_db_item["add_ts"] = utils.get_current_timestamp()
+            comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate',
+                                                      exclude=('id',))
+            comment_data = comment_pydantic(**local_db_item)
+            comment_pydantic.validate(comment_data)
+            await KuaishouVideoComment.create(**comment_data.dict())
+        else:
+            comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate',
+                                                      exclude=('id', 'add_ts'))
+            comment_data = comment_pydantic(**local_db_item)
+            comment_pydantic.validate(comment_data)
+            await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.dict())
+    else:
+        pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
+        save_file_name = f"data/kuaishou/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
+        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())

From dfb1788141667cf33afc833fc9950d043bc1297d Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Sun, 26 Nov 2023 21:43:39 +0800
Subject: [PATCH 5/5] feat: kuaishou video comment crawling done; saving data
 to DB and CSV done
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config/base_config.py                         | 22 +++---
 config/db_config.py                           |  2 +-
 media_platform/kuaishou/client.py             | 75 ++++++++++++++++++-
 media_platform/kuaishou/core.py               | 58 ++++++++++++--
 media_platform/kuaishou/graphql.py            |  4 +-
 .../kuaishou/graphql/comment_list.graphql     | 39 ++++++++++
 models/__init__.py                            |  2 +-
 models/kuaishou.py                            | 42 ++++------
 var.py                                        |  5 +-
 9 files changed, 197 insertions(+), 52 deletions(-)

diff --git a/config/base_config.py b/config/base_config.py
index de5567e..d6be4ed 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -1,42 +1,44 @@
-# Desc: base config
+# Base configuration
 PLATFORM = "xhs"
 KEYWORDS = "python,golang"
 LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
-COOKIES = ""  # login by cookie, if login_type is cookie, you must set this value
+COOKIES = ""
 CRAWLER_TYPE = "search"
 
-# enable ip proxy
+# Whether to enable IP proxies
 ENABLE_IP_PROXY = False
 
-# retry_interval
+# Retry interval
 RETRY_INTERVAL = 60 * 30  # 30 minutes
 
 # playwright headless
 HEADLESS = True
 
-# save login state
+# Whether to save the login state
 SAVE_LOGIN_STATE = True
 
-# save user data dir
+# Directory for the browser's cached user data
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
 
-# crawler max notes count
+# Max number of videos/posts to crawl
 CRAWLER_MAX_NOTES_COUNT = 20
 
-# max concurrency num
+# Max number of concurrent crawler tasks
 MAX_CONCURRENCY_NUM = 10
 
-# xhs specified note id list
+# Xiaohongshu note ID list to crawl in "detail" mode
 XHS_SPECIFIED_ID_LIST = [
     "6422c2750000000027000d88",
     "64ca1b73000000000b028dd2",
     "630d5b85000000001203ab41",
+# ........................
 ]
 
-# douyin specified note id list
+# Douyin video ID list to crawl in "detail" mode
 DY_SPECIFIED_ID_LIST = [
     "7280854932641664319",
     "7202432992642387233"
+# ........................
 ]
\ No newline at end of file
diff --git a/config/db_config.py b/config/db_config.py
index 270edd3..1e9d267 100644
--- a/config/db_config.py
+++ b/config/db_config.py
@@ -9,4 +9,4 @@ RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
 
 # save data to database option
-IS_SAVED_DATABASED = False  # if you want to save data to database, set True
+IS_SAVED_DATABASED = True  # if you want to save data to database, set True
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
index 2c9f3c3..34f08ef 100644
--- a/media_platform/kuaishou/client.py
+++ b/media_platform/kuaishou/client.py
@@ -1,16 +1,16 @@
 # -*- coding: utf-8 -*-
 import asyncio
 import json
+from typing import Any, Callable, Dict, Optional
 from urllib.parse import urlencode
-from typing import Dict, Optional
 
 import httpx
 from playwright.async_api import BrowserContext, Page
 
 from tools import utils
 
-from .graphql import KuaiShouGraphQL
 from .exception import DataFetchError, IPBlockError
+from .graphql import KuaiShouGraphQL
 
 
 class KuaiShouClient:
@@ -31,7 +31,7 @@ class KuaiShouClient:
         self.cookie_dict = cookie_dict
         self.graphql = KuaiShouGraphQL()
 
-    async def request(self, method, url, **kwargs) -> Dict:
+    async def request(self, method, url, **kwargs) -> Any:
         async with httpx.AsyncClient(proxies=self.proxies) as client:
             response = await client.request(
                 method, url, timeout=self.timeout,
@@ -89,7 +89,6 @@ class KuaiShouClient:
         }
         return await self.post("", post_data)
 
-
     async def get_video_info(self, photo_id: str) -> Dict:
         """
         Kuaishou web video detail api
         :param photo_id:
@@ -105,3 +104,71 @@ class KuaiShouClient:
             "query": self.graphql.get("video_detail")
         }
         return await self.post("", post_data)
+
+    async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict:
+        """get video comments
+        :param photo_id: photo id you want to fetch
+        :param pcursor: pcursor returned by the previous page, defaults to ""
+        :return:
+        """
+        post_data = {
+            "operationName": "commentListQuery",
+            "variables": {
+                "photoId": photo_id,
+                "pcursor": pcursor
+            },
+            "query": self.graphql.get("comment_list")
+
+        }
+        return await self.post("", post_data)
+
+    async def get_video_sub_comments(
+            self, note_id: str,
+            root_comment_id: str,
+            num: int = 30, cursor: str = ""
+    ):
+        """
+        get note sub comments
+        :param note_id: note id you want to fetch
+        :param root_comment_id: parent comment id
+        :param num: recommend 30; if num is greater than 30, only 30 comments are returned
+        :param cursor: cursor returned by the previous page, defaults to ""
+        :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930}
+        """
+        uri = "/api/sns/web/v2/comment/sub/page"
+        params = {
+            "note_id": note_id,
+            "root_comment_id": root_comment_id,
+            "num": num,
+            "cursor": cursor,
+        }
+        return await self.get(uri, params)
+
+    async def get_video_all_comments(self, photo_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False,
+                                     callback: Optional[Callable] = None, ):
+        """
+        get video all comments include sub comments
+        :param photo_id:
+        :param crawl_interval:
+        :param is_fetch_sub_comments:
+        :param callback:
+        :return:
+        """
+
+        result = []
+        pcursor = ""
+        while pcursor != "no_more":
+            comments_res = await self.get_video_comments(photo_id, pcursor)
+            vision_comment_list = comments_res.get("visionCommentList", {})
+            pcursor = vision_comment_list.get("pcursor", "")
+            comments = vision_comment_list.get("rootComments", [])
+
+            if callback:  # if a callback was provided, invoke it with this page of comments
+                await callback(photo_id, comments)
+
+            await asyncio.sleep(crawl_interval)
+            if not is_fetch_sub_comments:
+                result.extend(comments)
+                continue
+            # todo handle get sub comments
+        return result
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
index a2efd0f..2fd91ea 100644
--- a/media_platform/kuaishou/core.py
+++ b/media_platform/kuaishou/core.py
@@ -1,5 +1,8 @@
 import asyncio
 import os
+import random
+import time
+from asyncio import Task
 from typing import Dict, List, Optional, Tuple
 
 from playwright.async_api import (BrowserContext, BrowserType, Page,
@@ -8,13 +11,13 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
 import config
 from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
-from tools import utils
-from var import crawler_type_var
 from models import kuaishou
+from tools import utils
+from var import comment_tasks_var, crawler_type_var
 
 from .client import KuaiShouClient
-from .login import KuaishouLogin
 from .exception import DataFetchError
+from .login import KuaishouLogin
 
 
 class KuaishouCrawler(AbstractCrawler):
@@ -109,9 +112,6 @@ class KuaishouCrawler(AbstractCrawler):
     async def get_specified_notes(self):
         pass
 
-    async def batch_get_video_comments(self, video_id_list: List[str]):
-        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
-
     async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
         """Get video detail task"""
         async with semaphore:
@@ -126,6 +126,52 @@ class KuaishouCrawler(AbstractCrawler):
                 utils.logger.error(f"have not found video detail video_id:{video_id}, err: {ex}")
                 return None
 
+    async def batch_get_video_comments(self, video_id_list: List[str]):
+        """
+        batch get video comments
+        :param video_id_list:
+        :return:
+        """
+        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        for video_id in video_id_list:
+            task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
+            task_list.append(task)
+
+        comment_tasks_var.set(task_list)
+        await asyncio.gather(*task_list)
+
+    async def get_comments(self, video_id: str, semaphore: asyncio.Semaphore):
+        """
+        get comments for a video id
+        :param video_id:
+        :param semaphore:
+        :return:
+        """
+        async with semaphore:
+            try:
+                await self.ks_client.get_video_all_comments(
+                    photo_id=video_id,
+                    crawl_interval=random.random(),
+                    callback=kuaishou.batch_update_ks_video_comments
+                )
+            except DataFetchError as ex:
+                utils.logger.error(f"Get video_id: {video_id} comment error: {ex}")
+            except Exception as e:
+                utils.logger.error(f"may be blocked, err: {e}")
+                # kuaishou may have blocked our requests, so take a nap and refresh the cookies;
+                # use time.sleep to block the main coroutine instead of asyncio.sleep, and cancel the running comment tasks
+                current_running_tasks = comment_tasks_var.get()
+                for task in current_running_tasks:
+                    task.cancel()
+                time.sleep(20)
+                await self.context_page.goto(f"{self.index_url}?isHome=1")
+                await self.ks_client.update_cookies(browser_context=self.browser_context)
+
+
+
     def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
diff --git a/media_platform/kuaishou/graphql.py b/media_platform/kuaishou/graphql.py
index c73e18d..1b71917 100644
--- a/media_platform/kuaishou/graphql.py
+++ b/media_platform/kuaishou/graphql.py
@@ -1,10 +1,10 @@
 # 快手的数据传输是基于GraphQL实现的
 # 这个类负责获取一些GraphQL的schema
-import os
+from typing import Dict
 
 
 class KuaiShouGraphQL:
-    graphql_queries = {}
+    graphql_queries: Dict[str, str] = {}
 
     def __init__(self):
         self.graphql_dir = "media_platform/kuaishou/graphql/"
diff --git a/media_platform/kuaishou/graphql/comment_list.graphql b/media_platform/kuaishou/graphql/comment_list.graphql
index e69de29..b216b8e 100644
--- a/media_platform/kuaishou/graphql/comment_list.graphql
+++ b/media_platform/kuaishou/graphql/comment_list.graphql
@@ -0,0 +1,39 @@
+query commentListQuery($photoId: String, $pcursor: String) {
+  visionCommentList(photoId: $photoId, pcursor: $pcursor) {
+    commentCount
+    pcursor
+    rootComments {
+      commentId
+      authorId
+      authorName
+      content
+      headurl
+      timestamp
+      likedCount
+      realLikedCount
+      liked
+      status
+      authorLiked
+      subCommentCount
+      subCommentsPcursor
+      subComments {
+        commentId
+        authorId
+        authorName
+        content
+        headurl
+        timestamp
+        likedCount
+        realLikedCount
+        liked
+        status
+        authorLiked
+        replyToUserName
+        replyTo
+        __typename
+      }
+      __typename
+    }
+    __typename
+  }
+}
diff --git a/models/__init__.py b/models/__init__.py
index d491f01..cb9970b 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,3 +1,3 @@
 from .douyin import *
+from .kuaishou import *
 from .xiaohongshu import *
-
diff --git a/models/kuaishou.py b/models/kuaishou.py
index 00b4c97..70a9c4c 100644
--- a/models/kuaishou.py
+++ b/models/kuaishou.py
@@ -30,7 +30,7 @@ class KuaishouVideo(KuaishouBaseModel):
     desc = fields.TextField(null=True, description="视频描述")
     create_time = fields.BigIntField(description="视频发布时间戳", index=True)
     liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    video_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
+
viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量") video_url = fields.CharField(null=True, max_length=512, description="视频详情URL") video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL") video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL") @@ -64,15 +64,15 @@ async def update_kuaishou_video(video_item: Dict): user_info = video_item.get("author", {}) local_db_item = { "video_id": video_id, - "video_type": video_item.get("type"), - "title": photo_info.get("caption", ""), - "desc": photo_info.get("caption", ""), + "video_type": str(video_item.get("type")), + "title": photo_info.get("caption", "")[:500], + "desc": photo_info.get("caption", "")[:500], "create_time": photo_info.get("timestamp"), "user_id": user_info.get("id"), "nickname": user_info.get("name"), "avatar": user_info.get("headerUrl", ""), - "liked_count": photo_info.get("realLikeCount"), - "viewd_count": photo_info.get("viewCount"), + "liked_count": str(photo_info.get("realLikeCount")), + "viewd_count": str(photo_info.get("viewCount")), "last_modify_ts": utils.get_current_timestamp(), "video_url": f"https://www.kuaishou.com/short-video/{video_id}", "video_cover_url": photo_info.get("coverUrl", ""), @@ -82,12 +82,12 @@ async def update_kuaishou_video(video_item: Dict): if config.IS_SAVED_DATABASED: if not await KuaishouVideo.filter(video_id=video_id).exists(): local_db_item["add_ts"] = utils.get_current_timestamp() - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',)) + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',)) kuaishou_data = kuaishou_video_pydantic(**local_db_item) kuaishou_video_pydantic.model_validate(kuaishou_data) await KuaishouVideo.create(**kuaishou_data.model_dump()) else: - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate', + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate', exclude=('id', 'add_ts')) kuaishou_data = kuaishou_video_pydantic(**local_db_item) kuaishou_video_pydantic.model_validate(kuaishou_data) @@ -111,28 +111,16 @@ async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]): async def update_ks_video_comment(video_id: str, comment_item: Dict): - comment_video_id = comment_item.get("video_id") - if video_id != comment_video_id: - print(f"comment_video_id: {comment_video_id} != video_id: {video_id}") - return - user_info = comment_item.get("user", {}) - comment_id = comment_item.get("cid") - avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get( - "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {} + comment_id = comment_item.get("commentId") local_db_item = { "comment_id": comment_id, - "create_time": comment_item.get("create_time"), - "ip_location": comment_item.get("ip_label", ""), + "create_time": comment_item.get("timestamp"), "video_id": video_id, - "content": comment_item.get("text"), - "user_id": user_info.get("uid"), - "sec_uid": user_info.get("sec_uid"), - "short_user_id": user_info.get("short_id"), - "user_unique_id": user_info.get("unique_id"), - "user_signature": user_info.get("signature"), - "nickname": user_info.get("nickname"), - "avatar": avatar_info.get("url_list", [""])[0], - "sub_comment_count": comment_item.get("reply_comment_total", 0), + "content": comment_item.get("content"), + "user_id": 
comment_item.get("authorId"), + "nickname": comment_item.get("authorName"), + "avatar": comment_item.get("headurl"), + "sub_comment_count": str(comment_item.get("subCommentCount", 0)), "last_modify_ts": utils.get_current_timestamp(), } print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}") diff --git a/var.py b/var.py index e1b6368..e929f8e 100644 --- a/var.py +++ b/var.py @@ -1,4 +1,7 @@ +from asyncio.tasks import Task from contextvars import ContextVar +from typing import List request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="") -crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="") \ No newline at end of file +crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="") +comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
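
A note on how the pieces added in this series fit together: KuaiShouClient sends all three GraphQL operations (visionSearchPhoto, visionVideoDetail, commentListQuery) as POSTs to the same /graphql endpoint, and comment pagination is cursor-based, looping until the server answers "no_more". The sketch below drives that pagination by hand, mirroring what get_video_all_comments does internally with its per-page callback. It is a minimal illustration, not part of the patches: the header and cookie values are placeholder assumptions (a real run needs the cookies the crawler copies out of the Playwright browser context), the photo id is hypothetical, and the KuaiShouGraphQL loader assumes the working directory is the repository root.

    import asyncio

    from media_platform.kuaishou.client import KuaiShouClient


    async def dump_comments(photo_id: str) -> None:
        # Placeholder headers/cookies: substitute real values captured after login.
        client = KuaiShouClient(
            headers={"Content-Type": "application/json;charset=UTF-8"},
            playwright_page=None,  # only needed for login flows, not for plain requests
            cookie_dict={},
        )
        pcursor = ""
        while pcursor != "no_more":
            # Each page carries rootComments plus the pcursor of the next page;
            # the server returns "no_more" once pagination is exhausted.
            data = await client.get_video_comments(photo_id, pcursor)
            comment_list = data.get("visionCommentList", {})
            for comment in comment_list.get("rootComments", []):
                print(comment.get("authorName"), comment.get("content"))
            pcursor = comment_list.get("pcursor", "no_more")
            await asyncio.sleep(1)  # mirrors crawl_interval in get_video_all_comments


    # asyncio.run(dump_comments("3xabc123"))  # hypothetical photo id

With the whole series applied, the same flow runs end to end through the CLI added in patch 2: python main.py --platform ks --lt qrcode --type search.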