feat: Kuaishou video comment crawling done; saving data to DB and CSV done

Relakkes 2023-11-26 21:43:39 +08:00
parent 2f8541a351
commit dfb1788141
9 changed files with 197 additions and 52 deletions

config/base_config.py

@@ -1,42 +1,44 @@
-# Desc: base config
+# Base config
 PLATFORM = "xhs"
 KEYWORDS = "python,golang"
 LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
-COOKIES = ""  # login by cookie, if login_type is cookie, you must set this value
+COOKIES = ""
 CRAWLER_TYPE = "search"

-# enable ip proxy
+# whether to enable IP proxy
 ENABLE_IP_PROXY = False

-# retry_interval
+# retry interval
 RETRY_INTERVAL = 60 * 30  # 30 minutes

 # playwright headless
 HEADLESS = True

-# save login state
+# whether to save login state
 SAVE_LOGIN_STATE = True

-# save user data dir
+# config for the user's cached browser data files
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name

-# crawler max notes count
+# cap on the number of videos/posts to crawl
 CRAWLER_MAX_NOTES_COUNT = 20

-# max concurrency num
+# cap on the number of concurrent crawler tasks
 MAX_CONCURRENCY_NUM = 10

-# xhs specified note id list
+# list of specified XHS note IDs to crawl
 XHS_SPECIFIED_ID_LIST = [
     "6422c2750000000027000d88",
     "64ca1b73000000000b028dd2",
     "630d5b85000000001203ab41",
+    # ........................
 ]

-# douyin specified note id list
+# list of specified Douyin IDs to crawl
 DY_SPECIFIED_ID_LIST = [
     "7280854932641664319",
     "7202432992642387233"
+    # ........................
 ]

config/db_config.py

@@ -9,4 +9,4 @@ RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"

 # save data to database option
-IS_SAVED_DATABASED = False  # if you want to save data to database, set True
+IS_SAVED_DATABASED = True  # if you want to save data to database, set True

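With IS_SAVED_DATABASED switched to True, the crawler now persists to the MySQL database behind RELATION_DB_URL. For orientation, a minimal sketch of initializing the ORM against that URL (the models in this commit use Tortoise-style fields and pydantic_model_creator; the init call site and the "models" module path below are assumptions, not code from this commit):

import config
from tortoise import Tortoise

async def init_db() -> None:
    # point Tortoise at the configured MySQL URL and register the models package
    await Tortoise.init(
        db_url=config.RELATION_DB_URL,
        modules={"models": ["models"]},  # assumed module path
    )
    await Tortoise.generate_schemas()  # create missing tables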
media_platform/kuaishou/client.py

@@ -1,16 +1,16 @@
 # -*- coding: utf-8 -*-
 import asyncio
 import json
+from typing import Any, Callable, Dict, Optional
 from urllib.parse import urlencode
-from typing import Dict, Optional

 import httpx
 from playwright.async_api import BrowserContext, Page

 from tools import utils

-from .graphql import KuaiShouGraphQL
 from .exception import DataFetchError, IPBlockError
+from .graphql import KuaiShouGraphQL


 class KuaiShouClient:
@@ -31,7 +31,7 @@ class KuaiShouClient:
         self.cookie_dict = cookie_dict
         self.graphql = KuaiShouGraphQL()

-    async def request(self, method, url, **kwargs) -> Dict:
+    async def request(self, method, url, **kwargs) -> Any:
         async with httpx.AsyncClient(proxies=self.proxies) as client:
             response = await client.request(
                 method, url, timeout=self.timeout,
@@ -89,7 +89,6 @@ class KuaiShouClient:
         }
         return await self.post("", post_data)

-
     async def get_video_info(self, photo_id: str) -> Dict:
         """
         Kuaishou web video detail api
@@ -105,3 +104,71 @@ class KuaiShouClient:
             "query": self.graphql.get("video_detail")
         }
         return await self.post("", post_data)
+
+    async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict:
+        """get video comments
+        :param photo_id: photo id you want to fetch
+        :param pcursor: pcursor from the last page, defaults to ""
+        :return:
+        """
+        post_data = {
+            "operationName": "commentListQuery",
+            "variables": {
+                "photoId": photo_id,
+                "pcursor": pcursor
+            },
+            "query": self.graphql.get("comment_list")
+        }
+        return await self.post("", post_data)
+
+    async def get_video_sub_comments(
+            self, note_id: str,
+            root_comment_id: str,
+            num: int = 30, cursor: str = ""
+    ):
+        """
+        get note sub comments
+        :param note_id: note id you want to fetch
+        :param root_comment_id: parent comment id
+        :param num: recommend 30; if num is greater than 30, only 30 comments are returned
+        :param cursor: cursor from the last page, defaults to ""
+        :return: {"has_more": true, "cursor": "6422442d000000000700dcdb", "comments": [], "user_id": "63273a77000000002303cc9b", "time": 1681566542930}
+        """
+        uri = "/api/sns/web/v2/comment/sub/page"
+        params = {
+            "note_id": note_id,
+            "root_comment_id": root_comment_id,
+            "num": num,
+            "cursor": cursor,
+        }
+        return await self.get(uri, params)
+
+    async def get_video_all_comments(self, photo_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False,
+                                     callback: Optional[Callable] = None):
+        """
+        get all comments of a video, including sub comments
+        :param photo_id: photo id you want to fetch
+        :param crawl_interval: seconds to sleep between two page requests
+        :param is_fetch_sub_comments: whether to fetch sub comments as well
+        :param callback: optional async callback, invoked with each page of root comments
+        :return:
+        """
+        result = []
+        pcursor = ""
+        while pcursor != "no_more":
+            comments_res = await self.get_video_comments(photo_id, pcursor)
+            vision_comment_list = comments_res.get("visionCommentList", {})
+            pcursor = vision_comment_list.get("pcursor", "")
+            comments = vision_comment_list.get("rootComments", [])
+            if callback:  # invoke the callback if one was provided
+                await callback(photo_id, comments)
+            await asyncio.sleep(crawl_interval)
+            if not is_fetch_sub_comments:
+                result.extend(comments)
+                continue
+            # todo handle get sub comments
+        return result

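The new get_video_all_comments drives the comment pagination: it keeps requesting pages with the previous pcursor until the API answers "no_more", handing each page of rootComments to an optional async callback. A minimal usage sketch, assuming an already logged-in KuaiShouClient bound to ks_client (construction and login omitted) and a hypothetical photo id:

import asyncio

async def print_comments(photo_id: str, comments: list) -> None:
    # receives one page of rootComments at a time
    for comment in comments:
        print(photo_id, comment.get("commentId"), comment.get("content"))

async def main() -> None:
    all_comments = await ks_client.get_video_all_comments(  # ks_client: assumed, see lead-in
        photo_id="3xexamplephotoid",  # hypothetical photo id
        crawl_interval=1.0,
        callback=print_comments,
    )
    print(f"fetched {len(all_comments)} root comments")

asyncio.run(main())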
media_platform/kuaishou/core.py

@@ -1,5 +1,8 @@
 import asyncio
 import os
+import random
+import time
+from asyncio import Task
 from typing import Dict, List, Optional, Tuple

 from playwright.async_api import (BrowserContext, BrowserType, Page,
@@ -8,13 +11,13 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
 import config
 from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
-from tools import utils
-from var import crawler_type_var
 from models import kuaishou
+from tools import utils
+from var import comment_tasks_var, crawler_type_var

 from .client import KuaiShouClient
-from .login import KuaishouLogin
 from .exception import DataFetchError
+from .login import KuaishouLogin


 class KuaishouCrawler(AbstractCrawler):
@@ -109,9 +112,6 @@ class KuaishouCrawler(AbstractCrawler):
     async def get_specified_notes(self):
         pass

-    async def batch_get_video_comments(self, video_id_list: List[str]):
-        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
-
     async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
         """Get video detail task"""
         async with semaphore:
@@ -126,6 +126,52 @@ class KuaishouCrawler(AbstractCrawler):
             utils.logger.error(f"have not found note detail video_id:{video_id}, err: {ex}")
             return None

+    async def batch_get_video_comments(self, video_id_list: List[str]):
+        """
+        batch get video comments
+        :param video_id_list: list of video ids to fetch comments for
+        :return:
+        """
+        utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}")
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        for video_id in video_id_list:
+            task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
+            task_list.append(task)
+        comment_tasks_var.set(task_list)
+        await asyncio.gather(*task_list)
+
+    async def get_comments(self, video_id: str, semaphore: asyncio.Semaphore):
+        """
+        get comments for a video id
+        :param video_id:
+        :param semaphore:
+        :return:
+        """
+        async with semaphore:
+            try:
+                await self.ks_client.get_video_all_comments(
+                    photo_id=video_id,
+                    crawl_interval=random.random(),
+                    callback=kuaishou.batch_update_ks_video_comments
+                )
+            except DataFetchError as ex:
+                utils.logger.error(f"Get video_id: {video_id} comment error: {ex}")
+            except Exception as e:
+                utils.logger.error(f"may have been blocked, err: {e}")
+                # Kuaishou may have blocked our requests, so take a nap and refresh the cookies.
+                # Use time.sleep to block the main coroutine (instead of asyncio.sleep) and cancel the running comment tasks.
+                current_running_tasks = comment_tasks_var.get()
+                for task in current_running_tasks:
+                    task.cancel()
+                time.sleep(20)
+                await self.context_page.goto(f"{self.index_url}?isHome=1")
+                await self.ks_client.update_cookies(browser_context=self.browser_context)
+
     def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
         """Create proxy info for playwright and httpx"""
         # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888

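Two details in the recovery path above are deliberate: time.sleep (not asyncio.sleep) blocks the whole event loop, so no other coroutine keeps firing requests while the ban cools off, and the handles for the in-flight comment jobs come out of the comment_tasks_var ContextVar (declared in var.py at the end of this commit), so any one task can cancel the whole batch. A self-contained sketch of that cancel-all pattern, with hypothetical workers standing in for the crawl requests:

import asyncio
from asyncio import Task
from contextvars import ContextVar
from typing import List

tasks_var: ContextVar[List[Task]] = ContextVar("tasks", default=[])

async def worker(seconds: int) -> None:
    await asyncio.sleep(seconds)  # stand-in for one comment-page request

async def main() -> None:
    task_list = [asyncio.create_task(worker(i), name=f"video-{i}") for i in range(5)]
    tasks_var.set(task_list)
    # any task that suspects a block can cancel the whole batch:
    for task in tasks_var.get():
        task.cancel()
    results = await asyncio.gather(*task_list, return_exceptions=True)
    print(results)  # one CancelledError per cancelled worker

asyncio.run(main())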
media_platform/kuaishou/graphql.py

@@ -1,10 +1,10 @@
 # Kuaishou's data transport is implemented on top of GraphQL
 # This class is responsible for loading the GraphQL schemas
-import os
+from typing import Dict


 class KuaiShouGraphQL:
-    graphql_queries = {}
+    graphql_queries: Dict[str, str] = {}

     def __init__(self):
         self.graphql_dir = "media_platform/kuaishou/graphql/"

media_platform/kuaishou/graphql/comment_list.graphql

@@ -0,0 +1,39 @@
+query commentListQuery($photoId: String, $pcursor: String) {
+    visionCommentList(photoId: $photoId, pcursor: $pcursor) {
+        commentCount
+        pcursor
+        rootComments {
+            commentId
+            authorId
+            authorName
+            content
+            headurl
+            timestamp
+            likedCount
+            realLikedCount
+            liked
+            status
+            authorLiked
+            subCommentCount
+            subCommentsPcursor
+            subComments {
+                commentId
+                authorId
+                authorName
+                content
+                headurl
+                timestamp
+                likedCount
+                realLikedCount
+                liked
+                status
+                authorLiked
+                replyToUserName
+                replyTo
+                __typename
+            }
+            __typename
+        }
+        __typename
+    }
+}

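For orientation, one page of commentListQuery results, as consumed by get_video_all_comments after the client unwraps the GraphQL envelope, has roughly this shape (hand-written from the fields selected above with illustrative values, not captured API output):

comments_res = {
    "visionCommentList": {
        "commentCount": 1,
        "pcursor": "no_more",  # an opaque cursor string until the last page
        "rootComments": [
            {
                "commentId": "12345",  # illustrative values throughout
                "authorId": "67890",
                "authorName": "someone",
                "content": "first!",
                "headurl": "https://example.com/avatar.jpg",
                "timestamp": 1700000000000,
                "likedCount": "0",
                "subCommentCount": "0",
                "subComments": [],
            },
        ],
    },
}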
models/__init__.py

@@ -1,3 +1,3 @@
 from .douyin import *
+from .kuaishou import *
 from .xiaohongshu import *

models/kuaishou.py

@@ -30,7 +30,7 @@ class KuaishouVideo(KuaishouBaseModel):
     desc = fields.TextField(null=True, description="视频描述")
     create_time = fields.BigIntField(description="视频发布时间戳", index=True)
     liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    video_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
+    viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
     video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
     video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
     video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
@@ -64,15 +64,15 @@ async def update_kuaishou_video(video_item: Dict):
     user_info = video_item.get("author", {})
     local_db_item = {
         "video_id": video_id,
-        "video_type": video_item.get("type"),
-        "title": photo_info.get("caption", ""),
-        "desc": photo_info.get("caption", ""),
+        "video_type": str(video_item.get("type")),
+        "title": photo_info.get("caption", "")[:500],
+        "desc": photo_info.get("caption", "")[:500],
         "create_time": photo_info.get("timestamp"),
         "user_id": user_info.get("id"),
         "nickname": user_info.get("name"),
         "avatar": user_info.get("headerUrl", ""),
-        "liked_count": photo_info.get("realLikeCount"),
-        "viewd_count": photo_info.get("viewCount"),
+        "liked_count": str(photo_info.get("realLikeCount")),
+        "viewd_count": str(photo_info.get("viewCount")),
         "last_modify_ts": utils.get_current_timestamp(),
         "video_url": f"https://www.kuaishou.com/short-video/{video_id}",
         "video_cover_url": photo_info.get("coverUrl", ""),
@@ -82,12 +82,12 @@ async def update_kuaishou_video(video_item: Dict):
     if config.IS_SAVED_DATABASED:
         if not await KuaishouVideo.filter(video_id=video_id).exists():
             local_db_item["add_ts"] = utils.get_current_timestamp()
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',))
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
             kuaishou_data = kuaishou_video_pydantic(**local_db_item)
             kuaishou_video_pydantic.model_validate(kuaishou_data)
             await KuaishouVideo.create(**kuaishou_data.model_dump())
         else:
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate',
+            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
                                                              exclude=('id', 'add_ts'))
             kuaishou_data = kuaishou_video_pydantic(**local_db_item)
             kuaishou_video_pydantic.model_validate(kuaishou_data)
@@ -111,28 +111,16 @@ async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
 async def update_ks_video_comment(video_id: str, comment_item: Dict):
-    comment_video_id = comment_item.get("video_id")
-    if video_id != comment_video_id:
-        print(f"comment_video_id: {comment_video_id} != video_id: {video_id}")
-        return
-    user_info = comment_item.get("user", {})
-    comment_id = comment_item.get("cid")
-    avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
-        "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
+    comment_id = comment_item.get("commentId")
     local_db_item = {
         "comment_id": comment_id,
-        "create_time": comment_item.get("create_time"),
-        "ip_location": comment_item.get("ip_label", ""),
+        "create_time": comment_item.get("timestamp"),
         "video_id": video_id,
-        "content": comment_item.get("text"),
-        "user_id": user_info.get("uid"),
-        "sec_uid": user_info.get("sec_uid"),
-        "short_user_id": user_info.get("short_id"),
-        "user_unique_id": user_info.get("unique_id"),
-        "user_signature": user_info.get("signature"),
-        "nickname": user_info.get("nickname"),
-        "avatar": avatar_info.get("url_list", [""])[0],
-        "sub_comment_count": comment_item.get("reply_comment_total", 0),
+        "content": comment_item.get("content"),
+        "user_id": comment_item.get("authorId"),
+        "nickname": comment_item.get("authorName"),
+        "avatar": comment_item.get("headurl"),
+        "sub_comment_count": str(comment_item.get("subCommentCount", 0)),
         "last_modify_ts": utils.get_current_timestamp(),
     }
     print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}")

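batch_update_ks_video_comments is named in the hunk header above but its body sits outside the diff context; it is the callback wired up in core.py, so presumably it just fans one page of comments out to update_ks_video_comment. A sketch of that assumed shape:

from typing import Dict, List

async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
    # assumed body: persist each root comment from one page of results
    if not comments:
        return
    for comment_item in comments:
        await update_ks_video_comment(video_id, comment_item)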
var.py

@@ -1,4 +1,7 @@
+from asyncio.tasks import Task
 from contextvars import ContextVar
+from typing import List

 request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
 crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
+comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])