xhs_crawler/media_platform/zhihu/help.py

361 lines
12 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse
import execjs
from parsel import Selector
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from tools.crawler_util import extract_text_from_html
# Compiled JS signing context. Stays None until sign() is called for the
# first time, then holds the result of execjs.compile() for reuse.
ZHIHU_SGIN_JS = None


def sign(url: str, cookies: str) -> Dict:
    """
    zhihu sign algorithm

    Calls the ``get_sign`` function defined in ``libs/zhihu.js`` through
    execjs. The script is read and compiled only on the first call; the
    compiled context is cached in the module-level ``ZHIHU_SGIN_JS``.

    Args:
        url: request url with query string
        cookies: request cookies with d_c0 key

    Returns:
        The value returned by the JS ``get_sign`` function.

    Raises:
        OSError: if ``libs/zhihu.js`` cannot be opened (the path is relative
            to the current working directory).
    """
    global ZHIHU_SGIN_JS
    if not ZHIHU_SGIN_JS:
        # Pass the encoding explicitly: without it open() uses the locale's
        # preferred encoding, which is not UTF-8 on every platform and can
        # fail or mis-decode non-ASCII characters in the script.
        with open("libs/zhihu.js", "r", encoding="utf-8") as f:
            ZHIHU_SGIN_JS = execjs.compile(f.read())

    return ZHIHU_SGIN_JS.call("get_sign", url, cookies)
class ZhihuExtractor:
    """Turn raw zhihu API payloads and profile-page HTML into model objects.

    The class keeps no state; every method works only on its arguments.
    """

    def __init__(self):
        pass

    def extract_contents_from_search(self, json_data: Dict) -> List[ZhihuContent]:
        """
        extract zhihu contents from a search response

        Only entries of ``json_data["data"]`` whose ``type`` is
        ``search_result`` or ``zvideo`` and that carry a non-empty ``object``
        are converted.

        Args:
            json_data: zhihu json data

        Returns:
            The converted contents; an empty list when there is nothing to
            convert.
        """
        if not json_data:
            return []

        # "or []" also covers an explicit null under "data".
        search_result: List[Dict] = json_data.get("data") or []
        content_objects = [
            s_item.get("object")
            for s_item in search_result
            if s_item.get("type") in ('search_result', 'zvideo') and s_item.get("object")
        ]
        return self._extract_content_list(content_objects)

    def _extract_content_list(self, content_list: List[Dict]) -> List[ZhihuContent]:
        """
        extract zhihu content list

        Dispatches each item on its ``type`` field to the answer / article /
        zvideo extractor. Items of any other type are skipped.

        Args:
            content_list: raw content dicts

        Returns:
            The converted contents, in input order.
        """
        if not content_list:
            return []

        res: List[ZhihuContent] = []
        for content in content_list:
            content_type = content.get("type")
            if content_type == zhihu_constant.ANSWER_NAME:
                res.append(self._extract_answer_content(content))
            elif content_type == zhihu_constant.ARTICLE_NAME:
                res.append(self._extract_article_content(content))
            elif content_type == zhihu_constant.VIDEO_NAME:
                res.append(self._extract_zvideo_content(content))
        return res

    def _extract_answer_content(self, answer: Dict) -> ZhihuContent:
        """
        extract zhihu answer content

        Args:
            answer: zhihu answer

        Returns:
            A ZhihuContent filled from the answer dict and its author.
        """
        res = ZhihuContent()
        res.content_id = answer.get("id")
        res.content_type = answer.get("type")
        res.content_text = extract_text_from_html(answer.get("content", ""))
        # Tolerate a missing or null "question" instead of raising
        # AttributeError; question_id is then None.
        question = answer.get("question") or {}
        res.question_id = question.get("id")
        res.content_url = f"{zhihu_constant.ZHIHU_URL}/question/{res.question_id}/answer/{res.content_id}"
        res.title = extract_text_from_html(answer.get("title", ""))
        res.desc = extract_text_from_html(answer.get("description", "") or answer.get("excerpt", ""))
        res.created_time = answer.get("created_time")
        res.updated_time = answer.get("updated_time")
        res.voteup_count = answer.get("voteup_count", 0)
        res.comment_count = answer.get("comment_count", 0)

        # extract author info
        author_info = self._extract_content_or_comment_author(answer.get("author"))
        res.user_id = author_info.user_id
        res.user_link = author_info.user_link
        res.user_nickname = author_info.user_nickname
        res.user_avatar = author_info.user_avatar
        res.user_url_token = author_info.url_token
        return res

    def _extract_article_content(self, article: Dict) -> ZhihuContent:
        """
        extract zhihu article content

        Args:
            article: zhihu article

        Returns:
            A ZhihuContent filled from the article dict and its author.
        """
        res = ZhihuContent()
        res.content_id = article.get("id")
        res.content_type = article.get("type")
        res.content_text = extract_text_from_html(article.get("content"))
        res.content_url = f"{zhihu_constant.ZHIHU_URL}/p/{res.content_id}"
        res.title = extract_text_from_html(article.get("title"))
        res.desc = extract_text_from_html(article.get("excerpt"))
        # Timestamps are read from "*_time" first, falling back to the
        # shorter "created" / "updated" keys.
        res.created_time = article.get("created_time", 0) or article.get("created", 0)
        res.updated_time = article.get("updated_time", 0) or article.get("updated", 0)
        res.voteup_count = article.get("voteup_count", 0)
        res.comment_count = article.get("comment_count", 0)

        # extract author info
        author_info = self._extract_content_or_comment_author(article.get("author"))
        res.user_id = author_info.user_id
        res.user_link = author_info.user_link
        res.user_nickname = author_info.user_nickname
        res.user_avatar = author_info.user_avatar
        res.user_url_token = author_info.url_token
        return res

    def _extract_zvideo_content(self, zvideo: Dict) -> ZhihuContent:
        """
        extract zhihu zvideo content

        Two payload shapes are handled: one with a nested ``video`` dict and
        a flat one that carries ``zvideo_id`` / ``video_url`` directly.

        Args:
            zvideo: zhihu zvideo

        Returns:
            A ZhihuContent filled from the zvideo dict and its author.
        """
        res = ZhihuContent()

        if "video" in zvideo and isinstance(zvideo.get("video"), dict):
            # Per the original author's note: this shape comes from the
            # video-list API of a creator's home page.
            res.content_id = zvideo.get("video").get("video_id")
            res.content_url = f"{zhihu_constant.ZHIHU_URL}/zvideo/{res.content_id}"
            res.created_time = zvideo.get("published_at")
            res.updated_time = zvideo.get("updated_at")
        else:
            res.content_id = zvideo.get("zvideo_id")
            res.content_url = zvideo.get("video_url")
            res.created_time = zvideo.get("created_at")

        res.content_type = zvideo.get("type")
        res.title = extract_text_from_html(zvideo.get("title"))
        res.desc = extract_text_from_html(zvideo.get("description"))
        # Default the counters to 0, as the answer and article extractors do.
        res.voteup_count = zvideo.get("voteup_count", 0)
        res.comment_count = zvideo.get("comment_count", 0)

        # extract author info
        author_info = self._extract_content_or_comment_author(zvideo.get("author"))
        res.user_id = author_info.user_id
        res.user_link = author_info.user_link
        res.user_nickname = author_info.user_nickname
        res.user_avatar = author_info.user_avatar
        res.user_url_token = author_info.url_token
        return res

    @staticmethod
    def _extract_content_or_comment_author(author: Dict) -> ZhihuCreator:
        """
        extract zhihu author

        Args:
            author: author dict of a content or a comment; may be empty/None

        Returns:
            A ZhihuCreator. It is left at its defaults when no usable author
            data is present.
        """
        res = ZhihuCreator()
        if not author:
            return res

        if not author.get("id"):
            # No id at the top level: fall back to the nested "member" dict.
            author = author.get("member")
            if not author:
                # Neither level has author data; avoid calling .get on None.
                return res

        res.user_id = author.get("id")
        res.user_link = f"{zhihu_constant.ZHIHU_URL}/people/{author.get('url_token')}"
        res.user_nickname = author.get("name")
        res.user_avatar = author.get("avatar_url")
        res.url_token = author.get("url_token")
        return res

    def extract_comments(self, page_content: ZhihuContent, comments: List[Dict]) -> List[ZhihuComment]:
        """
        extract zhihu comments

        Entries whose ``type`` is not ``comment`` are skipped.

        Args:
            page_content: zhihu content object
            comments: zhihu comments

        Returns:
            The converted comments, in input order.
        """
        if not comments:
            return []

        return [
            self._extract_comment(page_content, comment)
            for comment in comments
            if comment.get("type") == "comment"
        ]

    def _extract_comment(self, page_content: ZhihuContent, comment: Dict) -> ZhihuComment:
        """
        extract zhihu comment

        Args:
            page_content: the content the comment belongs to; its content_id
                and content_type are copied onto the comment
            comment: zhihu comment

        Returns:
            A ZhihuComment filled from the comment dict and its author.
        """
        res = ZhihuComment()
        res.comment_id = str(comment.get("id", ""))
        res.parent_comment_id = comment.get("reply_comment_id")
        res.content = extract_text_from_html(comment.get("content"))
        res.publish_time = comment.get("created_time")
        res.ip_location = self._extract_comment_ip_location(comment.get("comment_tag", []))
        res.sub_comment_count = comment.get("child_comment_count")
        # Missing or null counters are normalised to 0.
        res.like_count = comment.get("like_count") or 0
        res.dislike_count = comment.get("dislike_count") or 0
        res.content_id = page_content.content_id
        res.content_type = page_content.content_type

        # extract author info
        author_info = self._extract_content_or_comment_author(comment.get("author"))
        res.user_id = author_info.user_id
        res.user_link = author_info.user_link
        res.user_nickname = author_info.user_nickname
        res.user_avatar = author_info.user_avatar
        return res

    @staticmethod
    def _extract_comment_ip_location(comment_tags: List[Dict]) -> str:
        """
        extract comment ip location

        Args:
            comment_tags: tag dicts attached to a comment

        Returns:
            The ``text`` of the first tag whose ``type`` is ``ip_info``, or
            an empty string when there is no such tag or it has no text.
        """
        if not comment_tags:
            return ""

        for ct in comment_tags:
            if ct.get("type") == "ip_info":
                # "or ''" keeps the declared str return type when the tag
                # has no text.
                return ct.get("text") or ""

        return ""

    @staticmethod
    def extract_offset(paging_info: Dict) -> str:
        """
        extract offset

        Reads the ``offset`` query parameter out of the ``next`` url of a
        paging dict.

        Args:
            paging_info: paging dict of a list response

        Returns:
            The offset value, or an empty string when paging_info is empty,
            has no ``next`` url, or the url has no ``offset`` parameter.
        """
        # Example next url:
        # https://www.zhihu.com/api/v4/comment_v5/zvideos/1424368906836807681/root_comment?limit=10&offset=456770961_10125996085_0&order_by=score
        if not paging_info:
            return ""

        next_url = paging_info.get("next")
        if not next_url:
            return ""

        parsed_url = urlparse(next_url)
        query_params = parse_qs(parsed_url.query)
        offset = query_params.get('offset', [""])[0]
        return offset

    @staticmethod
    def _foramt_gender_text(gender: int) -> str:
        """
        format gender text

        Args:
            gender: gender code taken from the creator info

        Returns:
            "男" for 1, "女" for 0, "未知" for anything else.
        """
        # NOTE(review): both labels were empty strings before, which made the
        # two branches indistinguishable. The 1 -> male / 0 -> female mapping
        # is assumed - confirm against the zhihu API.
        if gender == 1:
            return "男"
        elif gender == 0:
            return "女"
        else:
            return "未知"

    def extract_creator(self, user_url_token: str, html_content: str) -> Optional[ZhihuCreator]:
        """
        extract zhihu creator

        Parses the JSON inside the ``<script id="js-initialData">`` tag and
        looks the user up under ``initialState.entities.users``.

        Args:
            user_url_token : zhihu creator url token
            html_content: zhihu creator html content

        Returns:
            The creator, or None when the html is empty, the script tag is
            missing/empty, or the user is not in the embedded data.

        Raises:
            json.JSONDecodeError: if the script tag content is not valid JSON.
        """
        if not html_content:
            return None

        js_init_data = Selector(text=html_content).xpath("//script[@id='js-initialData']/text()").get(default="").strip()
        if not js_init_data:
            return None

        js_init_data_dict: Dict = json.loads(js_init_data)
        users_info: Dict = js_init_data_dict.get("initialState", {}).get("entities", {}).get("users", {})
        if not users_info:
            return None

        creator_info: Dict = users_info.get(user_url_token)
        if not creator_info:
            return None

        res = ZhihuCreator()
        res.user_id = creator_info.get("id")
        res.user_link = f"{zhihu_constant.ZHIHU_URL}/people/{user_url_token}"
        res.user_nickname = creator_info.get("name")
        res.user_avatar = creator_info.get("avatarUrl")
        res.url_token = creator_info.get("urlToken") or user_url_token
        res.gender = self._foramt_gender_text(creator_info.get("gender"))
        res.ip_location = creator_info.get("ipInfo")
        res.follows = creator_info.get("followingCount")
        res.fans = creator_info.get("followerCount")
        # Attribute names below are kept exactly as originally spelled.
        res.anwser_count = creator_info.get("answerCount")
        res.video_count = creator_info.get("zvideoCount")
        res.question_count = creator_info.get("questionCount")
        res.article_count = creator_info.get("articlesCount")
        res.column_count = creator_info.get("columnsCount")
        res.get_voteup_count = creator_info.get("voteupCount")
        return res

    def extract_content_list_from_creator(self, anwser_list: List[Dict]) -> List[ZhihuContent]:
        """
        extract content list from creator

        Args:
            anwser_list: raw content dicts from a creator's list endpoint

        Returns:
            The converted contents; an empty list when the input is empty.
        """
        if not anwser_list:
            return []

        return self._extract_content_list(anwser_list)