xhs_crawler/media_platform/douyin/client.py

305 lines
12 KiB
Python
Raw Normal View History

2023-06-25 13:05:30 +00:00
import asyncio
2023-07-29 07:35:40 +00:00
import copy
2024-06-16 16:25:48 +00:00
import json
2023-07-29 07:35:40 +00:00
import urllib.parse
2024-07-13 19:20:05 +00:00
from typing import Any, Callable, Dict, Optional
2024-07-13 19:20:05 +00:00
import requests
from playwright.async_api import BrowserContext
2024-04-13 12:18:04 +00:00
from base.base_crawler import AbstractApiClient
from tools import utils
2023-08-16 11:49:41 +00:00
from var import request_keyword_var
2023-06-25 13:05:30 +00:00
2023-07-29 07:35:40 +00:00
from .exception import *
from .field import *
2024-07-13 19:20:05 +00:00
from .help import *
2023-07-29 07:35:40 +00:00
2024-04-13 12:18:04 +00:00
class DOUYINClient(AbstractApiClient):
def __init__(
    self,
    timeout=30,
    proxies=None,
    *,
    headers: Dict,
    playwright_page: Optional[Page],
    cookie_dict: Dict
):
    """Store the request settings and browser state on the instance.

    :param timeout: kept as ``self.timeout``
    :param proxies: kept as ``self.proxies``
    :param headers: default headers, used when a call supplies none
    :param playwright_page: Playwright page whose ``localStorage`` is read
    :param cookie_dict: cookies as a name -> value mapping
    """
    # Fixed origin that every request URI is appended to.
    self._host = "https://www.douyin.com"
    # Request defaults.
    self.headers = headers
    self.timeout = timeout
    self.proxies = proxies
    # Browser-side state.
    self.cookie_dict = cookie_dict
    self.playwright_page = playwright_page
2024-07-13 19:20:05 +00:00
async def __process_req_params(
    self, params: Optional[Dict] = None, headers: Optional[Dict] = None,
    request_method="GET"
):
    """Add the shared web-client fields and the ``a_bogus`` value to ``params``.

    ``params`` is modified in place and nothing is returned. When ``params``
    is ``None`` or empty the method returns immediately without touching it.

    :param params: request parameters to extend in place
    :param headers: headers supplying the ``User-Agent`` handed to
        ``get_a_bogus``; ``self.headers`` is used when this is falsy
    :param request_method: when ``"POST"``, ``params`` is also handed to
        ``get_a_bogus`` as the post data; otherwise an empty dict is
    """
    if not params:
        return

    effective_headers = headers or self.headers
    browser_storage: Dict = await self.playwright_page.evaluate("() => window.localStorage")  # type: ignore

    # Fields appended to every request. Insertion order is preserved because
    # the encoded query string built below is an input to get_a_bogus.
    params.update({
        "device_platform": "webapp",
        "aid": "6383",
        "channel": "channel_pc_web",
        "version_code": "190600",
        "version_name": "19.6.0",
        "update_version_code": "170400",
        "pc_client_type": "1",
        "cookie_enabled": "true",
        "browser_language": "zh-CN",
        "browser_platform": "MacIntel",
        "browser_name": "Chrome",
        "browser_version": "125.0.0.0",
        "browser_online": "true",
        "engine_name": "Blink",
        "os_name": "Mac OS",
        "os_version": "10.15.7",
        "cpu_core_num": "8",
        "device_memory": "8",
        "engine_version": "109.0",
        "platform": "PC",
        "screen_width": "2560",
        "screen_height": "1440",
        'effective_type': '4g',
        "round_trip_time": "50",
        "webid": get_web_id(),
        "msToken": browser_storage.get("xmst"),
    })

    encoded_query = urllib.parse.urlencode(params)
    # 2024-06-10: a-bogus switched to the Playwright version.
    body_for_signature = params if request_method == "POST" else {}
    params["a_bogus"] = await get_a_bogus(
        encoded_query, body_for_signature, effective_headers["User-Agent"], self.playwright_page
    )
async def request(self, method, url, **kwargs):
    """Send an HTTP request with ``requests`` and return the decoded JSON body.

    :param method: HTTP method name handed to ``requests.request``
    :param url: absolute URL to call
    :param kwargs: extra keyword arguments forwarded to ``requests.request``
        (``params``, ``data``, ``headers``, ...); ``timeout`` defaults to
        ``self.timeout`` when the caller does not pass one
    :return: the parsed JSON response
    :raises DataFetchError: when the body is empty, equals ``"blocked"``, or
        cannot be parsed as JSON
    :raises requests.RequestException: on transport errors, including a
        timeout
    """
    # self.timeout was stored by __init__ but never applied; without it a
    # stalled connection would wait indefinitely.
    kwargs.setdefault("timeout", self.timeout)

    # requests is a blocking client, so run it in the default executor rather
    # than stalling the event loop. Every method goes through the same call:
    # the old GET/POST branches were identical and any other method left
    # ``response`` as None, which then crashed inside the except handler.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, lambda: requests.request(method, url, **kwargs)
    )

    try:
        if response.text == "" or response.text == "blocked":
            utils.logger.error(f"request params incrr, response.text: {response.text}")
            raise Exception("account blocked")
        return response.json()
    except Exception as e:
        # Surface every failure as DataFetchError while keeping the cause.
        raise DataFetchError(f"{e}, {response.text}") from e
2023-06-25 13:05:30 +00:00
async def get(self, uri: str, params: Optional[Dict] = None, headers: Optional[Dict] = None):
    """
    Issue a GET request against ``self._host``.

    :param uri: path appended to ``self._host``
    :param params: query parameters; extended in place by
        ``__process_req_params`` before the request is sent
    :param headers: request headers; ``self.headers`` is used when falsy
    :return: whatever ``self.request`` returns
    """
    await self.__process_req_params(params, headers)
    target_url = f"{self._host}{uri}"
    effective_headers = headers if headers else self.headers
    return await self.request(method="GET", url=target_url, params=params, headers=effective_headers)
async def post(self, uri: str, data: dict, headers: Optional[Dict] = None):
    """
    Issue a POST request against ``self._host``.

    :param uri: path appended to ``self._host``
    :param data: form data; extended in place by ``__process_req_params``
        before the request is sent
    :param headers: request headers; ``self.headers`` is used when falsy
    :return: whatever ``self.request`` returns
    """
    # NOTE(review): request_method is not forwarded here, so
    # __process_req_params runs with its "GET" default - confirm whether the
    # POST branch there was meant to be used.
    await self.__process_req_params(data, headers)
    target_url = f"{self._host}{uri}"
    effective_headers = headers if headers else self.headers
    return await self.request(method="POST", url=target_url, data=data, headers=effective_headers)
async def pong(self, browser_context: BrowserContext) -> bool:
    """
    Report whether the browser session looks logged in.

    The page's ``localStorage`` is checked first (``HasUserLogin == "1"``);
    only when that does not match are the context cookies consulted
    (``LOGIN_STATUS == "1"``).

    :param browser_context: Playwright context whose cookies are read
    :return: ``True`` when either check matches
    """
    storage = await self.playwright_page.evaluate("() => window.localStorage")
    logged_in_locally = storage.get("HasUserLogin", "") == "1"
    if not logged_in_locally:
        _, cookies = utils.convert_cookies(await browser_context.cookies())
        return cookies.get("LOGIN_STATUS") == "1"
    return True
async def update_cookies(self, browser_context: BrowserContext):
    """
    Refresh the stored cookies from the browser context.

    Writes the cookie string into ``self.headers["Cookie"]`` and replaces
    ``self.cookie_dict`` with the converted mapping.

    :param browser_context: Playwright context whose cookies are read
    """
    raw_cookies = await browser_context.cookies()
    cookie_header, cookie_map = utils.convert_cookies(raw_cookies)
    self.cookie_dict = cookie_map
    self.headers["Cookie"] = cookie_header
2023-06-25 13:05:30 +00:00
async def search_info_by_keyword(
    self,
    keyword: str,
    offset: int = 0,
    search_channel: SearchChannelType = SearchChannelType.GENERAL,
    sort_type: SearchSortType = SearchSortType.GENERAL,
    publish_time: PublishTimeType = PublishTimeType.UNLIMITED
):
    """
    Query the Douyin web search endpoint.

    :param keyword: raw (unencoded) search text
    :param offset: value sent as the ``offset`` query field
    :param search_channel: enum whose value is sent as ``search_channel``
    :param sort_type: sort option; sent inside ``filter_selected`` when it or
        ``publish_time`` differs from the default
    :param publish_time: publish-time option; sent inside ``filter_selected``
        when it or ``sort_type`` differs from the default
    :return: the result of ``self.get`` for the search endpoint
    """
    # Values are passed unencoded: the query is percent-encoded once
    # downstream (urlencode for the signature input, requests for the URL),
    # so quoting here as well would double-encode non-ASCII keywords.
    query_params = {
        'search_channel': search_channel.value,
        'enable_history': '1',
        'keyword': keyword,
        'search_source': 'tab_search',
        'query_correct_type': '1',
        'is_filter_search': '0',
        'from_group_id': '7378810571505847586',
        'offset': offset,
        'count': '15',
        'need_filter_settings': '1',
        'list_type': 'multi',
    }

    filter_active = (
        sort_type.value != SearchSortType.GENERAL.value
        or publish_time.value != PublishTimeType.UNLIMITED.value
    )
    if filter_active:
        # Same reasoning as for the keyword: send the JSON text as-is.
        query_params["filter_selected"] = json.dumps({
            "sort_type": str(sort_type.value),
            "publish_time": str(publish_time.value)
        })
        query_params["is_filter_search"] = 1

    referer_url = f"https://www.douyin.com/search/{keyword}?aid=f594bbd9-a0e2-4651-9319-ebe3cb6298c1&type=general"
    # Work on a copy so the shared default headers are not modified.
    headers = copy.copy(self.headers)
    # Percent-encode the Referer (the keyword may contain non-ASCII text).
    headers["Referer"] = urllib.parse.quote(referer_url, safe=':/')
    return await self.get("/aweme/v1/web/general/search/single/", query_params, headers=headers)
2023-06-25 13:05:30 +00:00
async def get_video_by_id(self, aweme_id: str) -> Any:
    """
    DouYin Video Detail API

    :param aweme_id: ID sent as the ``aweme_id`` query field
    :return: the ``aweme_detail`` entry of the response, or ``{}`` when the
        response has no such key
    """
    params = {
        "aweme_id": aweme_id
    }
    # Work on a copy so the shared default headers are not modified.
    headers = copy.copy(self.headers)
    # This call is sent without an Origin header. pop() with a default keeps
    # it from raising KeyError when the defaults carry no Origin.
    headers.pop("Origin", None)
    res = await self.get("/aweme/v1/web/aweme/detail/", params, headers)
    return res.get("aweme_detail", {})
2023-06-25 13:05:30 +00:00
2023-08-16 11:49:41 +00:00
async def get_aweme_comments(self, aweme_id: str, cursor: int = 0):
    """
    Fetch one page of comments for a video.

    :param aweme_id: ID sent as the ``aweme_id`` query field
    :param cursor: value sent as the ``cursor`` query field
    :return: the result of ``self.get`` for the comment list endpoint
    """
    uri = "/aweme/v1/web/comment/list/"
    params = {
        "aweme_id": aweme_id,
        "cursor": cursor,
        "count": 20,
        "item_type": 0
    }
    keywords = request_keyword_var.get()
    referer_url = "https://www.douyin.com/search/" + keywords + '?aid=3a3cec5a-9e27-4040-b6aa-ef548c2c1138&publish_time=0&sort_type=0&source=search_history&type=general'
    # Work on a copy so the shared default headers are not modified.
    headers = copy.copy(self.headers)
    headers["Referer"] = urllib.parse.quote(referer_url, safe=':/')
    # The Referer-bearing copy used to be built and then dropped; send it.
    return await self.get(uri, params, headers=headers)
2024-05-28 22:35:37 +00:00
async def get_sub_comments(self, comment_id: str, cursor: int = 0):
    """
    Fetch one page of sub-comments (replies) for a comment.

    :param comment_id: ID sent as the ``comment_id`` query field
    :param cursor: value sent as the ``cursor`` query field
    :return: the result of ``self.get`` for the comment reply endpoint
    """
    uri = "/aweme/v1/web/comment/list/reply/"
    params = {
        'comment_id': comment_id,
        "cursor": cursor,
        "count": 20,
        "item_type": 0,
    }
    keywords = request_keyword_var.get()
    referer_url = "https://www.douyin.com/search/" + keywords + '?aid=3a3cec5a-9e27-4040-b6aa-ef548c2c1138&publish_time=0&sort_type=0&source=search_history&type=general'
    # Work on a copy so the shared default headers are not modified.
    headers = copy.copy(self.headers)
    headers["Referer"] = urllib.parse.quote(referer_url, safe=':/')
    # The Referer-bearing copy used to be built and then dropped; send it.
    return await self.get(uri, params, headers=headers)
2023-06-25 13:05:30 +00:00
async def get_aweme_all_comments(
    self,
    aweme_id: str,
    crawl_interval: float = 1.0,
    is_fetch_sub_comments=False,
    callback: Optional[Callable] = None,
):
    """
    Fetch all comments of a post, optionally including sub-comments.

    :param aweme_id: post ID
    :param crawl_interval: seconds to sleep after each non-empty page
    :param is_fetch_sub_comments: whether to also fetch sub-comments
    :param callback: awaited as ``callback(aweme_id, comments)`` for every
        non-empty page of comments or sub-comments
    :return: list of all collected comments
    """
    result = []
    comments_has_more = 1
    comments_cursor = 0
    while comments_has_more:
        comments_res = await self.get_aweme_comments(aweme_id, comments_cursor)
        comments_has_more = comments_res.get("has_more", 0)
        comments_cursor = comments_res.get("cursor", 0)
        comments = comments_res.get("comments", [])
        if not comments:
            # An empty page ends pagination. Looping again here would
            # re-request with no delay, and never stop if ``has_more``
            # stays truthy.
            break
        result.extend(comments)
        if callback:  # hand the page to the callback when one is given
            await callback(aweme_id, comments)

        await asyncio.sleep(crawl_interval)
        if not is_fetch_sub_comments:
            continue

        # Fetch second-level comments.
        for comment in comments:
            # The count may be missing or null; treat that as "no replies"
            # instead of comparing None with an int.
            if (comment.get("reply_comment_total") or 0) <= 0:
                continue
            comment_id = comment.get("cid")
            sub_comments_has_more = 1
            sub_comments_cursor = 0
            while sub_comments_has_more:
                sub_comments_res = await self.get_sub_comments(comment_id, sub_comments_cursor)
                sub_comments_has_more = sub_comments_res.get("has_more", 0)
                sub_comments_cursor = sub_comments_res.get("cursor", 0)
                sub_comments = sub_comments_res.get("comments", [])
                if not sub_comments:
                    # Same guard as above for the reply pages.
                    break
                result.extend(sub_comments)
                if callback:  # hand the page to the callback when one is given
                    await callback(aweme_id, sub_comments)
                await asyncio.sleep(crawl_interval)
    return result
2024-05-27 17:07:19 +00:00
async def get_user_info(self, sec_user_id: str):
    """
    Request the user profile endpoint for one user.

    :param sec_user_id: value sent as the ``sec_user_id`` query field
    :return: the result of ``self.get`` for the profile endpoint
    """
    query = dict(
        sec_user_id=sec_user_id,
        publish_video_strategy_type=2,
        personal_center_strategy=1,
    )
    return await self.get("/aweme/v1/web/user/profile/other/", query)
async def get_user_aweme_posts(self, sec_user_id: str, max_cursor: str = "") -> Dict:
    """
    Request one page of a user's posted videos.

    :param sec_user_id: value sent as the ``sec_user_id`` query field
    :param max_cursor: value sent as the ``max_cursor`` query field
    :return: the result of ``self.get`` for the post list endpoint
    """
    # Both fingerprint fields carry the same hard-coded value.
    fingerprint = 'verify_lx901cuk_K7kaK4dK_bn2E_4dgk_BxAA_E0XS1VtUi130'
    query = {
        "sec_user_id": sec_user_id,
        "count": 18,
        "max_cursor": max_cursor,
        "locate_query": "false",
        "publish_video_strategy_type": 2,
        'verifyFp': fingerprint,
        'fp': fingerprint,
    }
    return await self.get("/aweme/v1/web/aweme/post/", query)
async def get_all_user_aweme_posts(self, sec_user_id: str, callback: Optional[Callable] = None):
    """
    Page through a user's posted videos and collect them all.

    Paging continues while the response's ``has_more`` equals ``1``; each
    response's ``max_cursor`` is passed to the next request.

    :param sec_user_id: user whose posts are fetched
    :param callback: awaited as ``callback(aweme_list)`` for every page,
        including empty ones
    :return: list of all collected videos
    """
    collected = []
    cursor = ""
    more = 1
    while more == 1:
        page = await self.get_user_aweme_posts(sec_user_id, cursor)
        more = page.get("has_more", 0)
        cursor = page.get("max_cursor")
        # A missing or null list is treated as an empty page.
        videos = page.get("aweme_list") or []
        utils.logger.info(
            f"[DOUYINClient.get_all_user_aweme_posts] got sec_user_id:{sec_user_id} video len : {len(videos)}")
        if callback:
            await callback(videos)
        collected.extend(videos)
    return collected