diff --git a/config/base_config.py b/config/base_config.py
index de5567e..d6be4ed 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -1,42 +1,44 @@
-# Desc: base config
+# Base config
 PLATFORM = "xhs"
 KEYWORDS = "python,golang"
 LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
-COOKIES = ""  # login by cookie, if login_type is cookie, you must set this value
+COOKIES = ""
 CRAWLER_TYPE = "search"
 
-# enable ip proxy
+# Enable IP proxy
 ENABLE_IP_PROXY = False
 
-# retry_interval
+# Retry interval
 RETRY_INTERVAL = 60 * 30  # 30 minutes
 
 # playwright headless
 HEADLESS = True
 
-# save login state
+# Save login state
 SAVE_LOGIN_STATE = True
 
-# save user data dir
+# Directory for the cached browser user data
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
 
-# crawler max notes count
+# Max number of videos/posts to crawl
 CRAWLER_MAX_NOTES_COUNT = 20
 
-# max concurrency num
+# Max number of concurrent crawler tasks
 MAX_CONCURRENCY_NUM = 10
 
-# xhs specified note id list
+# List of specified XHS note IDs to crawl
 XHS_SPECIFIED_ID_LIST = [
     "6422c2750000000027000d88",
     "64ca1b73000000000b028dd2",
     "630d5b85000000001203ab41",
+    # ........................
 ]
 
-# douyin specified note id list
+# List of specified Douyin video IDs to crawl
 DY_SPECIFIED_ID_LIST = [
     "7280854932641664319",
     "7202432992642387233"
+    # ........................
 ]
\ No newline at end of file
diff --git a/config/db_config.py b/config/db_config.py
index 270edd3..1e9d267 100644
--- a/config/db_config.py
+++ b/config/db_config.py
@@ -9,4 +9,4 @@ RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db pas
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
 
 # save data to database option
-IS_SAVED_DATABASED = False  # if you want to save data to database, set True
+IS_SAVED_DATABASED = True  # if you want to save data to database, set True
diff --git a/db.py b/db.py
index f83205c..2d0863d 100644
--- a/db.py
+++ b/db.py
@@ -8,6 +8,7 @@ async def init_db(create_db: bool = False) -> None:
     await Tortoise.init(
         db_url=RELATION_DB_URL,
         modules={'models': ['models']},
+        # modules={'models': ['models.kuaishou']},  # generate only the kuaishou tables
         _create_db=create_db
     )
diff --git a/main.py b/main.py
index dd3dec8..81ac53a 100644
--- a/main.py
+++ b/main.py
@@ -6,6 +6,7 @@ import config
 import db
 from base import proxy_account_pool
 from media_platform.douyin import DouYinCrawler
+from media_platform.kuaishou import KuaishouCrawler
 from media_platform.xhs import XiaoHongShuCrawler
@@ -16,6 +17,8 @@ class CrawlerFactory:
             return XiaoHongShuCrawler()
         elif platform == "dy":
             return DouYinCrawler()
+        elif platform == "ks":
+            return KuaishouCrawler()
         else:
             raise ValueError("Invalid Media Platform Currently only supported xhs or dy ...")
@@ -23,8 +26,8 @@ class CrawlerFactory:
 async def main():
     # define command line params ...
     parser = argparse.ArgumentParser(description='Media crawler program.')
-    parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy)',
-                        choices=["xhs", "dy"], default=config.PLATFORM)
+    parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks)',
+                        choices=["xhs", "dy", "ks"], default=config.PLATFORM)
     parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
                         choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
     parser.add_argument('--type', type=str, help='crawler type (search | detail)',
diff --git a/media_platform/kuaishou/__init__.py b/media_platform/kuaishou/__init__.py
new file mode 100644
index 0000000..de877e0
--- /dev/null
+++ b/media_platform/kuaishou/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+from .core import KuaishouCrawler
\ No newline at end of file
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
new file mode 100644
index 0000000..34f08ef
--- /dev/null
+++ b/media_platform/kuaishou/client.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+import asyncio
+import json
+from typing import Any, Callable, Dict, Optional
+from urllib.parse import urlencode
+
+import httpx
+from playwright.async_api import BrowserContext, Page
+
+from tools import utils
+
+from .exception import DataFetchError, IPBlockError
+from .graphql import KuaiShouGraphQL
+
+
+class KuaiShouClient:
+    def __init__(
+            self,
+            timeout=10,
+            proxies=None,
+            *,
+            headers: Dict[str, str],
+            playwright_page: Page,
+            cookie_dict: Dict[str, str],
+    ):
+        self.proxies = proxies
+        self.timeout = timeout
+        self.headers = headers
+        self._host = "https://www.kuaishou.com/graphql"
+        self.playwright_page = playwright_page
+        self.cookie_dict = cookie_dict
+        self.graphql = KuaiShouGraphQL()
+
+    async def request(self, method, url, **kwargs) -> Any:
+        async with httpx.AsyncClient(proxies=self.proxies) as client:
+            response = await client.request(
+                method, url, timeout=self.timeout,
+                **kwargs
+            )
+        data: Dict = response.json()
+        if data.get("errors"):
+            raise DataFetchError(data.get("errors", "unknown error"))
+        else:
+            return data.get("data", {})
+
+    async def get(self, uri: str, params=None) -> Dict:
+        final_uri = uri
+        if isinstance(params, dict):
+            final_uri = (f"{uri}?"
+ f"{urlencode(params)}") + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) + + async def post(self, uri: str, data: dict) -> Dict: + json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + return await self.request(method="POST", url=f"{self._host}{uri}", + data=json_str, headers=self.headers) + + async def pong(self) -> bool: + """get a note to check if login state is ok""" + utils.logger.info("Begin pong kuaishou...") + ping_flag = False + try: + pass + except Exception as e: + utils.logger.error(f"Pong kuaishou failed: {e}, and try to login again...") + ping_flag = False + return ping_flag + + async def update_cookies(self, browser_context: BrowserContext): + cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) + self.headers["Cookie"] = cookie_str + self.cookie_dict = cookie_dict + + async def search_info_by_keyword(self, keyword: str, pcursor: str): + """ + KuaiShou web search api + :param keyword: search keyword + :param pcursor: limite page curson + :return: + """ + post_data = { + "operationName": "visionSearchPhoto", + "variables": { + "keyword": keyword, + "pcursor": pcursor, + "page": "search" + }, + "query": self.graphql.get("search_query") + } + return await self.post("", post_data) + + async def get_video_info(self, photo_id: str) -> Dict: + """ + Kuaishou web video detail api + :param photo_id: + :return: + """ + post_data = { + "operationName": "visionVideoDetail", + "variables": { + "photoId": photo_id, + "page": "search" + }, + "query": self.graphql.get("video_detail") + } + return await self.post("", post_data) + + async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict: + """get video comments + :param photo_id: photo id you want to fetch + :param pcursor: last you get pcursor, defaults to "" + :return: + """ + post_data = { + "operationName": "commentListQuery", + "variables": { + "photoId": photo_id, + "pcursor": pcursor + }, + "query": self.graphql.get("comment_list") + + } + return await self.post("", post_data) + + async def get_video_sub_comments( + self, note_id: str, + root_comment_id: str, + num: int = 30, cursor: str = "" + ): + """ + get note sub comments + :param note_id: note id you want to fetch + :param root_comment_id: parent comment id + :param num: recommend 30, if num greater 30, it only return 30 comments + :param cursor: last you get cursor, defaults to "" + :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930} + """ + uri = "/api/sns/web/v2/comment/sub/page" + params = { + "note_id": note_id, + "root_comment_id": root_comment_id, + "num": num, + "cursor": cursor, + } + return await self.get(uri, params) + + async def get_video_all_comments(self, photo_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False, + callback: Optional[Callable] = None, ): + """ + get video all comments include sub comments + :param photo_id: + :param crawl_interval: + :param is_fetch_sub_comments: + :param callback: + :return: + """ + + result = [] + pcursor = "" + while pcursor != "no_more": + comments_res = await self.get_video_comments(photo_id, pcursor) + vision_commen_list = comments_res.get("visionCommentList", {}) + pcursor = vision_commen_list.get("pcursor", "") + comments = vision_commen_list.get("rootComments", []) + + if callback: # 如果有回调函数,就执行回调函数 + await callback(photo_id, comments) + + await asyncio.sleep(crawl_interval) + if not is_fetch_sub_comments: + 
+                result.extend(comments)
+                continue
+            # todo handle get sub comments
+        return result
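For orientation, a minimal sketch of how this client is meant to be driven (illustrative only: the Playwright page, cookie string and cookie dict are placeholders passed in by the caller; in the patch they are produced by `create_ks_client` in core.py below):

```python
from media_platform.kuaishou.client import KuaiShouClient


async def demo(playwright_page, cookie_str: str, cookie_dict: dict):
    ks_client = KuaiShouClient(
        headers={
            "User-Agent": "Mozilla/5.0 ...",  # placeholder UA
            "Cookie": cookie_str,
            "Origin": "https://www.kuaishou.com",
            "Referer": "https://www.kuaishou.com",
            "Content-Type": "application/json;charset=UTF-8",
        },
        playwright_page=playwright_page,
        cookie_dict=cookie_dict,
    )

    async def on_comments(photo_id: str, comments: list):
        # called once per page of root comments
        print(photo_id, len(comments))

    # one page of search results; pcursor is the page number as a string
    search_res = await ks_client.search_info_by_keyword(keyword="python", pcursor="1")
    for feed in search_res.get("visionSearchPhoto", {}).get("feeds", []):
        photo_id = feed.get("photo", {}).get("id")
        # walk every page of root comments for the video, invoking the callback per page
        await ks_client.get_video_all_comments(photo_id=photo_id, callback=on_comments)
```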
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
new file mode 100644
index 0000000..2fd91ea
--- /dev/null
+++ b/media_platform/kuaishou/core.py
@@ -0,0 +1,240 @@
+import asyncio
+import os
+import random
+import time
+from asyncio import Task
+from typing import Dict, List, Optional, Tuple
+
+from playwright.async_api import (BrowserContext, BrowserType, Page,
+                                  async_playwright)
+
+import config
+from base.base_crawler import AbstractCrawler
+from base.proxy_account_pool import AccountPool
+from models import kuaishou
+from tools import utils
+from var import comment_tasks_var, crawler_type_var
+
+from .client import KuaiShouClient
+from .exception import DataFetchError
+from .login import KuaishouLogin
+
+
+class KuaishouCrawler(AbstractCrawler):
+    platform: str
+    login_type: str
+    crawler_type: str
+    context_page: Page
+    ks_client: KuaiShouClient
+    account_pool: AccountPool
+    browser_context: BrowserContext
+
+    def __init__(self):
+        self.index_url = "https://www.kuaishou.com"
+        self.user_agent = utils.get_user_agent()
+
+    def init_config(self, platform: str, login_type: str, account_pool: AccountPool, crawler_type: str):
+        self.platform = platform
+        self.login_type = login_type
+        self.account_pool = account_pool
+        self.crawler_type = crawler_type
+
+    async def start(self):
+        account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info()
+        async with async_playwright() as playwright:
+            # Launch a browser context.
+            chromium = playwright.chromium
+            self.browser_context = await self.launch_browser(
+                chromium,
+                playwright_proxy,
+                self.user_agent,
+                headless=config.HEADLESS
+            )
+            # stealth.min.js is a js script to prevent the website from detecting the crawler.
+            await self.browser_context.add_init_script(path="libs/stealth.min.js")
+            self.context_page = await self.browser_context.new_page()
+            await self.context_page.goto(f"{self.index_url}?isHome=1")
+
+            # Create a client to interact with the kuaishou website.
+            self.ks_client = await self.create_ks_client(httpx_proxy)
+            if not await self.ks_client.pong():
+                login_obj = KuaishouLogin(
+                    login_type=self.login_type,
+                    login_phone=account_phone,
+                    browser_context=self.browser_context,
+                    context_page=self.context_page,
+                    cookie_str=config.COOKIES
+                )
+                await login_obj.begin()
+                await self.ks_client.update_cookies(browser_context=self.browser_context)
+
+            crawler_type_var.set(self.crawler_type)
+            if self.crawler_type == "search":
+                # Search for videos and retrieve their comment information.
+                await self.search()
+            elif self.crawler_type == "detail":
+                # Get the information and comments of the specified videos.
+                await self.get_specified_notes()
+            else:
+                pass
+
+            utils.logger.info("Kuaishou Crawler finished ...")
+
+    async def search(self):
+        utils.logger.info("Begin search kuaishou keywords")
+        ks_limit_count = 20  # fixed page size of the kuaishou search API
+        for keyword in config.KEYWORDS.split(","):
+            utils.logger.info(f"Current search keyword: {keyword}")
+            page = 1
+            while page * ks_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
+                video_id_list: List[str] = []
+                videos_res = await self.ks_client.search_info_by_keyword(
+                    keyword=keyword,
+                    pcursor=str(page),
+                )
+                if not videos_res:
+                    utils.logger.error(f"search info by keyword:{keyword} did not return any data")
+                    continue
+
+                vision_search_photo: Dict = videos_res.get("visionSearchPhoto")
+                if vision_search_photo.get("result") != 1:
+                    utils.logger.error(f"search info by keyword:{keyword} did not return any data")
+                    continue
+
+                for video_detail in vision_search_photo.get("feeds"):
+                    video_id_list.append(video_detail.get("photo", {}).get("id"))
+                    await kuaishou.update_kuaishou_video(video_item=video_detail)
+
+                # batch fetch video comments
+                page += 1
+                await self.batch_get_video_comments(video_id_list)
+
+    async def get_specified_notes(self):
+        pass
+
+    async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
+        """Get video detail task"""
+        async with semaphore:
+            try:
+                result = await self.ks_client.get_video_info(video_id)
+                utils.logger.info(f"Get video_id:{video_id} info result: {result} ...")
+                return result
+            except DataFetchError as ex:
+                utils.logger.error(f"Get video detail error: {ex}")
+                return None
+            except KeyError as ex:
+                utils.logger.error(f"video detail not found for video_id:{video_id}, err: {ex}")
+                return None
+
+    async def batch_get_video_comments(self, video_id_list: List[str]):
+        """
+        batch get video comments
+        :param video_id_list: list of video ids to fetch comments for
+        :return:
+        """
+        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        for video_id in video_id_list:
+            task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
+            task_list.append(task)
+
+        comment_tasks_var.set(task_list)
+        await asyncio.gather(*task_list)
+
+    async def get_comments(self, video_id: str, semaphore: asyncio.Semaphore):
+        """
+        get comments for a video id
+        :param video_id: video id to fetch comments for
+        :param semaphore: concurrency limiter
+        :return:
+        """
+        async with semaphore:
+            try:
+                await self.ks_client.get_video_all_comments(
+                    photo_id=video_id,
+                    crawl_interval=random.random(),
+                    callback=kuaishou.batch_update_ks_video_comments
+                )
+            except DataFetchError as ex:
+                utils.logger.error(f"Get video_id: {video_id} comment error: {ex}")
+            except Exception as e:
+                utils.logger.error(f"may be blocked, err: {e}")
+                # use time.sleep to block the main coroutine instead of asyncio.sleep, and cancel the running comment tasks
+                # kuaishou may have blocked our requests, so take a nap and then refresh the cookies
+                current_running_tasks = comment_tasks_var.get()
+                for task in current_running_tasks:
+                    task.cancel()
+                time.sleep(20)
+                await self.context_page.goto(f"{self.index_url}?isHome=1")
+                await self.ks_client.update_cookies(browser_context=self.browser_context)
+
+    def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
+        """Create proxy info for playwright and httpx"""
+        # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888
+        phone, ip_proxy = self.account_pool.get_account()
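+        # when IP proxying is disabled, only the phone account is returned and no proxy settings are built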
+        if not config.ENABLE_IP_PROXY:
+            return phone, None, None
+        utils.logger.info("Begin proxy info for playwright and httpx ...")
+        playwright_proxy = {
+            "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}",
+            "username": config.IP_PROXY_USER,
+            "password": config.IP_PROXY_PASSWORD,
+        }
+        httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}"
+        return phone, playwright_proxy, httpx_proxy
+
+    async def create_ks_client(self, httpx_proxy: Optional[str]) -> KuaiShouClient:
+        """Create kuaishou client"""
+        utils.logger.info("Begin create kuaishou API client ...")
+        cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
+        ks_client_obj = KuaiShouClient(
+            proxies=httpx_proxy,
+            headers={
+                "User-Agent": self.user_agent,
+                "Cookie": cookie_str,
+                "Origin": self.index_url,
+                "Referer": self.index_url,
+                "Content-Type": "application/json;charset=UTF-8"
+            },
+            playwright_page=self.context_page,
+            cookie_dict=cookie_dict,
+        )
+        return ks_client_obj
+
+    async def launch_browser(
+            self,
+            chromium: BrowserType,
+            playwright_proxy: Optional[Dict],
+            user_agent: Optional[str],
+            headless: bool = True
+    ) -> BrowserContext:
+        """Launch browser and create browser context"""
+        utils.logger.info("Begin create browser context ...")
+        if config.SAVE_LOGIN_STATE:
+            user_data_dir = os.path.join(os.getcwd(), "browser_data",
+                                         config.USER_DATA_DIR % self.platform)  # type: ignore
+            browser_context = await chromium.launch_persistent_context(
+                user_data_dir=user_data_dir,
+                accept_downloads=True,
+                headless=headless,
+                proxy=playwright_proxy,  # type: ignore
+                viewport={"width": 1920, "height": 1080},
+                user_agent=user_agent
+            )
+            return browser_context
+        else:
+            browser = await chromium.launch(headless=headless, proxy=playwright_proxy)  # type: ignore
+            browser_context = await browser.new_context(
+                viewport={"width": 1920, "height": 1080},
+                user_agent=user_agent
+            )
+            return browser_context
+
+    async def close(self):
+        """Close browser context"""
+        await self.browser_context.close()
+        utils.logger.info("Browser context closed ...")
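The new crawler plugs into the existing entry point the same way as the xhs and dy ones, so the usual invocation is `python main.py --platform ks --lt qrcode --type search`. A rough programmatic equivalent is sketched below; the `create_account_pool` helper name is an assumption taken from the existing main.py setup, which is not shown in full here.

```python
# Roughly what `python main.py --platform ks --lt qrcode --type search` ends up doing.
import asyncio

import config
import db
from base import proxy_account_pool
from media_platform.kuaishou import KuaishouCrawler


async def run_kuaishou():
    account_pool = proxy_account_pool.create_account_pool()  # assumed helper, as used by main.py
    if config.IS_SAVED_DATABASED:
        await db.init_db()  # register the new kuaishou models along with the rest
    crawler = KuaishouCrawler()
    crawler.init_config(platform="ks", login_type="qrcode", account_pool=account_pool, crawler_type="search")
    await crawler.start()


if __name__ == "__main__":
    asyncio.run(run_kuaishou())
```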
diff --git a/media_platform/kuaishou/exception.py b/media_platform/kuaishou/exception.py
new file mode 100644
index 0000000..1a8642e
--- /dev/null
+++ b/media_platform/kuaishou/exception.py
@@ -0,0 +1,9 @@
+from httpx import RequestError
+
+
+class DataFetchError(RequestError):
+    """something went wrong while fetching data"""
+
+
+class IPBlockError(RequestError):
+    """requests were sent so fast that the server blocked our IP"""
diff --git a/media_platform/kuaishou/field.py b/media_platform/kuaishou/field.py
new file mode 100644
index 0000000..7c68785
--- /dev/null
+++ b/media_platform/kuaishou/field.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
\ No newline at end of file
diff --git a/media_platform/kuaishou/graphql.py b/media_platform/kuaishou/graphql.py
new file mode 100644
index 0000000..1b71917
--- /dev/null
+++ b/media_platform/kuaishou/graphql.py
@@ -0,0 +1,22 @@
+# Kuaishou's web data transport is built on GraphQL;
+# this class loads the GraphQL query documents used by the client.
+from typing import Dict
+
+
+class KuaiShouGraphQL:
+    graphql_queries: Dict[str, str] = {}
+
+    def __init__(self):
+        self.graphql_dir = "media_platform/kuaishou/graphql/"
+        self.load_graphql_queries()
+
+    def load_graphql_queries(self):
+        graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql"]
+
+        for file in graphql_files:
+            with open(self.graphql_dir + file, mode="r") as f:
+                query_name = file.split(".")[0]
+                self.graphql_queries[query_name] = f.read()
+
+    def get(self, query_name: str) -> str:
+        return self.graphql_queries.get(query_name, "Query not found")
diff --git a/media_platform/kuaishou/graphql/comment_list.graphql b/media_platform/kuaishou/graphql/comment_list.graphql
new file mode 100644
index 0000000..b216b8e
--- /dev/null
+++ b/media_platform/kuaishou/graphql/comment_list.graphql
@@ -0,0 +1,39 @@
+query commentListQuery($photoId: String, $pcursor: String) {
+  visionCommentList(photoId: $photoId, pcursor: $pcursor) {
+    commentCount
+    pcursor
+    rootComments {
+      commentId
+      authorId
+      authorName
+      content
+      headurl
+      timestamp
+      likedCount
+      realLikedCount
+      liked
+      status
+      authorLiked
+      subCommentCount
+      subCommentsPcursor
+      subComments {
+        commentId
+        authorId
+        authorName
+        content
+        headurl
+        timestamp
+        likedCount
+        realLikedCount
+        liked
+        status
+        authorLiked
+        replyToUserName
+        replyTo
+        __typename
+      }
+      __typename
+    }
+    __typename
+  }
+}
diff --git a/media_platform/kuaishou/graphql/search_query.graphql b/media_platform/kuaishou/graphql/search_query.graphql
new file mode 100644
index 0000000..cc3bd8f
--- /dev/null
+++ b/media_platform/kuaishou/graphql/search_query.graphql
@@ -0,0 +1,111 @@
+fragment photoContent on PhotoEntity {
+  __typename
+  id
+  duration
+  caption
+  originCaption
+  likeCount
+  viewCount
+  commentCount
+  realLikeCount
+  coverUrl
+  photoUrl
+  photoH265Url
+  manifest
+  manifestH265
+  videoResource
+  coverUrls {
+    url
+    __typename
+  }
+  timestamp
+  expTag
+  animatedCoverUrl
+  distance
+  videoRatio
+  liked
+  stereoType
+  profileUserTopPhoto
+  musicBlocked
+}
+
+fragment recoPhotoFragment on recoPhotoEntity {
+  __typename
+  id
+  duration
+  caption
+  originCaption
+  likeCount
+  viewCount
+  commentCount
+  realLikeCount
+  coverUrl
+  photoUrl
+  photoH265Url
+  manifest
+  manifestH265
+  videoResource
+  coverUrls {
+    url
+    __typename
+  }
+  timestamp
+  expTag
+  animatedCoverUrl
+  distance
+  videoRatio
+  liked
+  stereoType
+  profileUserTopPhoto
+  musicBlocked
+}
+
+fragment feedContent on Feed {
+  type
+  author {
+    id
+    name
+    headerUrl
+    following
+    headerUrls {
+      url
+      __typename
+    }
+    __typename
+  }
+  photo {
+    ...photoContent
+    ...recoPhotoFragment
+    __typename
+  }
+  canAddComment
+  llsid
+  status
+  currentPcursor
+  tags {
+    type
+    name
+    __typename
+  }
+  __typename
+}
+
+query visionSearchPhoto($keyword: String, $pcursor: String, $searchSessionId: String, $page: String, $webPageArea: String) {
+  visionSearchPhoto(keyword: $keyword, pcursor: $pcursor, searchSessionId: $searchSessionId, page: $page, webPageArea: $webPageArea) {
+    result
+    llsid
+    webPageArea
+    feeds {
+      ...feedContent
+      __typename
+    }
+    searchSessionId
+    pcursor
+    aladdinBanner {
+      imgUrl
+      link
+      __typename
+    }
+    __typename
+  }
+}
diff --git a/media_platform/kuaishou/graphql/video_detail.graphql b/media_platform/kuaishou/graphql/video_detail.graphql
new file mode 100644
index 0000000..ffb5309
--- /dev/null
+++ b/media_platform/kuaishou/graphql/video_detail.graphql
@@ -0,0 +1,80 @@
+query visionVideoDetail($photoId: String, $type: String, $page: String, $webPageArea: String) {
+  visionVideoDetail(photoId: $photoId, type: $type, page: $page, webPageArea: $webPageArea) {
+    status
+    type
+    author {
+      id
+      name
+      following
+      headerUrl
+      __typename
+    }
+    photo {
+      id
+      duration
+      caption
+      likeCount
+      realLikeCount
+      coverUrl
+      photoUrl
+      liked
+      timestamp
+      expTag
+      llsid
+      viewCount
+      videoRatio
+      stereoType
+      musicBlocked
+      manifest {
+        mediaType
+        businessType
+        version
+        adaptationSet {
+          id
+          duration
+          representation {
+            id
+            defaultSelect
+            backupUrl
+            codecs
+            url
+            height
+            width
+            avgBitrate
+            maxBitrate
+            m3u8Slice
+            qualityType
+            qualityLabel
+            frameRate
+            featureP2sp
+            hidden
+            disableAdaptive
+            __typename
+          }
+          __typename
+        }
+        __typename
+      }
+      manifestH265
+      photoH265Url
+      coronaCropManifest
+      coronaCropManifestH265
+      croppedPhotoH265Url
+      croppedPhotoUrl
+      videoResource
+      __typename
+    }
+    tags {
+      type
+      name
+      __typename
+    }
+    commentLimit {
+      canAddComment
+      __typename
+    }
+    llsid
+    danmakuSwitch
+    __typename
+  }
+}
diff --git a/media_platform/kuaishou/login.py b/media_platform/kuaishou/login.py
new file mode 100644
index 0000000..178225b
--- /dev/null
+++ b/media_platform/kuaishou/login.py
@@ -0,0 +1,56 @@
+import asyncio
+import functools
+import sys
+from typing import Optional
+
+import redis
+from playwright.async_api import BrowserContext, Page
+from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
+                      wait_fixed)
+
+import config
+from base.base_crawler import AbstractLogin
+from tools import utils
+
+
+class KuaishouLogin(AbstractLogin):
+    def __init__(self,
+                 login_type: str,
+                 browser_context: BrowserContext,
+                 context_page: Page,
+                 login_phone: Optional[str] = "",
+                 cookie_str: str = ""
+                 ):
+        self.login_type = login_type
+        self.browser_context = browser_context
+        self.context_page = context_page
+        self.login_phone = login_phone
+        self.cookie_str = cookie_str
+
+    async def begin(self):
+        """Start login kuaishou"""
+        utils.logger.info("Begin login kuaishou ...")
+        if self.login_type == "qrcode":
+            await self.login_by_qrcode()
+        elif self.login_type == "phone":
+            await self.login_by_mobile()
+        elif self.login_type == "cookie":
+            await self.login_by_cookies()
+        else:
+            raise ValueError("Invalid login type. Currently only qrcode, phone, or cookie are supported ...")
+
+    async def login_by_qrcode(self):
+        pass
+
+    async def login_by_mobile(self):
+        pass
+
+    async def login_by_cookies(self):
+        utils.logger.info("Begin login kuaishou by cookie ...")
+        for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
+            await self.browser_context.add_cookies([{
+                'name': key,
+                'value': value,
+                'domain': ".kuaishou.com",
+                'path': "/"
+            }])
diff --git a/models/__init__.py b/models/__init__.py
index d491f01..cb9970b 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,3 +1,3 @@
 from .douyin import *
+from .kuaishou import *
 from .xiaohongshu import *
-
diff --git a/models/kuaishou.py b/models/kuaishou.py
new file mode 100644
index 0000000..70a9c4c
--- /dev/null
+++ b/models/kuaishou.py
@@ -0,0 +1,148 @@
+import csv
+import pathlib
+from typing import Dict, List
+
+from tortoise import fields
+from tortoise.contrib.pydantic import pydantic_model_creator
+from tortoise.models import Model
+
+import config
+from tools import utils
+from var import crawler_type_var
+
+
+class KuaishouBaseModel(Model):
+    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
+    user_id = fields.CharField(null=True, max_length=64, description="用户ID")
+    nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
+    avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
+    add_ts = fields.BigIntField(description="记录添加时间戳")
+    last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
+
+    class Meta:
+        abstract = True
+
+
+class KuaishouVideo(KuaishouBaseModel):
+    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
+    video_type = fields.CharField(max_length=16, description="视频类型")
+    title = fields.CharField(null=True, max_length=500, description="视频标题")
+    desc = fields.TextField(null=True, description="视频描述")
+    create_time = fields.BigIntField(description="视频发布时间戳", index=True)
+    liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
+    viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
+    video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
+    video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
+    video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
+
+    class Meta:
+        table = "kuaishou_video"
+        table_description = "快手视频"
+
+    def __str__(self):
+        return f"{self.video_id} - {self.title}"
+
+
+class KuaishouVideoComment(KuaishouBaseModel):
+    comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
+    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
+    content = fields.TextField(null=True, description="评论内容")
+    create_time = fields.BigIntField(description="评论时间戳")
+    sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
+
+    class Meta:
+        table = "kuaishou_video_comment"
+        table_description = "快手视频评论"
+
+    def __str__(self):
+        return f"{self.comment_id} - {self.content}"
+
+
+async def update_kuaishou_video(video_item: Dict):
+    photo_info: Dict = video_item.get("photo", {})
+    video_id = photo_info.get("id")
+    user_info = video_item.get("author", {})
+    local_db_item = {
+        "video_id": video_id,
+        "video_type": str(video_item.get("type")),
+        "title": photo_info.get("caption", "")[:500],
+        "desc": photo_info.get("caption", "")[:500],
+        "create_time": photo_info.get("timestamp"),
+        "user_id": user_info.get("id"),
+        "nickname": user_info.get("name"),
+        "avatar": user_info.get("headerUrl", ""),
+        "liked_count": str(photo_info.get("realLikeCount")),
+        "viewd_count": str(photo_info.get("viewCount")),
+        "last_modify_ts": utils.get_current_timestamp(),
+        "video_url": f"https://www.kuaishou.com/short-video/{video_id}",
+        "video_cover_url": photo_info.get("coverUrl", ""),
+        "video_play_url": photo_info.get("photoUrl", ""),
+    }
+    print(f"Kuaishou video id:{video_id}, title:{local_db_item.get('title')}")
+    if config.IS_SAVED_DATABASED:
+        if not await KuaishouVideo.filter(video_id=video_id).exists():
+            local_db_item["add_ts"] = utils.get_current_timestamp()
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
+            kuaishou_data = kuaishou_video_pydantic(**local_db_item)
+            kuaishou_video_pydantic.model_validate(kuaishou_data)
+            await KuaishouVideo.create(**kuaishou_data.model_dump())
+        else:
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
+                                                             exclude=('id', 'add_ts'))
+            kuaishou_data = kuaishou_video_pydantic(**local_db_item)
+            kuaishou_video_pydantic.model_validate(kuaishou_data)
+            await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
+    else:
+        # Below is a simple way to save it in CSV format.
+ pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/kuaishou/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values()) + + +async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_ks_video_comment(video_id, comment_item) + + +async def update_ks_video_comment(video_id: str, comment_item: Dict): + comment_id = comment_item.get("commentId") + local_db_item = { + "comment_id": comment_id, + "create_time": comment_item.get("timestamp"), + "video_id": video_id, + "content": comment_item.get("content"), + "user_id": comment_item.get("authorId"), + "nickname": comment_item.get("authorName"), + "avatar": comment_item.get("headurl"), + "sub_comment_count": str(comment_item.get("subCommentCount", 0)), + "last_modify_ts": utils.get_current_timestamp(), + } + print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}") + if config.IS_SAVED_DATABASED: + if not await KuaishouVideoComment.filter(comment_id=comment_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await KuaishouVideoComment.create(**comment_data.dict()) + else: + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.dict()) + else: + pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/kuaishou/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values()) diff --git a/var.py b/var.py index e1b6368..e929f8e 100644 --- a/var.py +++ b/var.py @@ -1,4 +1,7 @@ +from asyncio.tasks import Task from contextvars import ContextVar +from typing import List request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="") -crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="") \ No newline at end of file +crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="") +comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])