Merge pull request #112 from NanmiCoder/feature/stored_in_json

refactor: 重构数据存储代码,新增jSON
This commit is contained in:
relakkes 2024-01-14 22:45:00 +08:00 committed by GitHub
commit e490123fcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1690 additions and 864 deletions

View File

@ -76,6 +76,7 @@
### 数据保存
- 支持保存到关系型数据库Mysql、PgSQL等
- 支持保存到csv中data/目录下)
- 支持保存到json中data/目录下)
## 如何使用 IP 代理
➡️➡️➡️ [IP代理使用方法](docs/代理使用.md)

View File

@ -39,3 +39,13 @@ class AbstractLogin(ABC):
@abstractmethod
async def login_by_cookies(self):
pass
class AbstractStore(ABC):
@abstractmethod
async def store_content(self, content_item: Dict):
pass
@abstractmethod
async def store_comment(self, comment_item: Dict):
pass

View File

@ -20,6 +20,9 @@ HEADLESS = True
# 是否保存登录状态
SAVE_LOGIN_STATE = True
# 数据保存类型选项配置,支持三种类型csv、db、json
SAVE_DATA_OPTION = "json" # csv or db or json
# 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
@ -54,7 +57,10 @@ DY_SPECIFIED_ID_LIST = [
]
# 指定快手平台需要爬取的ID列表
KS_SPECIFIED_ID_LIST = []
KS_SPECIFIED_ID_LIST = [
"3xf8enb8dbj6uig",
"3x6zz972bchmvqe"
]
# 指定B站平台需要爬取的视频bvid列表
BILI_SPECIFIED_ID_LIST = [

View File

@ -7,6 +7,3 @@ REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456") # your redis password
# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") # your relation db password
RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
# save data to database option
IS_SAVED_DATABASED = False # if you want to save data to database, set True

10
db.py
View File

@ -1,14 +1,20 @@
from typing import List
from tortoise import Tortoise, run_async
from config.db_config import *
from tools import utils
def get_platform_models() -> List[str]:
models = ["store.xhs", "store.douyin", "store.bilibili", "store.kuaishou", "store.weibo"]
return models
async def init_db(create_db: bool = False) -> None:
await Tortoise.init(
db_url=RELATION_DB_URL,
modules={'models': ['models']},
# modules={'models': ['models.kuaishou']}, # generate special table
modules={'models': get_platform_models()},
_create_db=create_db
)

View File

@ -40,7 +40,7 @@ async def main():
choices=["search", "detail"], default=config.CRAWLER_TYPE)
# init db
if config.IS_SAVED_DATABASED:
if config.SAVE_DATA_OPTION == "db":
await db.init_db()
args = parser.parse_args()

View File

@ -6,19 +6,18 @@
import asyncio
import os
import random
import time
from asyncio import Task
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
import config
from base.base_crawler import AbstractCrawler
from models import bilibili
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import bilibili as bilibili_store
from tools import utils
from var import comment_tasks_var, crawler_type_var
from var import crawler_type_var
from .client import BilibiliClient
from .exception import DataFetchError
@ -88,7 +87,6 @@ class BilibiliCrawler(AbstractCrawler):
pass
utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def search(self):
"""
search bilibili video with keywords
@ -118,7 +116,7 @@ class BilibiliCrawler(AbstractCrawler):
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili.update_bilibili_video(video_item)
await bilibili_store.update_bilibili_video(video_item)
page += 1
await self.batch_get_video_comments(video_id_list)
@ -150,7 +148,7 @@ class BilibiliCrawler(AbstractCrawler):
await self.bili_client.get_video_all_comments(
video_id=video_id,
crawl_interval=random.random(),
callback=bilibili.batch_update_bilibili_video_comments
callback=bilibili_store.batch_update_bilibili_video_comments
)
except DataFetchError as ex:
@ -176,7 +174,7 @@ class BilibiliCrawler(AbstractCrawler):
video_aid: str = video_item_view.get("aid")
if video_aid:
video_aids_list.append(video_aid)
await bilibili.update_bilibili_video(video_detail)
await bilibili_store.update_bilibili_video(video_detail)
await self.batch_get_video_comments(video_aids_list)
async def get_video_info_task(self, aid: int, bvid: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@ -195,7 +193,8 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.error(f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
return None
except KeyError as ex:
utils.logger.error(f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
utils.logger.error(
f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
return None
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 18:44
# @Desc : bilibli登录实现
# @Desc : bilibli登录实现
import asyncio
import functools

View File

@ -8,8 +8,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import douyin
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import douyin as douyin_store
from tools import utils
from var import crawler_type_var
@ -99,7 +99,7 @@ class DouYinCrawler(AbstractCrawler):
except TypeError:
continue
aweme_list.append(aweme_info.get("aweme_id", ""))
await douyin.update_douyin_aweme(aweme_item=aweme_info)
await douyin_store.update_douyin_aweme(aweme_item=aweme_info)
utils.logger.info(f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}")
await self.batch_get_note_comments(aweme_list)
@ -112,7 +112,7 @@ class DouYinCrawler(AbstractCrawler):
aweme_details = await asyncio.gather(*task_list)
for aweme_detail in aweme_details:
if aweme_detail is not None:
await douyin.update_douyin_aweme(aweme_detail)
await douyin_store.update_douyin_aweme(aweme_detail)
await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
@ -146,7 +146,7 @@ class DouYinCrawler(AbstractCrawler):
keywords=config.COMMENT_KEYWORDS # 关键词列表
)
# 现在返回的 comments 已经是经过关键词筛选的
await douyin.batch_update_dy_aweme_comments(aweme_id, comments)
await douyin_store.batch_update_dy_aweme_comments(aweme_id, comments)
utils.logger.info(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ...")
except DataFetchError as e:
utils.logger.error(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}")

View File

@ -10,7 +10,7 @@ from playwright.async_api import BrowserContext, Page
import config
from tools import utils
from .exception import DataFetchError, IPBlockError
from .exception import DataFetchError
from .graphql import KuaiShouGraphQL
@ -56,13 +56,21 @@ class KuaiShouClient:
return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=self.headers)
@staticmethod
async def pong() -> bool:
async def pong(self) -> bool:
"""get a note to check if login state is ok"""
utils.logger.info("[KuaiShouClient.pong] Begin pong kuaishou...")
ping_flag = False
try:
pass
post_data = {
"operationName": "visionProfileUserList",
"variables": {
"ftype": 1,
},
"query": self.graphql.get("vision_profile")
}
res = await self.post("", post_data)
if res.get("visionProfileUserList", {}).get("result") == 1:
ping_flag = True
except Exception as e:
utils.logger.error(f"[KuaiShouClient.pong] Pong kuaishou failed: {e}, and try to login again...")
ping_flag = False

View File

@ -10,8 +10,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import kuaishou
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import kuaishou as kuaishou_store
from tools import utils
from var import comment_tasks_var, crawler_type_var
@ -106,7 +106,7 @@ class KuaishouCrawler(AbstractCrawler):
for video_detail in vision_search_photo.get("feeds"):
video_id_list.append(video_detail.get("photo", {}).get("id"))
await kuaishou.update_kuaishou_video(video_item=video_detail)
await kuaishou_store.update_kuaishou_video(video_item=video_detail)
# batch fetch video comments
page += 1
@ -121,7 +121,7 @@ class KuaishouCrawler(AbstractCrawler):
video_details = await asyncio.gather(*task_list)
for video_detail in video_details:
if video_detail is not None:
await kuaishou.update_kuaishou_video(video_detail)
await kuaishou_store.update_kuaishou_video(video_detail)
await self.batch_get_video_comments(config.KS_SPECIFIED_ID_LIST)
async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@ -167,7 +167,7 @@ class KuaishouCrawler(AbstractCrawler):
await self.ks_client.get_video_all_comments(
photo_id=video_id,
crawl_interval=random.random(),
callback=kuaishou.batch_update_ks_video_comments
callback=kuaishou_store.batch_update_ks_video_comments
)
except DataFetchError as ex:
utils.logger.error(f"[KuaishouCrawler.get_comments] get video_id: {video_id} comment error: {ex}")

View File

@ -11,7 +11,7 @@ class KuaiShouGraphQL:
self.load_graphql_queries()
def load_graphql_queries(self):
graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql"]
graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql", "vision_profile.graphql"]
for file in graphql_files:
with open(self.graphql_dir + file, mode="r") as f:

View File

@ -0,0 +1,16 @@
query visionProfileUserList($pcursor: String, $ftype: Int) {
visionProfileUserList(pcursor: $pcursor, ftype: $ftype) {
result
fols {
user_name
headurl
user_text
isFollowing
user_id
__typename
}
hostName
pcursor
__typename
}
}

View File

@ -15,8 +15,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import weibo
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import weibo as weibo_store
from tools import utils
from var import crawler_type_var
@ -120,7 +120,7 @@ class WeiboCrawler(AbstractCrawler):
if note_item:
mblog: Dict = note_item.get("mblog")
note_id_list.append(mblog.get("id"))
await weibo.update_weibo_note(note_item)
await weibo_store.update_weibo_note(note_item)
page += 1
await self.batch_get_notes_comments(note_id_list)
@ -138,7 +138,7 @@ class WeiboCrawler(AbstractCrawler):
video_details = await asyncio.gather(*task_list)
for note_item in video_details:
if note_item:
await weibo.update_weibo_note(note_item)
await weibo_store.update_weibo_note(note_item)
await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)
async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@ -184,33 +184,11 @@ class WeiboCrawler(AbstractCrawler):
async with semaphore:
try:
utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...")
# Read keyword and quantity from config
keywords = config.COMMENT_KEYWORDS
max_comments = config.MAX_COMMENTS_PER_POST
# Download comments
all_comments = await self.wb_client.get_note_all_comments(
await self.wb_client.get_note_all_comments(
note_id=note_id,
crawl_interval=random.randint(1,10), # 微博对API的限流比较严重所以延时提高一些
callback=weibo_store.batch_update_weibo_note_comments
)
# Filter comments by keyword
if keywords:
filtered_comments = [
comment for comment in all_comments if
any(keyword in comment["content"]["message"] for keyword in keywords)
]
else:
filtered_comments = all_comments
# Limit the number of comments
if max_comments > 0:
filtered_comments = filtered_comments[:max_comments]
# Update weibo note comments
await weibo.batch_update_weibo_note_comments(note_id, filtered_comments)
except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}")
except Exception as e:

View File

@ -9,8 +9,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,
import config
from base.base_crawler import AbstractCrawler
from models import xiaohongshu as xhs_model
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import xhs as xhs_store
from tools import utils
from var import crawler_type_var
@ -112,7 +112,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_model.update_xhs_note(note_detail)
await xhs_store.update_xhs_note(note_detail)
note_id_list.append(note_detail.get("note_id"))
page += 1
utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
@ -127,7 +127,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_model.update_xhs_note(note_detail)
await xhs_store.update_xhs_note(note_detail)
await self.batch_get_note_comments(config.XHS_SPECIFIED_ID_LIST)
async def get_note_detail(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@ -174,7 +174,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
# 更新或保存过滤后的评论
for comment in filtered_comments:
await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment)
await xhs_store.update_xhs_note_comment(note_id=note_id, comment_item=comment)
@staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:

View File

@ -1,5 +0,0 @@
from .bilibili import *
from .douyin import *
from .kuaishou import *
from .weibo import *
from .xiaohongshu import *

View File

@ -1,156 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/3 16:16
# @Desc : B 站的模型类
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class BilibiliBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class BilibiliVideo(BilibiliBaseModel):
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
video_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
video_play_count = fields.CharField(null=True, max_length=16, description="视频播放数量")
video_danmaku = fields.CharField(null=True, max_length=16, description="视频弹幕数量")
video_comment = fields.CharField(null=True, max_length=16, description="视频评论数量")
video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
class Meta:
table = "bilibili_video"
table_description = "B站视频"
def __str__(self):
return f"{self.video_id} - {self.title}"
class BilibiliComment(BilibiliBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "bilibili_video_comment"
table_description = "B 站视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"
async def update_bilibili_video(video_item: Dict):
video_item_view: Dict = video_item.get("View")
video_user_info: Dict = video_item_view.get("owner")
video_item_stat: Dict = video_item_view.get("stat")
video_id = str(video_item_view.get("aid"))
local_db_item = {
"video_id": video_id,
"video_type": "video",
"title": video_item_view.get("title", "")[:500],
"desc": video_item_view.get("desc", "")[:500],
"create_time": video_item_view.get("pubdate"),
"user_id": str(video_user_info.get("mid")),
"nickname": video_user_info.get("name"),
"avatar": video_user_info.get("face", ""),
"liked_count": str(video_item_stat.get("like", "")),
"video_play_count": str(video_item_stat.get("view", "")),
"video_danmaku": str(video_item_stat.get("danmaku", "")),
"video_comment": str(video_item_stat.get("reply", "")),
"last_modify_ts": utils.get_current_timestamp(),
"video_url": f"https://www.bilibili.com/video/av{video_id}",
"video_cover_url": video_item_view.get("pic", ""),
}
utils.logger.info(f"[models.bilibili.update_bilibili_video] bilibili video id:{video_id}, title:{local_db_item.get('title')}")
if config.IS_SAVED_DATABASED:
if not await BilibiliVideo.filter(video_id=video_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',))
bilibili_data = bilibili_video_pydantic(**local_db_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.create(**bilibili_data.model_dump())
else:
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate',
exclude=('id', 'add_ts'))
bilibili_data = bilibili_video_pydantic(**local_db_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/bilibili").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/bilibili/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_bilibili_video_comment(video_id, comment_item)
async def update_bilibili_video_comment(video_id: str, comment_item: Dict):
comment_id = str(comment_item.get("rpid"))
content: Dict = comment_item.get("content")
user_info: Dict = comment_item.get("member")
local_db_item = {
"comment_id": comment_id,
"create_time": comment_item.get("ctime"),
"video_id": str(video_id),
"content": content.get("message"),
"user_id": user_info.get("mid"),
"nickname": user_info.get("uname"),
"avatar": user_info.get("avatar"),
"sub_comment_count": str(comment_item.get("rcount", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[models.bilibili.update_bilibili_video_comment] Bilibili video comment: {comment_id}, content: {local_db_item.get('content')}")
if config.IS_SAVED_DATABASED:
if not await BilibiliComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await BilibiliComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
pathlib.Path(f"data/bilibili").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/bilibili/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -1,171 +0,0 @@
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class DouyinBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
sec_uid = fields.CharField(null=True, max_length=128, description="用户sec_uid")
short_user_id = fields.CharField(null=True, max_length=64, description="用户短ID")
user_unique_id = fields.CharField(null=True, max_length=64, description="用户唯一ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
user_signature = fields.CharField(null=True, max_length=500, description="用户签名")
ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class DouyinAweme(DouyinBaseModel):
aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
aweme_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
comment_count = fields.CharField(null=True, max_length=16, description="视频评论数")
share_count = fields.CharField(null=True, max_length=16, description="视频分享数")
collected_count = fields.CharField(null=True, max_length=16, description="视频收藏数")
aweme_url = fields.CharField(null=True, max_length=255, description="视频详情页URL")
class Meta:
table = "douyin_aweme"
table_description = "抖音视频"
def __str__(self):
return f"{self.aweme_id} - {self.title}"
class DouyinAwemeComment(DouyinBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "douyin_aweme_comment"
table_description = "抖音视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"
async def update_douyin_aweme(aweme_item: Dict):
aweme_id = aweme_item.get("aweme_id")
user_info = aweme_item.get("author", {})
interact_info = aweme_item.get("statistics", {})
local_db_item = {
"aweme_id": aweme_id,
"aweme_type": str(aweme_item.get("aweme_type")),
"title": aweme_item.get("desc", ""),
"desc": aweme_item.get("desc", ""),
"create_time": aweme_item.get("create_time"),
"user_id": user_info.get("uid"),
"sec_uid": user_info.get("sec_uid"),
"short_user_id": user_info.get("short_id"),
"user_unique_id": user_info.get("unique_id"),
"user_signature": user_info.get("signature"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("avatar_thumb", {}).get("url_list", [""])[0],
"liked_count": str(interact_info.get("digg_count")),
"collected_count": str(interact_info.get("collect_count")),
"comment_count": str(interact_info.get("comment_count")),
"share_count": str(interact_info.get("share_count")),
"ip_location": aweme_item.get("ip_label", ""),
"last_modify_ts": utils.get_current_timestamp(),
"aweme_url": f"https://www.douyin.com/video/{aweme_id}"
}
utils.logger.info(f"[models.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{local_db_item.get('title')}")
if config.IS_SAVED_DATABASED:
if not await DouyinAweme.filter(aweme_id=aweme_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',))
douyin_data = douyin_aweme_pydantic(**local_db_item)
douyin_aweme_pydantic.validate(douyin_data)
await DouyinAweme.create(**douyin_data.dict())
else:
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate',
exclude=('id', 'add_ts'))
douyin_data = douyin_aweme_pydantic(**local_db_item)
douyin_aweme_pydantic.validate(douyin_data)
await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.dict())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/dy/{crawler_type_var.get()}_awemes_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def batch_update_dy_aweme_comments(aweme_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_dy_aweme_comment(aweme_id, comment_item)
async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
comment_aweme_id = comment_item.get("aweme_id")
if aweme_id != comment_aweme_id:
utils.logger.error(f"[models.douyin.update_dy_aweme_comment] comment_aweme_id: {comment_aweme_id} != aweme_id: {aweme_id}")
return
user_info = comment_item.get("user", {})
comment_id = comment_item.get("cid")
avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
"avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
local_db_item = {
"comment_id": comment_id,
"create_time": comment_item.get("create_time"),
"ip_location": comment_item.get("ip_label", ""),
"aweme_id": aweme_id,
"content": comment_item.get("text"),
"user_id": user_info.get("uid"),
"sec_uid": user_info.get("sec_uid"),
"short_user_id": user_info.get("short_id"),
"user_unique_id": user_info.get("unique_id"),
"user_signature": user_info.get("signature"),
"nickname": user_info.get("nickname"),
"avatar": avatar_info.get("url_list", [""])[0],
"sub_comment_count": str(comment_item.get("reply_comment_total", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[models.douyin.update_dy_aweme_comment] douyin aweme comment: {comment_id}, content: {local_db_item.get('content')}")
if config.IS_SAVED_DATABASED:
if not await DouyinAwemeComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await DouyinAwemeComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/dy/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -1,151 +0,0 @@
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class KuaishouBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class KuaishouVideo(KuaishouBaseModel):
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
video_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
class Meta:
table = "kuaishou_video"
table_description = "快手视频"
def __str__(self):
return f"{self.video_id} - {self.title}"
class KuaishouVideoComment(KuaishouBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "kuaishou_video_comment"
table_description = "快手视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"
async def update_kuaishou_video(video_item: Dict):
photo_info: Dict = video_item.get("photo", {})
video_id = photo_info.get("id")
if not video_id:
return
user_info = video_item.get("author", {})
local_db_item = {
"video_id": video_id,
"video_type": str(video_item.get("type")),
"title": photo_info.get("caption", "")[:500],
"desc": photo_info.get("caption", "")[:500],
"create_time": photo_info.get("timestamp"),
"user_id": user_info.get("id"),
"nickname": user_info.get("name"),
"avatar": user_info.get("headerUrl", ""),
"liked_count": str(photo_info.get("realLikeCount")),
"viewd_count": str(photo_info.get("viewCount")),
"last_modify_ts": utils.get_current_timestamp(),
"video_url": f"https://www.kuaishou.com/short-video/{video_id}",
"video_cover_url": photo_info.get("coverUrl", ""),
"video_play_url": photo_info.get("photoUrl", ""),
}
utils.logger.info(f"[models.kuaishou.update_kuaishou_video] Kuaishou video id:{video_id}, title:{local_db_item.get('title')}")
if config.IS_SAVED_DATABASED:
if not await KuaishouVideo.filter(video_id=video_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
kuaishou_data = kuaishou_video_pydantic(**local_db_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.create(**kuaishou_data.model_dump())
else:
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
exclude=('id', 'add_ts'))
kuaishou_data = kuaishou_video_pydantic(**local_db_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/kuaishou/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
utils.logger.info(f"[KuaishouVideoComment.batch_update_ks_video_comments] video_id:{video_id}, comments:{comments}")
if not comments:
return
for comment_item in comments:
await update_ks_video_comment(video_id, comment_item)
async def update_ks_video_comment(video_id: str, comment_item: Dict):
comment_id = comment_item.get("commentId")
local_db_item = {
"comment_id": comment_id,
"create_time": comment_item.get("timestamp"),
"video_id": video_id,
"content": comment_item.get("content"),
"user_id": comment_item.get("authorId"),
"nickname": comment_item.get("authorName"),
"avatar": comment_item.get("headurl"),
"sub_comment_count": str(comment_item.get("subCommentCount", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[models.kuaishou.update_ks_video_comment] Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}")
if config.IS_SAVED_DATABASED:
if not await KuaishouVideoComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await KuaishouVideoComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/kuaishou/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -1,170 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 21:53
# @Desc : 微博的模型类
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class WeiboBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
gender = fields.CharField(null=True, max_length=12, description="用户性别")
profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址")
ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class WeiboNote(WeiboBaseModel):
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="帖子正文内容")
create_time = fields.BigIntField(description="帖子发布时间戳", index=True)
create_date_time = fields.CharField(description="帖子发布日期时间",max_length=32, index=True)
liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数")
comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量")
shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量")
note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL")
class Meta:
table = "weibo_note"
table_description = "微博帖子"
def __str__(self):
return f"{self.note_id}"
class WeiboComment(WeiboBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
create_date_time = fields.CharField(description="评论日期时间", max_length=32, index=True)
comment_like_count = fields.CharField(max_length=16, description="评论点赞数量")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "weibo_note_comment"
table_description = "微博帖子评论"
def __str__(self):
return f"{self.comment_id}"
async def update_weibo_note(note_item: Dict):
mblog: Dict = note_item.get("mblog")
user_info: Dict = mblog.get("user")
note_id = mblog.get("id")
local_db_item = {
# 微博信息
"note_id": note_id,
"content": mblog.get("text"),
"create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")),
"create_date_time": str(utils.rfc2822_to_china_datetime(mblog.get("created_at"))),
"liked_count": str(mblog.get("attitudes_count", 0)),
"comments_count": str(mblog.get("comments_count", 0)),
"shared_count": str(mblog.get("reposts_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://m.weibo.cn/detail/{note_id}",
"ip_location": mblog.get("region_name", "").replace("发布于 ", ""),
# 用户信息
"user_id": str(user_info.get("id")),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_note] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboNote.filter(note_id=note_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_note_pydantic(**local_db_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.create(**weibo_data.model_dump())
else:
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
exclude=('id', 'add_ts'))
weibo_data = weibo_note_pydantic(**local_db_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/weibo/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_weibo_note_comment(note_id, comment_item)
async def update_weibo_note_comment(note_id: str, comment_item: Dict):
comment_id = str(comment_item.get("id"))
user_info: Dict = comment_item.get("user")
local_db_item = {
"comment_id": comment_id,
"create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
"create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))),
"note_id": note_id,
"content": comment_item.get("text"),
"sub_comment_count": str(comment_item.get("total_number", 0)),
"comment_like_count": str(comment_item.get("like_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
"ip_location": comment_item.get("source", "").replace("来自", ""),
# 用户信息
"user_id": str(user_info.get("id")),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await WeiboComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await WeiboComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/weibo/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -1,150 +0,0 @@
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class XhsBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class XHSNote(XhsBaseModel):
note_id = fields.CharField(max_length=64, index=True, description="笔记ID")
type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)")
title = fields.CharField(null=True, max_length=255, description="笔记标题")
desc = fields.TextField(null=True, description="笔记描述")
time = fields.BigIntField(description="笔记发布时间戳", index=True)
last_update_time = fields.BigIntField(description="笔记最后更新时间戳")
liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数")
collected_count = fields.CharField(null=True, max_length=16, description="笔记收藏数")
comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数")
share_count = fields.CharField(null=True, max_length=16, description="笔记分享数")
image_list = fields.TextField(null=True, description="笔记封面图片列表")
note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL")
class Meta:
table = "xhs_note"
table_description = "小红书笔记"
def __str__(self):
return f"{self.note_id} - {self.title}"
class XHSNoteComment(XhsBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
create_time = fields.BigIntField(index=True, description="评论时间戳")
note_id = fields.CharField(max_length=64, description="笔记ID")
content = fields.TextField(description="评论内容")
sub_comment_count = fields.IntField(description="子评论数量")
class Meta:
table = "xhs_note_comment"
table_description = "小红书笔记评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"
async def update_xhs_note(note_item: Dict):
note_id = note_item.get("note_id")
user_info = note_item.get("user", {})
interact_info = note_item.get("interact_info", {})
image_list: List[Dict] = note_item.get("image_list", [])
local_db_item = {
"note_id": note_item.get("note_id"),
"type": note_item.get("type"),
"title": note_item.get("title") or note_item.get("desc", "")[:255],
"desc": note_item.get("desc", ""),
"time": note_item.get("time"),
"last_update_time": note_item.get("last_update_time", 0),
"user_id": user_info.get("user_id"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("avatar"),
"liked_count": interact_info.get("liked_count"),
"collected_count": interact_info.get("collected_count"),
"comment_count": interact_info.get("comment_count"),
"share_count": interact_info.get("share_count"),
"ip_location": note_item.get("ip_location", ""),
"image_list": ','.join([img.get('url', '') for img in image_list]),
"last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}"
}
utils.logger.info(f"[models.xiaohongshu.update_xhs_note] xhs note: {local_db_item}")
if config.IS_SAVED_DATABASED:
if not await XHSNote.filter(note_id=note_id).first():
local_db_item["add_ts"] = utils.get_current_timestamp()
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',))
note_data = note_pydantic(**local_db_item)
note_pydantic.validate(note_data)
await XHSNote.create(**note_data.dict())
else:
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts'))
note_data = note_pydantic(**local_db_item)
note_pydantic.validate(note_data)
await XHSNote.filter(note_id=note_id).update(**note_data.dict())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/xhs/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def update_xhs_note_comment(note_id: str, comment_item: Dict):
user_info = comment_item.get("user_info", {})
comment_id = comment_item.get("id")
local_db_item = {
"comment_id": comment_id,
"create_time": comment_item.get("create_time"),
"ip_location": comment_item.get("ip_location"),
"note_id": note_id,
"content": comment_item.get("content"),
"user_id": user_info.get("user_id"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("image"),
"sub_comment_count": comment_item.get("sub_comment_count"),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[models.xiaohongshu.update_xhs_note_comment] xhs note comment:{local_db_item}")
if config.IS_SAVED_DATABASED:
if not await XHSNoteComment.filter(comment_id=comment_id).first():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await XHSNoteComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate",
exclude=('id', 'add_ts',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/xhs/{crawler_type_var.get()}_comment_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -11,3 +11,4 @@ aerich==0.7.2
numpy~=1.24.4
redis~=4.6.0
pydantic==2.5.2
aiofiles~=23.2.1

4
store/__init__.py Normal file
View File

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 17:29
# @Desc :

View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 19:34
# @Desc :
from typing import List
import config
from .bilibili_store_db_types import *
from .bilibili_store_impl import *
class BiliStoreFactory:
STORES = {
"csv": BiliCsvStoreImplement,
"db": BiliDbStoreImplement,
"json": BiliJsonStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = BiliStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
return store_class()
async def update_bilibili_video(video_item: Dict):
video_item_view: Dict = video_item.get("View")
video_user_info: Dict = video_item_view.get("owner")
video_item_stat: Dict = video_item_view.get("stat")
video_id = str(video_item_view.get("aid"))
save_content_item = {
"video_id": video_id,
"video_type": "video",
"title": video_item_view.get("title", "")[:500],
"desc": video_item_view.get("desc", "")[:500],
"create_time": video_item_view.get("pubdate"),
"user_id": str(video_user_info.get("mid")),
"nickname": video_user_info.get("name"),
"avatar": video_user_info.get("face", ""),
"liked_count": str(video_item_stat.get("like", "")),
"video_play_count": str(video_item_stat.get("view", "")),
"video_danmaku": str(video_item_stat.get("danmaku", "")),
"video_comment": str(video_item_stat.get("reply", "")),
"last_modify_ts": utils.get_current_timestamp(),
"video_url": f"https://www.bilibili.com/video/av{video_id}",
"video_cover_url": video_item_view.get("pic", ""),
}
utils.logger.info(
f"[store.bilibili.update_bilibili_video] bilibili video id:{video_id}, title:{save_content_item.get('title')}")
await BiliStoreFactory.create_store().store_content(content_item=save_content_item)
async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_bilibili_video_comment(video_id, comment_item)
async def update_bilibili_video_comment(video_id: str, comment_item: Dict):
comment_id = str(comment_item.get("rpid"))
content: Dict = comment_item.get("content")
user_info: Dict = comment_item.get("member")
save_comment_item = {
"comment_id": comment_id,
"create_time": comment_item.get("ctime"),
"video_id": str(video_id),
"content": content.get("message"),
"user_id": user_info.get("mid"),
"nickname": user_info.get("uname"),
"avatar": user_info.get("avatar"),
"sub_comment_count": str(comment_item.get("rcount", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(
f"[store.bilibili.update_bilibili_video_comment] Bilibili video comment: {comment_id}, content: {save_comment_item.get('content')}")
await BiliStoreFactory.create_store().store_comment(comment_item=save_comment_item)

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 19:34
# @Desc : B站存储到DB的模型类集合
from tortoise import fields
from tortoise.models import Model
class BilibiliBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class BilibiliVideo(BilibiliBaseModel):
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
video_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
video_play_count = fields.CharField(null=True, max_length=16, description="视频播放数量")
video_danmaku = fields.CharField(null=True, max_length=16, description="视频弹幕数量")
video_comment = fields.CharField(null=True, max_length=16, description="视频评论数量")
video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
class Meta:
table = "bilibili_video"
table_description = "B站视频"
def __str__(self):
return f"{self.video_id} - {self.title}"
class BilibiliComment(BilibiliBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "bilibili_video_comment"
table_description = "B 站视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"

View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 19:34
# @Desc : B站存储实现类
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from tools import utils
from var import crawler_type_var
class BiliCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/bilibili"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/bilibili/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Bilibili content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Bilibili comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
class BiliDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Bilibili content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .bilibili_store_db_types import BilibiliVideo
video_id = content_item.get("video_id")
if not await BilibiliVideo.filter(video_id=video_id).exists():
content_item["add_ts"] = utils.get_current_timestamp()
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',))
bilibili_data = bilibili_video_pydantic(**content_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.create(**bilibili_data.model_dump())
else:
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate',
exclude=('id', 'add_ts'))
bilibili_data = bilibili_video_pydantic(**content_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump())
async def store_comment(self, comment_item: Dict):
"""
Bilibili content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .bilibili_store_db_types import BilibiliComment
comment_id = comment_item.get("comment_id")
if not await BilibiliComment.filter(comment_id=comment_id).exists():
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await BilibiliComment.create(**comment_data.model_dump())
else:
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
class BiliJsonStoreImplement(AbstractStore):
json_store_path: str = "data/bilibili"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json"
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
save_data = []
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementatio
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")

95
store/douyin/__init__.py Normal file
View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 18:46
# @Desc :
from typing import List
import config
from .douyin_store_db_types import *
from .douyin_store_impl import *
class DouyinStoreFactory:
STORES = {
"csv": DouyinCsvStoreImplement,
"db": DouyinDbStoreImplement,
"json": DouyinJsonStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = DouyinStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
return store_class()
async def update_douyin_aweme(aweme_item: Dict):
aweme_id = aweme_item.get("aweme_id")
user_info = aweme_item.get("author", {})
interact_info = aweme_item.get("statistics", {})
save_content_item = {
"aweme_id": aweme_id,
"aweme_type": str(aweme_item.get("aweme_type")),
"title": aweme_item.get("desc", ""),
"desc": aweme_item.get("desc", ""),
"create_time": aweme_item.get("create_time"),
"user_id": user_info.get("uid"),
"sec_uid": user_info.get("sec_uid"),
"short_user_id": user_info.get("short_id"),
"user_unique_id": user_info.get("unique_id"),
"user_signature": user_info.get("signature"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("avatar_thumb", {}).get("url_list", [""])[0],
"liked_count": str(interact_info.get("digg_count")),
"collected_count": str(interact_info.get("collect_count")),
"comment_count": str(interact_info.get("comment_count")),
"share_count": str(interact_info.get("share_count")),
"ip_location": aweme_item.get("ip_label", ""),
"last_modify_ts": utils.get_current_timestamp(),
"aweme_url": f"https://www.douyin.com/video/{aweme_id}"
}
utils.logger.info(
f"[store.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{save_content_item.get('title')}")
await DouyinStoreFactory.create_store().store_content(content_item=save_content_item)
async def batch_update_dy_aweme_comments(aweme_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_dy_aweme_comment(aweme_id, comment_item)
async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
comment_aweme_id = comment_item.get("aweme_id")
if aweme_id != comment_aweme_id:
utils.logger.error(
f"[store.douyin.update_dy_aweme_comment] comment_aweme_id: {comment_aweme_id} != aweme_id: {aweme_id}")
return
user_info = comment_item.get("user", {})
comment_id = comment_item.get("cid")
avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
"avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
save_comment_item = {
"comment_id": comment_id,
"create_time": comment_item.get("create_time"),
"ip_location": comment_item.get("ip_label", ""),
"aweme_id": aweme_id,
"content": comment_item.get("text"),
"user_id": user_info.get("uid"),
"sec_uid": user_info.get("sec_uid"),
"short_user_id": user_info.get("short_id"),
"user_unique_id": user_info.get("unique_id"),
"user_signature": user_info.get("signature"),
"nickname": user_info.get("nickname"),
"avatar": avatar_info.get("url_list", [""])[0],
"sub_comment_count": str(comment_item.get("reply_comment_total", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(
f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: {comment_id}, content: {save_comment_item.get('content')}")
await DouyinStoreFactory.create_store().store_comment(comment_item=save_comment_item)

View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 18:50
# @Desc : 抖音存储到DB的模型类集合
from tortoise import fields
from tortoise.models import Model
class DouyinBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
sec_uid = fields.CharField(null=True, max_length=128, description="用户sec_uid")
short_user_id = fields.CharField(null=True, max_length=64, description="用户短ID")
user_unique_id = fields.CharField(null=True, max_length=64, description="用户唯一ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
user_signature = fields.CharField(null=True, max_length=500, description="用户签名")
ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class DouyinAweme(DouyinBaseModel):
aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
aweme_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
comment_count = fields.CharField(null=True, max_length=16, description="视频评论数")
share_count = fields.CharField(null=True, max_length=16, description="视频分享数")
collected_count = fields.CharField(null=True, max_length=16, description="视频收藏数")
aweme_url = fields.CharField(null=True, max_length=255, description="视频详情页URL")
class Meta:
table = "douyin_aweme"
table_description = "抖音视频"
def __str__(self):
return f"{self.aweme_id} - {self.title}"
class DouyinAwemeComment(DouyinBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "douyin_aweme_comment"
table_description = "抖音视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"

View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 18:46
# @Desc : 抖音存储实现类
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from tools import utils
from var import crawler_type_var
class DouyinCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/douyin"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/douyin/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Xiaohongshu content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Xiaohongshu comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
class DouyinDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Douyin content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .douyin_store_db_types import DouyinAweme
aweme_id = content_item.get("aweme_id")
if not await DouyinAweme.filter(aweme_id=aweme_id).exists():
content_item["add_ts"] = utils.get_current_timestamp()
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',))
douyin_data = douyin_aweme_pydantic(**content_item)
douyin_aweme_pydantic.model_validate(douyin_data)
await DouyinAweme.create(**douyin_data.dict())
else:
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate',
exclude=('id', 'add_ts'))
douyin_data = douyin_aweme_pydantic(**content_item)
douyin_aweme_pydantic.model_validate(douyin_data)
await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.model_dump())
async def store_comment(self, comment_item: Dict):
"""
Douyin content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .douyin_store_db_types import DouyinAwemeComment
comment_id = comment_item.get("comment_id")
if not await DouyinAwemeComment.filter(comment_id=comment_id).exists():
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await DouyinAwemeComment.create(**comment_data.model_dump())
else:
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await DouyinAwemeComment.filter(comment_id=comment_item).update(**comment_data.model_dump())
class DouyinJsonStoreImplement(AbstractStore):
json_store_path: str = "data/douyin"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json"
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
save_data = []
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementatio
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 20:03
# @Desc :
from typing import List
import config
from .kuaishou_store_db_types import *
from .kuaishou_store_impl import *
class KuaishouStoreFactory:
STORES = {
"csv": KuaishouCsvStoreImplement,
"db": KuaishouDbStoreImplement,
"json": KuaishouJsonStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = KuaishouStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
return store_class()
async def update_kuaishou_video(video_item: Dict):
photo_info: Dict = video_item.get("photo", {})
video_id = photo_info.get("id")
if not video_id:
return
user_info = video_item.get("author", {})
save_content_item = {
"video_id": video_id,
"video_type": str(video_item.get("type")),
"title": photo_info.get("caption", "")[:500],
"desc": photo_info.get("caption", "")[:500],
"create_time": photo_info.get("timestamp"),
"user_id": user_info.get("id"),
"nickname": user_info.get("name"),
"avatar": user_info.get("headerUrl", ""),
"liked_count": str(photo_info.get("realLikeCount")),
"viewd_count": str(photo_info.get("viewCount")),
"last_modify_ts": utils.get_current_timestamp(),
"video_url": f"https://www.kuaishou.com/short-video/{video_id}",
"video_cover_url": photo_info.get("coverUrl", ""),
"video_play_url": photo_info.get("photoUrl", ""),
}
utils.logger.info(f"[store.kuaishou.update_kuaishou_video] Kuaishou video id:{video_id}, title:{save_content_item.get('title')}")
await KuaishouStoreFactory.create_store().store_content(content_item=save_content_item)
async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
utils.logger.info(f"[store.kuaishou.batch_update_ks_video_comments] video_id:{video_id}, comments:{comments}")
if not comments:
return
for comment_item in comments:
await update_ks_video_comment(video_id, comment_item)
async def update_ks_video_comment(video_id: str, comment_item: Dict):
comment_id = comment_item.get("commentId")
save_comment_item = {
"comment_id": comment_id,
"create_time": comment_item.get("timestamp"),
"video_id": video_id,
"content": comment_item.get("content"),
"user_id": comment_item.get("authorId"),
"nickname": comment_item.get("authorName"),
"avatar": comment_item.get("headurl"),
"sub_comment_count": str(comment_item.get("subCommentCount", 0)),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[store.kuaishou.update_ks_video_comment] Kuaishou video comment: {comment_id}, content: {save_comment_item.get('content')}")
await KuaishouStoreFactory.create_store().store_comment(comment_item=save_comment_item)

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 20:03
# @Desc : 快手存储到DB的模型类集合
from tortoise import fields
from tortoise.models import Model
class KuaishouBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class KuaishouVideo(KuaishouBaseModel):
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
video_type = fields.CharField(max_length=16, description="视频类型")
title = fields.CharField(null=True, max_length=500, description="视频标题")
desc = fields.TextField(null=True, description="视频描述")
create_time = fields.BigIntField(description="视频发布时间戳", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
class Meta:
table = "kuaishou_video"
table_description = "快手视频"
def __str__(self):
return f"{self.video_id} - {self.title}"
class KuaishouVideoComment(KuaishouBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
video_id = fields.CharField(max_length=64, index=True, description="视频ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "kuaishou_video_comment"
table_description = "快手视频评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"

View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 20:03
# @Desc : 快手存储实现类
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from tools import utils
from var import crawler_type_var
class KuaishouCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/kuaishou"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/kuaishou/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Kuaishou content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Kuaishou comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
class KuaishouDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Kuaishou content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .kuaishou_store_db_types import KuaishouVideo
video_id = content_item.get("video_id")
if not await KuaishouVideo.filter(video_id=video_id).exists():
content_item["add_ts"] = utils.get_current_timestamp()
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
kuaishou_data = kuaishou_video_pydantic(**content_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.create(**kuaishou_data.model_dump())
else:
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
exclude=('id', 'add_ts'))
kuaishou_data = kuaishou_video_pydantic(**content_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
async def store_comment(self, comment_item: Dict):
"""
Kuaishou content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .kuaishou_store_db_types import KuaishouVideoComment
comment_id = comment_item.get("comment_id")
if not await KuaishouVideoComment.filter(comment_id=comment_id).exists():
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await KuaishouVideoComment.create(**comment_data.model_dump())
else:
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
class KuaishouJsonStoreImplement(AbstractStore):
json_store_path: str = "data/kuaishou"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json"
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
save_data = []
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementatio
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")

88
store/weibo/__init__.py Normal file
View File

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 21:34
# @Desc :
from typing import List
import config
from .weibo_store_db_types import *
from .weibo_store_impl import *
class WeibostoreFactory:
STORES = {
"csv": WeiboCsvStoreImplement,
"db": WeiboDbStoreImplement,
"json": WeiboJsonStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = WeibostoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json ...")
return store_class()
async def update_weibo_note(note_item: Dict):
mblog: Dict = note_item.get("mblog")
user_info: Dict = mblog.get("user")
note_id = mblog.get("id")
save_content_item = {
# 微博信息
"note_id": note_id,
"content": mblog.get("text"),
"create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")),
"create_date_time": str(utils.rfc2822_to_china_datetime(mblog.get("created_at"))),
"liked_count": str(mblog.get("attitudes_count", 0)),
"comments_count": str(mblog.get("comments_count", 0)),
"shared_count": str(mblog.get("reposts_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://m.weibo.cn/detail/{note_id}",
"ip_location": mblog.get("region_name", "").replace("发布于 ", ""),
# 用户信息
"user_id": str(user_info.get("id")),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[store.weibo.update_weibo_note] weibo note id:{note_id}, title:{save_content_item.get('content')[:24]} ...")
await WeibostoreFactory.create_store().store_content(content_item=save_content_item)
async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_weibo_note_comment(note_id, comment_item)
async def update_weibo_note_comment(note_id: str, comment_item: Dict):
comment_id = str(comment_item.get("id"))
user_info: Dict = comment_item.get("user")
save_comment_item = {
"comment_id": comment_id,
"create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
"create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))),
"note_id": note_id,
"content": comment_item.get("text"),
"sub_comment_count": str(comment_item.get("total_number", 0)),
"comment_like_count": str(comment_item.get("like_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
"ip_location": comment_item.get("source", "").replace("来自", ""),
# 用户信息
"user_id": str(user_info.get("id")),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[store.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {save_comment_item.get('content','')[:24]} ...")
await WeibostoreFactory.create_store().store_comment(comment_item=save_comment_item)

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 21:35
# @Desc : 微博存储到DB的模型类集合
from tortoise import fields
from tortoise.models import Model
class WeiboBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
gender = fields.CharField(null=True, max_length=12, description="用户性别")
profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址")
ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class WeiboNote(WeiboBaseModel):
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="帖子正文内容")
create_time = fields.BigIntField(description="帖子发布时间戳", index=True)
create_date_time = fields.CharField(description="帖子发布日期时间", max_length=32, index=True)
liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数")
comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量")
shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量")
note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL")
class Meta:
table = "weibo_note"
table_description = "微博帖子"
def __str__(self):
return f"{self.note_id}"
class WeiboComment(WeiboBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
create_date_time = fields.CharField(description="评论日期时间", max_length=32, index=True)
comment_like_count = fields.CharField(max_length=16, description="评论点赞数量")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "weibo_note_comment"
table_description = "微博帖子评论"
def __str__(self):
return f"{self.comment_id}"

View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 21:35
# @Desc : 微博存储实现类
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from tools import utils
from var import crawler_type_var
class WeiboCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/weibo"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/bilibili/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Weibo content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Weibo comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
class WeiboDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Weibo content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .weibo_store_db_types import WeiboNote
note_id = content_item.get("note_id")
if not await WeiboNote.filter(note_id=note_id).exists():
content_item["add_ts"] = utils.get_current_timestamp()
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_note_pydantic(**content_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.create(**weibo_data.model_dump())
else:
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
exclude=('id', 'add_ts'))
weibo_data = weibo_note_pydantic(**content_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
async def store_comment(self, comment_item: Dict):
"""
Weibo content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .weibo_store_db_types import WeiboComment
comment_id = comment_item.get("comment_id")
if not await WeiboComment.filter(comment_id=comment_id).exists():
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await WeiboComment.create(**comment_data.model_dump())
else:
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await WeiboComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
class WeiboJsonStoreImplement(AbstractStore):
json_store_path: str = "data/weibo"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json"
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
save_data = []
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementatio
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")

74
store/xhs/__init__.py Normal file
View File

@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 17:34
# @Desc :
from typing import List
import config
from . import xhs_store_impl
from .xhs_store_db_types import *
from .xhs_store_impl import *
class XhsStoreFactory:
STORES = {
"csv": XhsCsvStoreImplement,
"db": XhsDbStoreImplement,
"json": XhsJsonStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
return store_class()
async def update_xhs_note(note_item: Dict):
note_id = note_item.get("note_id")
user_info = note_item.get("user", {})
interact_info = note_item.get("interact_info", {})
image_list: List[Dict] = note_item.get("image_list", [])
local_db_item = {
"note_id": note_item.get("note_id"),
"type": note_item.get("type"),
"title": note_item.get("title") or note_item.get("desc", "")[:255],
"desc": note_item.get("desc", ""),
"time": note_item.get("time"),
"last_update_time": note_item.get("last_update_time", 0),
"user_id": user_info.get("user_id"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("avatar"),
"liked_count": interact_info.get("liked_count"),
"collected_count": interact_info.get("collected_count"),
"comment_count": interact_info.get("comment_count"),
"share_count": interact_info.get("share_count"),
"ip_location": note_item.get("ip_location", ""),
"image_list": ','.join([img.get('url', '') for img in image_list]),
"last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}"
}
utils.logger.info(f"[store.xhs.update_xhs_note] xhs note: {local_db_item}")
await XhsStoreFactory.create_store().store_content(local_db_item)
async def update_xhs_note_comment(note_id: str, comment_item: Dict):
user_info = comment_item.get("user_info", {})
comment_id = comment_item.get("id")
local_db_item = {
"comment_id": comment_id,
"create_time": comment_item.get("create_time"),
"ip_location": comment_item.get("ip_location"),
"note_id": note_id,
"content": comment_item.get("content"),
"user_id": user_info.get("user_id"),
"nickname": user_info.get("nickname"),
"avatar": user_info.get("image"),
"sub_comment_count": comment_item.get("sub_comment_count"),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}")
await XhsStoreFactory.create_store().store_comment(local_db_item)

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 17:31
# @Desc : 小红书存储到DB的模型类集合
from tortoise import fields
from tortoise.models import Model
class XhsBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class XHSNote(XhsBaseModel):
note_id = fields.CharField(max_length=64, index=True, description="笔记ID")
type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)")
title = fields.CharField(null=True, max_length=255, description="笔记标题")
desc = fields.TextField(null=True, description="笔记描述")
time = fields.BigIntField(description="笔记发布时间戳", index=True)
last_update_time = fields.BigIntField(description="笔记最后更新时间戳")
liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数")
collected_count = fields.CharField(null=True, max_length=16, description="笔记收藏数")
comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数")
share_count = fields.CharField(null=True, max_length=16, description="笔记分享数")
image_list = fields.TextField(null=True, description="笔记封面图片列表")
note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL")
class Meta:
table = "xhs_note"
table_description = "小红书笔记"
def __str__(self):
return f"{self.note_id} - {self.title}"
class XHSNoteComment(XhsBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
create_time = fields.BigIntField(index=True, description="评论时间戳")
note_id = fields.CharField(max_length=64, description="笔记ID")
content = fields.TextField(description="评论内容")
sub_comment_count = fields.IntField(description="子评论数量")
class Meta:
table = "xhs_note_comment"
table_description = "小红书笔记评论"
def __str__(self):
return f"{self.comment_id} - {self.content}"

180
store/xhs/xhs_store_impl.py Normal file
View File

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 16:58
# @Desc : 小红书存储实现类
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from tools import utils
from var import crawler_type_var
class XhsCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/xhs"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/xhs/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
f.fileno()
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Xiaohongshu content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Xiaohongshu comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
class XhsDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Xiaohongshu content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .xhs_store_db_types import XHSNote
note_id = content_item.get("note_id")
if not await XHSNote.filter(note_id=note_id).first():
content_item["add_ts"] = utils.get_current_timestamp()
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',))
note_data = note_pydantic(**content_item)
note_pydantic.model_validate(note_data)
await XHSNote.create(**note_data.model_dump())
else:
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts'))
note_data = note_pydantic(**content_item)
note_pydantic.model_validate(note_data)
await XHSNote.filter(note_id=note_id).update(**note_data.model_dump())
async def store_comment(self, comment_item: Dict):
"""
Xiaohongshu content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .xhs_store_db_types import XHSNoteComment
comment_id = comment_item.get("id")
if not await XHSNoteComment.filter(comment_id=comment_id).first():
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await XHSNoteComment.create(**comment_data.model_dump())
else:
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate",
exclude=('id', 'add_ts',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
class XhsJsonStoreImplement(AbstractStore):
json_store_path: str = "data/xhs"
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json"
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
save_data = []
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementatio
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")