feat: issue #14

refactor: 优化小红书crawler流程代码
This commit is contained in:
Relakkes 2023-07-15 17:11:53 +08:00
parent e5f4ecd8ec
commit dad8d56ab5
6 changed files with 138 additions and 77 deletions

2
.gitignore vendored
View File

@ -163,3 +163,5 @@ cython_debug/
*.iml *.iml
.idea .idea
/temp_image/ /temp_image/
/xhs_user_data_dir/
/dy_user_data_dir/

View File

@ -1,8 +1,7 @@
# Desc: base config
PLATFORM = "xhs" PLATFORM = "xhs"
KEYWORDS = "健身,旅游" KEYWORDS = "健身,旅游"
LOGIN_TYPE = "qrcode" # qrcode or phone or cookies LOGIN_TYPE = "qrcode" # qrcode or phone or cookies
# If it's on the Xiaohongshu platform, only the web_session cookie will be kept.
# xhs cookie format -> web_session=040069b2acxxxxxxxxxxxxxxxxxxxx;
COOKIES = "" COOKIES = ""
# redis config # redis config
@ -17,3 +16,12 @@ RETRY_INTERVAL = 60 * 30 # 30 minutes
# playwright headless # playwright headless
HEADLESS = True HEADLESS = True
# save login state
SAVE_LOGIN_STATE = True
# save user data dir
USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
# max page num
MAX_PAGE_NUM = 20

View File

@ -1,13 +1,16 @@
import json import json
import logging
import asyncio import asyncio
from typing import Optional, Dict from typing import Optional, Dict
import httpx import httpx
from playwright.async_api import Page from playwright.async_api import Page
from playwright.async_api import BrowserContext
from .help import sign, get_search_id from .help import sign, get_search_id
from .field import SearchSortType, SearchNoteType from .field import SearchSortType, SearchNoteType
from .exception import DataFetchError, IPBlockError from .exception import DataFetchError, IPBlockError
from tools import utils
class XHSClient: class XHSClient:
@ -77,6 +80,21 @@ class XHSClient:
return await self.request(method="POST", url=f"{self._host}{uri}", return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=headers) data=json_str, headers=headers)
async def ping(self) -> bool:
    """Check whether the current login state is usable.

    Fetches one fixed note through ``get_note_by_id`` and compares the
    ``note_id`` of the returned card with the id that was requested.

    Returns:
        True when the returned note card carries the requested note id.
        False when the fetch raises ``DataFetchError``, when the response is
        empty (``None`` or an empty mapping), or when the ids differ.
    """
    logging.info("begin to ping xhs...")
    # Fixed probe note; it is only fetched to see whether requests succeed.
    note_id = "5e5cb38a000000000100185e"
    try:
        # Keep the try body minimal: only the fetch is expected to raise.
        note_card = await self.get_note_by_id(note_id)
    except DataFetchError:
        return False
    # An empty payload means "login state not ok" rather than a crash on .get().
    if not note_card:
        return False
    return note_card.get("note_id") == note_id
async def update_cookies(self, browser_context: BrowserContext):
    """Refresh this client's cookies from the given browser context.

    Reads the context's current cookies, passes them through
    ``utils.convert_cookies`` and stores the first result in the ``Cookie``
    request header and the second in ``self.cookie_dict``.
    """
    browser_cookies = await browser_context.cookies()
    converted = utils.convert_cookies(browser_cookies)
    # convert_cookies yields a (cookie string, cookie dict) pair.
    self.headers["Cookie"], self.cookie_dict = converted
async def get_note_by_keyword( async def get_note_by_keyword(
self, keyword: str, self, keyword: str,
page: int = 1, page_size: int = 20, page: int = 1, page_size: int = 20,

View File

@ -6,7 +6,6 @@ from typing import Optional, List, Dict, Tuple
from argparse import Namespace from argparse import Namespace
from playwright.async_api import Page from playwright.async_api import Page
from playwright.async_api import Cookie
from playwright.async_api import BrowserContext from playwright.async_api import BrowserContext
from playwright.async_api import async_playwright from playwright.async_api import async_playwright
@ -21,8 +20,8 @@ from base.proxy_account_pool import AccountPool
class XiaoHongShuCrawler(AbstractCrawler): class XiaoHongShuCrawler(AbstractCrawler):
def __init__(self): def __init__(self):
self.cookies: Optional[List[Cookie]] = None # cookies from browser context
self.browser_context: Optional[BrowserContext] = None self.browser_context: Optional[BrowserContext] = None
self.context_page: Optional[Page] = None self.context_page: Optional[Page] = None
self.user_agent = utils.get_user_agent() self.user_agent = utils.get_user_agent()
@ -35,43 +34,25 @@ class XiaoHongShuCrawler(AbstractCrawler):
for key in kwargs.keys(): for key in kwargs.keys():
setattr(self, key, kwargs[key]) setattr(self, key, kwargs[key])
async def update_cookies(self):
    """Store the browser context's current cookies in ``self.cookies``."""
    latest_cookies = await self.browser_context.cookies()
    self.cookies = latest_cookies
def create_proxy_info(self) -> Tuple[str, Dict, str]:
    """Create proxy info for playwright and httpx.

    Returns:
        A ``(phone, playwright_proxy, httpx_proxy)`` tuple: the account phone,
        a dict with ``server``/``username``/``password`` keys for playwright,
        and a single proxy URL string (credentials embedded) for httpx.
    """
    # The phone number and the IP proxy are both taken from the account pool
    # and are bound to each other as a fixed pair,
    # e.g. phone: 13012345671, ip_proxy: 111.122.xx.xx1:8888
    phone, ip_proxy = self.account_pool.get_account()

    protocol = config.IP_PROXY_PROTOCOL
    proxy_user = config.IP_PROXY_USER
    proxy_password = config.IP_PROXY_PASSWORD

    playwright_proxy = dict(
        server=f"{protocol}{ip_proxy}",
        username=proxy_user,
        password=proxy_password,
    )
    httpx_proxy = f"{protocol}{proxy_user}:{proxy_password}@{ip_proxy}"
    return phone, playwright_proxy, httpx_proxy
async def start(self): async def start(self):
account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info() account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info()
if not config.ENABLE_IP_PROXY:
playwright_proxy, httpx_proxy = None, None
async with async_playwright() as playwright: async with async_playwright() as playwright:
# launch browser and create single browser context # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
browser = await chromium.launch(headless=config.HEADLESS, proxy=playwright_proxy) self.browser_context = await self.launch_browser(
self.browser_context = await browser.new_context( chromium,
viewport={"width": 1920, "height": 1080}, playwright_proxy,
user_agent=self.user_agent self.user_agent,
headless=config.HEADLESS
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler.
# execute JS to bypass anti automation/crawler detection
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page() self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url) await self.context_page.goto(self.index_url)
# begin login # Create a client to interact with the xiaohongshu website.
self.xhs_client = await self.create_xhs_client(httpx_proxy)
if not await self.xhs_client.ping():
login_obj = XHSLogin( login_obj = XHSLogin(
login_type=self.command_args.lt, login_type=self.command_args.lt,
login_phone=account_phone, login_phone=account_phone,
@ -80,42 +61,20 @@ class XiaoHongShuCrawler(AbstractCrawler):
cookie_str=config.COOKIES cookie_str=config.COOKIES
) )
await login_obj.begin() await login_obj.begin()
await self.xhs_client.update_cookies(browser_context=self.browser_context)
# update cookies
await self.update_cookies()
# init request client
cookie_str, cookie_dict = utils.convert_cookies(self.cookies)
self.xhs_client = XHSClient(
proxies=httpx_proxy,
headers={
"User-Agent": self.user_agent,
"Cookie": cookie_str,
"Origin": "https://www.xiaohongshu.com",
"Referer": "https://www.xiaohongshu.com",
"Content-Type": "application/json;charset=UTF-8"
},
playwright_page=self.context_page,
cookie_dict=cookie_dict,
)
# Search for notes and retrieve their comment information. # Search for notes and retrieve their comment information.
await self.search_posts() await self.search_posts()
# block main crawler coroutine logging.info("Xhs Crawler finished ...")
await asyncio.Event().wait()
async def close(self):
    """Close the browser context and log that it has been closed."""
    # Close the context exactly once; a second back-to-back call is redundant.
    await self.browser_context.close()
    logging.info("Browser context closed ...")
async def search_posts(self): async def search_posts(self):
"""Search for notes and retrieve their comment information."""
logging.info("Begin search xiaohongshu keywords") logging.info("Begin search xiaohongshu keywords")
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
logging.info(f"Current keyword: {keyword}") logging.info(f"Current keyword: {keyword}")
note_list: List[str] = [] note_list: List[str] = []
max_note_len = 10 max_note_len = config.MAX_PAGE_NUM
page = 1 page = 1
while max_note_len > 0: while max_note_len > 0:
posts_res = await self.xhs_client.get_note_by_keyword( posts_res = await self.xhs_client.get_note_by_keyword(
@ -129,14 +88,16 @@ class XiaoHongShuCrawler(AbstractCrawler):
try: try:
note_detail = await self.xhs_client.get_note_by_id(note_id) note_detail = await self.xhs_client.get_note_by_id(note_id)
except DataFetchError as ex: except DataFetchError as ex:
logging.error(f"Get note detail error: {ex}")
continue continue
await xhs_model.update_xhs_note(note_detail) await xhs_model.update_xhs_note(note_detail)
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
note_list.append(note_id) note_list.append(note_id)
logging.info(f"keyword:{keyword}, note_list:{note_list}") logging.info(f"keyword:{keyword}, note_list:{note_list}")
await self.batch_get_note_comments(note_list) # await self.batch_get_note_comments(note_list)
async def batch_get_note_comments(self, note_list: List[str]): async def batch_get_note_comments(self, note_list: List[str]):
"""Batch get note comments"""
task_list: List[Task] = [] task_list: List[Task] = []
for note_id in note_list: for note_id in note_list:
task = asyncio.create_task(self.get_comments(note_id), name=note_id) task = asyncio.create_task(self.get_comments(note_id), name=note_id)
@ -144,7 +105,66 @@ class XiaoHongShuCrawler(AbstractCrawler):
await asyncio.wait(task_list) await asyncio.wait(task_list)
async def get_comments(self, note_id: str): async def get_comments(self, note_id: str):
"""Get note comments"""
logging.info(f"Begin get note id comments {note_id}") logging.info(f"Begin get note id comments {note_id}")
all_comments = await self.xhs_client.get_note_all_comments(note_id=note_id, crawl_interval=random.random()) all_comments = await self.xhs_client.get_note_all_comments(note_id=note_id, crawl_interval=random.random())
for comment in all_comments: for comment in all_comments:
await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment) await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment)
def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
    """Create proxy info for playwright and httpx.

    The account phone is always taken from the account pool, because the
    caller passes it on as the login phone whether or not a proxy is used.
    Only the two proxy values depend on ``config.ENABLE_IP_PROXY``.

    Returns:
        A ``(phone, playwright_proxy, httpx_proxy)`` tuple. When
        ``config.ENABLE_IP_PROXY`` is falsy both proxy values are ``None``;
        otherwise ``playwright_proxy`` is a dict with
        ``server``/``username``/``password`` keys and ``httpx_proxy`` is a
        single proxy URL string with the credentials embedded.
    """
    # phone: 13012345671  ip_proxy: 111.122.xx.xx1:8888
    phone, ip_proxy = self.account_pool.get_account()
    if not config.ENABLE_IP_PROXY:
        # Keep the phone: phone login still needs it when no proxy is used.
        return phone, None, None
    playwright_proxy = {
        "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}",
        "username": config.IP_PROXY_USER,
        "password": config.IP_PROXY_PASSWORD,
    }
    httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}"
    return phone, playwright_proxy, httpx_proxy
async def create_xhs_client(self, httpx_proxy: str) -> XHSClient:
    """Build an ``XHSClient`` seeded with the browser context's cookies.

    Args:
        httpx_proxy: Proxy value handed to the client as ``proxies``.

    Returns:
        A client whose headers carry this crawler's user agent and the
        cookie string converted from the browser context, and which shares
        the crawler's playwright page.
    """
    browser_cookies = await self.browser_context.cookies()
    cookie_str, cookie_dict = utils.convert_cookies(browser_cookies)
    request_headers = {
        "User-Agent": self.user_agent,
        "Cookie": cookie_str,
        "Origin": "https://www.xiaohongshu.com",
        "Referer": "https://www.xiaohongshu.com",
        "Content-Type": "application/json;charset=UTF-8",
    }
    return XHSClient(
        proxies=httpx_proxy,
        headers=request_headers,
        playwright_page=self.context_page,
        cookie_dict=cookie_dict,
    )
async def launch_browser(self, chromium, playwright_proxy, user_agent, headless=True) -> BrowserContext:
    """Launch a browser and return the browser context to crawl with.

    Args:
        chromium: Playwright browser type used to launch the browser.
        playwright_proxy: Proxy settings forwarded to playwright (may be None).
        user_agent: User agent string applied to the context.
        headless: Whether the browser is launched headless.

    Returns:
        A persistent context backed by a per-platform user data directory
        when ``config.SAVE_LOGIN_STATE`` is truthy, otherwise a fresh context
        created on a newly launched browser.
    """
    viewport_size = {"width": 1920, "height": 1080}

    if not config.SAVE_LOGIN_STATE:
        browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
        return await browser.new_context(
            viewport=viewport_size,
            user_agent=user_agent,
        )

    # feat issue #14
    # USER_DATA_DIR is a "%s" template filled in with the platform name.
    profile_dir = config.USER_DATA_DIR % self.command_args.platform
    return await chromium.launch_persistent_context(
        user_data_dir=profile_dir,
        accept_downloads=True,
        headless=headless,
        proxy=playwright_proxy,
        viewport=viewport_size,
        user_agent=user_agent,
    )
async def close(self):
    """Close browser context.

    Does nothing when no browser context exists yet: the attribute is
    initialised to ``None`` and is only assigned once a browser is launched.
    """
    if self.browser_context is None:
        # Nothing was launched, so there is nothing to close or report.
        return
    await self.browser_context.close()
    logging.info("Browser context closed ...")

View File

@ -8,7 +8,8 @@ from tenacity import (
retry, retry,
stop_after_attempt, stop_after_attempt,
wait_fixed, wait_fixed,
retry_if_result retry_if_result,
RetryError
) )
from playwright.async_api import Page from playwright.async_api import Page
from playwright.async_api import BrowserContext from playwright.async_api import BrowserContext
@ -35,7 +36,11 @@ class XHSLogin(AbstractLogin):
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
async def check_login_state(self, no_logged_in_session: str) -> bool: async def check_login_state(self, no_logged_in_session: str) -> bool:
"""Check if the current login status is successful and return True otherwise return False""" """
Check if the current login status is successful and return True otherwise return False
retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second
if max retry times reached, raise RetryError
"""
current_cookie = await self.browser_context.cookies() current_cookie = await self.browser_context.cookies()
_, cookie_dict = utils.convert_cookies(current_cookie) _, cookie_dict = utils.convert_cookies(current_cookie)
current_web_session = cookie_dict.get("web_session") current_web_session = cookie_dict.get("web_session")
@ -44,6 +49,8 @@ class XHSLogin(AbstractLogin):
return False return False
async def begin(self): async def begin(self):
"""Start login xiaohongshu"""
logging.info("Begin login xiaohongshu ...")
if self.login_type == "qrcode": if self.login_type == "qrcode":
await self.login_by_qrcode() await self.login_by_qrcode()
elif self.login_type == "phone": elif self.login_type == "phone":
@ -54,6 +61,7 @@ class XHSLogin(AbstractLogin):
raise ValueError("Invalid Login Type Currently only supported qrcode or phone or cookies ...") raise ValueError("Invalid Login Type Currently only supported qrcode or phone or cookies ...")
async def login_by_mobile(self): async def login_by_mobile(self):
"""Login xiaohongshu by mobile"""
logging.info("Begin login xiaohongshu by mobile ...") logging.info("Begin login xiaohongshu by mobile ...")
await asyncio.sleep(1) await asyncio.sleep(1)
try: try:
@ -108,9 +116,10 @@ class XHSLogin(AbstractLogin):
# todo ... 应该还需要检查验证码的正确性有可能输入的验证码不正确 # todo ... 应该还需要检查验证码的正确性有可能输入的验证码不正确
break break
login_flag: bool = await self.check_login_state(no_logged_in_session) try:
if not login_flag: await self.check_login_state(no_logged_in_session)
logging.info("login failed please confirm ...") except RetryError:
logging.info("Login xiaohongshu failed by mobile login method ...")
sys.exit() sys.exit()
wait_redirect_seconds = 5 wait_redirect_seconds = 5
@ -147,14 +156,17 @@ class XHSLogin(AbstractLogin):
no_logged_in_session = cookie_dict.get("web_session") no_logged_in_session = cookie_dict.get("web_session")
# show login qrcode # show login qrcode
# utils.show_qrcode(base64_qrcode_img) # fix issue #12
# we need to use partial function to call show_qrcode function and run in executor
# then current asyncio event loop will not be blocked
partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img) partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode) asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
logging.info(f"waiting for scan code login, remaining time is 20s") logging.info(f"waiting for scan code login, remaining time is 20s")
login_flag: bool = await self.check_login_state(no_logged_in_session) try:
if not login_flag: await self.check_login_state(no_logged_in_session)
logging.info("login failed please confirm ...") except RetryError:
logging.info("Login xiaohongshu failed by qrcode login method ...")
sys.exit() sys.exit()
wait_redirect_seconds = 5 wait_redirect_seconds = 5
@ -162,6 +174,7 @@ class XHSLogin(AbstractLogin):
await asyncio.sleep(wait_redirect_seconds) await asyncio.sleep(wait_redirect_seconds)
async def login_by_cookies(self): async def login_by_cookies(self):
"""login xiaohongshu website by cookies"""
logging.info("Begin login xiaohongshu by cookie ...") logging.info("Begin login xiaohongshu by cookie ...")
for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
await self.browser_context.add_cookies([{ await self.browser_context.add_cookies([{

View File

@ -46,7 +46,7 @@ class RecvSmsNotificationHandler(tornado.web.RequestHandler):
request_body = self.request.body.decode("utf-8") request_body = self.request.body.decode("utf-8")
req_body_dict = json.loads(request_body) req_body_dict = json.loads(request_body)
print("recv sms notification and body content: ", req_body_dict) print("recv sms notification and body content: ", req_body_dict)
redis_obj = aioredis.from_url(url=config.redis_db_host, password=config.redis_db_pwd, decode_responses=True) redis_obj = aioredis.from_url(url=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD, decode_responses=True)
sms_content = req_body_dict.get("sms_content") sms_content = req_body_dict.get("sms_content")
sms_code = extract_verification_code(sms_content) sms_code = extract_verification_code(sms_content)
if sms_code: if sms_code: