Compare commits

...

10 Commits

Author SHA1 Message Date
f74ad7926e first commit
Some checks failed
Deploy VitePress site to Pages / build (push) Has been cancelled
Deploy VitePress site to Pages / Deploy (push) Has been cancelled
2024-12-17 16:14:10 +08:00
程序员阿江(Relakkes)
9c7e1d499b
Merge pull request #509 from leantli/feat/xhs_comments_upgrade
feat: xhs comments add xsec_token
2024-12-03 18:34:56 +08:00
leantli
e830ada574 feat: xhs comments add xsec_token 2024-12-03 18:25:21 +08:00
程序员阿江(Relakkes)
6001fc8a52
Merge pull request #506 from JianxunRao/main
fix:微博根据creator爬取note时,爬取评论失败。原因是解析的参数key有误
2024-11-29 11:07:58 +08:00
Trojx
f9eedc59b1 fix:微博根据creator爬取note时,爬取评论失败。原因是解析的参数key有误 2024-11-29 10:47:40 +08:00
Relakkes
453ea642fb docs: update README.md 2024-11-29 10:40:52 +08:00
Relakkes
ca9b47ef63 fix: xhs 帖子详情优化 2024-11-27 09:41:24 +08:00
Relakkes
43dffeb2d1 feat: xhs帖子详情获取优化 2024-11-26 13:37:53 +08:00
Relakkes
de32d06815 docs: update README.md 2024-11-19 12:58:28 +08:00
Relakkes
935a928f90 docs: update README.md 2024-11-17 07:00:55 +08:00
28 changed files with 2780 additions and 423 deletions

297
README.md
View File

@ -1,204 +1,141 @@
> **免责声明:**
>
> 大家请以学习为目的使用本仓库⚠️⚠️⚠️⚠️,[爬虫违法违规的案件](https://github.com/HiddenStrawberry/Crawler_Illegal_Cases_In_China) <br>
>
>本仓库的所有内容仅供学习和参考之用,禁止用于商业用途。任何人或组织不得将本仓库的内容用于非法用途或侵犯他人合法权益。本仓库所涉及的爬虫技术仅用于学习和研究,不得用于对其他平台进行大规模爬虫或其他非法行为。对于因使用本仓库内容而引起的任何法律责任,本仓库不承担任何责任。使用本仓库的内容即表示您同意本免责声明的所有条款和条件。
>
> 点击查看更为详细的免责声明。[点击跳转](#disclaimer)
# 小红书内容智能分析系统
# 仓库描述
## 项目概述
本项目旨在构建一个自动化的小红书内容采集和智能分析系统。通过爬虫采集、多模态内容处理和AI分析将小红书的图文视频内容转化为结构化的知识输出。
**小红书爬虫****抖音爬虫** **快手爬虫** **B站爬虫** **微博爬虫****百度贴吧爬虫****知乎爬虫**...。
目前能抓取小红书、抖音、快手、B站、微博、贴吧、知乎等平台的公开信息。
## 系统架构
原理:利用[playwright](https://playwright.dev/)搭桥保留登录成功后的上下文浏览器环境通过执行JS表达式获取一些加密参数
通过使用此方式免去了复现核心加密JS代码逆向难度大大降低
### 1. 数据采集层
#### 1.1 内容爬取
- 使用 MediaCrawler 爬虫框架
- 根据指定关键词抓取小红书笔记
- 将原始数据保存为 JSON 格式
- 包含笔记文本、图片URL、视频URL等信息
# 功能列表
| 平台 | 关键词搜索 | 指定帖子ID爬取 | 二级评论 | 指定创作者主页 | 登录态缓存 | IP代理池 | 生成评论词云图 |
|-----|-------|---------|-----|--------|-------|-------|-------|
| 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 快手 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| B 站 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 微博 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 贴吧 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 知乎 | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
#### 1.2 数据存储
- 将 JSON 数据导入 MySQL 数据库
- 建立规范的数据表结构
- 实现数据的持久化存储和管理
#### 1.3 媒体文件下载
- 从数据库读取媒体文件URL
- 下载笔记关联的图片和视频
- 按笔记ID分类存储在本地文件系统
### 2. 内容处理层
#### 2.1 视频处理
- 使用 Faster-Whisper 模型
- 将视频音频转换为文字
- 支持中文语音识别
- 保存字幕文本
#### 2.2 图像处理
- 使用 ChatGPT-4-Vision 模型
- 分析图片内容
- 提取图片中的关键信息
- 生成图片描述文本
### 3. 智能分析层
#### 3.1 内容理解
- 使用 ChatGPT 处理文本内容
- 整合视频字幕和图片描述
- 生成内容摘要
- 提取关键信息点
#### 3.2 知识图谱
- 基于内容分析生成思维导图
- 展示主题间的逻辑关系
- 可视化知识结构
## 技术栈
- 爬虫框架MediaCrawler
- 数据库MySQL
- 音频处理Faster-Whisper
- 图像识别ChatGPT-4-Vision
- 自然语言处理ChatGPT
- 编程语言Python
## 工作流程图
![工作流程图](docs/static/images/fig1.png)
## 预期成果
1. 自动化的内容采集系统
2. 结构化的多模态数据存储
3. 智能化的内容理解和分析
4. 可视化的知识展示
### 插播一下MediaCrawlerPro重磅发布啦
> 主打学习成熟项目的架构设计不仅仅是爬虫Pro中的其他代码设计思路也是值得学习欢迎大家关注
>
> 订阅Pro源代码访问权限可以加我微信yzglan备注Pro有一定的门槛💰
## 应用场景
- 内容创作参考
- 市场趋势分析
- 用户行为研究
- 知识管理系统
[MediaCrawlerPro](https://github.com/MediaCrawlerPro) 版本已经重构出来了,相较于开源版本的优势:
- 多账号+IP代理支持重点
- 去除Playwright依赖使用更加简单
- 支持linux部署Docker docker-compose
- 代码重构优化更加易读易维护解耦JS签名逻辑
- 代码质量更高,对于构建更大型的爬虫项目更加友好
- 完美的架构设计,更加易扩展,源码学习的价值更大
## 后续优化方向
1. 提高爬虫效率和稳定性
2. 优化媒体文件存储结构
3. 提升AI模型处理精度
4. 增强可视化展示效果
5. 添加用户交互界面
# 安装部署方法
> 开源不易希望大家可以Star一下MediaCrawler仓库十分感谢 <br>
## 风险分析
## 创建并激活 python 虚拟环境
> 如果是爬取抖音和知乎需要提前安装nodejs环境版本大于等于`16`即可 <br>
```shell
# 进入项目根目录
cd MediaCrawler
# 创建虚拟环境
# 我的python版本是3.9.6requirements.txt中的库是基于这个版本的如果是其他python版本可能requirements.txt中的库不兼容自行解决一下。
python -m venv venv
# macos & linux 激活虚拟环境
source venv/bin/activate
### 1. 法律合规风险
# windows 激活虚拟环境
venv\Scripts\activate
#### 1.1 违反网络安全法风险
- 根据《中华人民共和国网络安全法》第四十四条规定,任何个人和组织不得窃取或者以其他非法方式获取个人信息
- 在爬取过程中必须避免收集用户个人隐私信息
- 确保数据采集和使用符合相关法律法规
```
#### 1.2 侵犯知识产权风险
- 需注意平台内容的版权问题
- 避免大规模复制和传播他人原创内容
- 不得将爬取的内容用于商业牟利
## 安装依赖库
#### 1.3 违反平台服务条款风险
- 违反平台规则可能面临账号封禁
- 过度爬取可能导致IP封锁
- 严重违规可能引发平台法律诉讼
```shell
pip install -r requirements.txt
```
### 2. 技术风险
## 安装 playwright浏览器驱动
#### 2.1 反爬虫机制
- 平台可能部署各种反爬虫措施
- IP被封禁影响采集效率
- 需要不断更新技术方案应对
```shell
playwright install
```
#### 2.2 数据质量风险
- 采集数据可能不完整或有误
- 多媒体内容下载失败
- 数据格式变化导致解析错误
## 运行爬虫程序
### 3. 使用建议
```shell
### 项目默认是没有开启评论爬取模式如需评论请在config/base_config.py中的 ENABLE_GET_COMMENTS 变量修改
### 一些其他支持项也可以在config/base_config.py查看功能写的有中文注释
# 从配置文件中读取关键词搜索相关的帖子并爬取帖子信息与评论
python main.py --platform xhs --lt qrcode --type search
# 从配置文件中读取指定的帖子ID列表获取指定帖子的信息与评论信息
python main.py --platform xhs --lt qrcode --type detail
# 打开对应APP扫二维码登录
# 其他平台爬虫使用示例,执行下面的命令查看
python main.py --help
```
#### 3.1 合规使用
- 仅采集公开可见的内容
- 避免采集用户个人信息
- 采集频率保持合理范围
- 遵守平台的robots.txt规则
## 数据保存
- 支持关系型数据库Mysql中保存需要提前创建数据库
- 执行 `python db.py` 初始化数据库数据库表结构(只在首次执行)
- 支持保存到csv中data/目录下)
- 支持保存到json中data/目录下)
#### 3.2 技术防范
- 使用代理IP分散请求
- 控制请求频率和并发数
- 做好异常处理和日志记录
- 定期备份重要数据
### 4. 案例警示
根据[GitHub上的中国爬虫违法违规案例汇总](https://github.com/HiddenStrawberry/Crawler_Illegal_Cases_In_China),以下行为可能带来严重法律后果:
# 其他常见问题可以查看在线文档
>
> 在线文档包含使用方法、常见问题、加入项目交流群等。
> [MediaCrawler在线文档](https://nanmicoder.github.io/MediaCrawler/)
>
- 爬取和贩卖个人隐私数据
- 破解验证码并提供服务
- 未经授权爬取并复制商业数据
- 大规模爬取导致目标网站服务中断
# 开发者服务
> 如果你对知识付费认可,可以看下下面我提供的付费服务,如果你是学生,请一定提前告知,会有优惠💰<br>
- MediaCrawler源码剖析课程
如果你想很快入门这个项目,或者想了具体实现原理,我推荐你看看这个我录制的视频课程,从设计出发一步步带你如何使用,门槛大大降低
- **抖音课程链接**仅支持安卓https://v.douyin.com/iYeQFyAf/
- **B站课程链接**https://www.bilibili.com/cheese/play/ss16569
(备注课程介绍飞书文档链接https://relakkes.feishu.cn/wiki/JUgBwdhIeiSbAwkFCLkciHdAnhh
<br>
<br>
- 阿江的爬虫专栏知识星球MediaCrawler相关问题最佳实践、爬虫逆向分享、爬虫项目实战、多年编程经验分享、爬虫编程技术问题提问。
<p>
<img alt="xingqiu" src="docs/static/images/星球qrcode.jpg" style="width: auto;height: 400px" >
</p>
星球精选文章(部分)
- [逆向案例 - 某16x8平台商品列表接口逆向参数分析](https://articles.zsxq.com/id_x1qmtg8pzld9.html)
- [逆向案例 - Product Hunt月度最佳产品榜单接口加密参数分析](https://articles.zsxq.com/id_au4eich3x2sg.html)
- [逆向案例 - 某zhi乎x-zse-96参数分析过程](https://articles.zsxq.com/id_dui2vil0ag1l.html)
- [逆向案例 - 某x识星球X-Signature加密参数分析过程](https://articles.zsxq.com/id_pp4madwcwcg8.html)
- [【独创】使用Playwright获取某音a_bogus参数流程包含加密参数分析](https://articles.zsxq.com/id_u89al50jk9x0.html)
- [【独创】使用Playwright低成本获取某书X-s参数流程分析当年的回忆录](https://articles.zsxq.com/id_u4lcrvqakuc7.html)
- [ MediaCrawler-基于抽象类设计重构项目缓存](https://articles.zsxq.com/id_4ju73oxewt9j.html)
- [ 手把手带你撸一个自己的IP代理池](https://articles.zsxq.com/id_38fza371ladm.html)
- [Python协程在并发场景下的幂等性问题](https://articles.zsxq.com/id_wocdwsfmfcmp.html)
- [错误使用 Python 可变类型带来的隐藏 Bug](https://articles.zsxq.com/id_f7vn89l1d303.html)
# 感谢下列Sponsors对本仓库赞助支持
- <a href="https://sider.ai/ad-land-redirect?source=github&p1=mi&p2=kk">【Sider】全网最火的ChatGPT插件我也免费薅羊毛用了快一年了体验拉满。</a>
成为赞助者可以将您产品展示在这里每天获得大量曝光联系作者微信yzglan 或 emailrelakkes@gmail.com
# MediaCrawler项目微信交流群
[加入微信交流群](https://nanmicoder.github.io/MediaCrawler/%E5%BE%AE%E4%BF%A1%E4%BA%A4%E6%B5%81%E7%BE%A4.html)
# 打赏
如果觉得项目不错的话可以打赏哦。您的支持就是我最大的动力!
打赏时您可以备注名称,我会将您添加至打赏列表中。
<p>
<img alt="打赏-微信" src="docs/static/images/wechat_pay.jpeg" style="width: 200px;margin-right: 140px;" />
<img alt="打赏-支付宝" src="docs/static/images/zfb_pay.png" style="width: 200px" />
</p>
查看打赏列表 [MediaCrawler捐赠名单](https://nanmicoder.github.io/MediaCrawler/捐赠名单.html)
# 爬虫入门课程
我新开的爬虫教程Github仓库 [CrawlerTutorial](https://github.com/NanmiCoder/CrawlerTutorial) ,感兴趣的朋友可以关注一下,持续更新,主打一个免费.
# star 趋势图
- 如果该项目对你有帮助,帮忙 star一下 ❤让更多的人看到MediaCrawler这个项目
[![Star History Chart](https://api.star-history.com/svg?repos=NanmiCoder/MediaCrawler&type=Date)](https://star-history.com/#NanmiCoder/MediaCrawler&Date)
# 参考
- xhs客户端 [ReaJason的xhs仓库](https://github.com/ReaJason/xhs)
- 短信转发 [参考仓库](https://github.com/pppscn/SmsForwarder)
- 内网穿透工具 [ngrok](https://ngrok.com/docs/)
# 免责声明
<div id="disclaimer">
## 1. 项目目的与性质
本项目(以下简称“本项目”)是作为一个技术研究与学习工具而创建的,旨在探索和学习网络数据采集技术。本项目专注于自媒体平台的数据爬取技术研究,旨在提供给学习者和研究者作为技术交流之用。
## 2. 法律合规性声明
本项目开发者(以下简称“开发者”)郑重提醒用户在下载、安装和使用本项目时,严格遵守中华人民共和国相关法律法规,包括但不限于《中华人民共和国网络安全法》、《中华人民共和国反间谍法》等所有适用的国家法律和政策。用户应自行承担一切因使用本项目而可能引起的法律责任。
## 3. 使用目的限制
本项目严禁用于任何非法目的或非学习、非研究的商业行为。本项目不得用于任何形式的非法侵入他人计算机系统,不得用于任何侵犯他人知识产权或其他合法权益的行为。用户应保证其使用本项目的目的纯属个人学习和技术研究,不得用于任何形式的非法活动。
## 4. 免责声明
开发者已尽最大努力确保本项目的正当性及安全性,但不对用户使用本项目可能引起的任何形式的直接或间接损失承担责任。包括但不限于由于使用本项目而导致的任何数据丢失、设备损坏、法律诉讼等。
## 5. 知识产权声明
本项目的知识产权归开发者所有。本项目受到著作权法和国际著作权条约以及其他知识产权法律和条约的保护。用户在遵守本声明及相关法律法规的前提下,可以下载和使用本项目。
## 6. 最终解释权
关于本项目的最终解释权归开发者所有。开发者保留随时更改或更新本免责声明的权利,恕不另行通知。
</div>
## 感谢JetBrains提供的免费开源许可证支持
<a href="https://www.jetbrains.com/?from=MediaCrawler">
<img src="https://www.jetbrains.com/company/brand/img/jetbrains_logo.png" width="100" alt="JetBrains" />
</a>
### 5. 合规建议
1. 项目启动前进行法律可行性评估
2. 建立数据安全管理制度
3. 保留完整的操作日志记录
4. 定期进行合规性自查
5. 如有必要可咨询法律专家

155
README_v1.md Normal file
View File

@ -0,0 +1,155 @@
# 🔥 自媒体平台爬虫🕷MediaCrawler🔥
z
# 仓库描述
**小红书爬虫****抖音爬虫** **快手爬虫** **B站爬虫** **微博爬虫****百度贴吧爬虫****知乎爬虫**...。
目前能抓取小红书、抖音、快手、B站、微博、贴吧、知乎等平台的公开信息。
原理:利用[playwright](https://playwright.dev/)搭桥保留登录成功后的上下文浏览器环境通过执行JS表达式获取一些加密参数
通过使用此方式免去了复现核心加密JS代码逆向难度大大降低
# 功能列表
| 平台 | 关键词搜索 | 指定帖子ID爬取 | 二级评论 | 指定创作者主页 | 登录态缓存 | IP代理池 | 生成评论词云图 |
| ------ | ---------- | -------------- | -------- | -------------- | ---------- | -------- | -------------- |
| 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 快手 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| B 站 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 微博 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 贴吧 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 知乎 | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
### MediaCrawlerPro重磅发布啦
> 主打学习成熟项目的架构设计不仅仅是爬虫Pro中的其他代码设计思路也是值得学习欢迎大家关注
[MediaCrawlerPro](https://github.com/MediaCrawlerPro) 版本已经重构出来了,相较于开源版本的优势:
- 多账号+IP代理支持重点
- 去除Playwright依赖使用更加简单
- 支持linux部署Docker docker-compose
- 代码重构优化更加易读易维护解耦JS签名逻辑
- 代码质量更高,对于构建更大型的爬虫项目更加友好
- 完美的架构设计,更加易扩展,源码学习的价值更大
# 安装部署方法
> 开源不易希望大家可以Star一下MediaCrawler仓库十分感谢 <br>
## 创建并激活 python 虚拟环境
> 如果是爬取抖音和知乎需要提前安装nodejs环境版本大于等于`16`即可 <br>
```shell
# 进入项目根目录
cd MediaCrawler
# 创建虚拟环境
# 我的python版本是3.9.6requirements.txt中的库是基于这个版本的如果是其他python版本可能requirements.txt中的库不兼容自行解决一下。
python -m venv venv
# macos & linux 激活虚拟环境
source venv/bin/activate
# windows 激活虚拟环境
venv\Scripts\activate
```
## 安装依赖库
```shell
pip install -r requirements.txt
```
## 安装 playwright浏览器驱动
```shell
playwright install
```
## 运行爬虫程序
```shell
### 项目默认是没有开启评论爬取模式如需评论请在config/base_config.py中的 ENABLE_GET_COMMENTS 变量修改
### 一些其他支持项也可以在config/base_config.py查看功能写的有中文注释
# 从配置文件中读取关键词搜索相关的帖子并爬取帖子信息与评论
python main.py --platform xhs --lt qrcode --type search
# 从配置文件中读取指定的帖子ID列表获取指定帖子的信息与评论信息
python main.py --platform xhs --lt qrcode --type detail
# 打开对应APP扫二维码登录
# 其他平台爬虫使用示例,执行下面的命令查看
python main.py --help
```
## 数据保存
- 支持关系型数据库Mysql中保存需要提前创建数据库
- 执行 `python db.py` 初始化数据库数据库表结构(只在首次执行)
- 支持保存到csv中data/目录下)
- 支持保存到json中data/目录下)
# 其他常见问题可以查看在线文档
>
> 在线文档包含使用方法、常见问题、加入项目交流群等。
> [MediaCrawler在线文档](https://nanmicoder.github.io/MediaCrawler/)
>
# 知识付费服务
[作者的知识付费栏目介绍](https://nanmicoder.github.io/MediaCrawler/%E7%9F%A5%E8%AF%86%E4%BB%98%E8%B4%B9%E4%BB%8B%E7%BB%8D.html)
# 项目微信交流群
[加入微信交流群](https://nanmicoder.github.io/MediaCrawler/%E5%BE%AE%E4%BF%A1%E4%BA%A4%E6%B5%81%E7%BE%A4.html)
# 感谢下列Sponsors对本仓库赞助支持
- <a href="https://sider.ai/ad-land-redirect?source=github&p1=mi&p2=kk">【Sider】全网最火的ChatGPT插件我也免费薅羊毛用了快一年了体验拉满。</a>
成为赞助者可以将您产品展示在这里每天获得大量曝光联系作者微信yzglan 或 emailrelakkes@gmail.com
# 爬虫入门课程
我新开的爬虫教程Github仓库 [CrawlerTutorial](https://github.com/NanmiCoder/CrawlerTutorial) ,感兴趣的朋友可以关注一下,持续更新,主打一个免费.
# star 趋势图
- 如果该项目对你有帮助,帮忙 star一下 ❤让更多的人看到MediaCrawler这个项目
[![Star History Chart](https://api.star-history.com/svg?repos=NanmiCoder/MediaCrawler&type=Date)](https://star-history.com/#NanmiCoder/MediaCrawler&Date)
# 参考
- xhs客户端 [ReaJason的xhs仓库](https://github.com/ReaJason/xhs)
- 短信转发 [参考仓库](https://github.com/pppscn/SmsForwarder)
- 内网穿透工具 [ngrok](https://ngrok.com/docs/)
# 免责声明
<div id="disclaimer">
## 1. 项目目的与性质
本项目(以下简称“本项目”)是作为一个技术研究与学习工具而创建的,旨在探索和学习网络数据采集技术。本项目专注于自媒体平台的数据爬取技术研究,旨在提供给学习者和研究者作为技术交流之用。
## 2. 法律合规性声明
本项目开发者(以下简称“开发者”)郑重提醒用户在下载、安装和使用本项目时,严格遵守中华人民共和国相关法律法规,包括但不限于《中华人民共和国网络安全法》、《中华人民共和国反间谍法》等所有适用的国家法律和政策。用户应自行承担一切因使用本项目而可能引起的法律责任。
## 3. 使用目的限制
本项目严禁用于任何非法目的或非学习、非研究的商业行为。本项目不得用于任何形式的非法侵入他人计算机系统,不得用于任何侵犯他人知识产权或其他合法权益的行为。用户应保证其使用本项目的目的纯属个人学习和技术研究,不得用于任何形式的非法活动。
## 4. 免责声明
开发者已尽最大努力确保本项目的正当性及安全性,但不对用户使用本项目可能引起的任何形式的直接或间接损失承担责任。包括但不限于由于使用本项目而导致的任何数据丢失、设备损坏、法律诉讼等。
## 5. 知识产权声明
本项目的知识产权归开发者所有。本项目受到著作权法和国际著作权条约以及其他知识产权法律和条约的保护。用户在遵守本声明及相关法律法规的前提下,可以下载和使用本项目。
## 6. 最终解释权
关于本项目的最终解释权归开发者所有。开发者保留随时更改或更新本免责声明的权利,恕不另行通知。
</div>
## 感谢JetBrains提供的免费开源许可证支持
<a href="https://www.jetbrains.com/?from=MediaCrawler">
<img src="https://www.jetbrains.com/company/brand/img/jetbrains_logo.png" width="100" alt="JetBrains" />
</a>

52
build.py Normal file
View File

@ -0,0 +1,52 @@
import os
import shutil
import PyInstaller.__main__
def clean_build_dirs():
"""清理构建目录"""
dirs_to_clean = ['build', 'dist']
for dir_name in dirs_to_clean:
if os.path.exists(dir_name):
shutil.rmtree(dir_name)
def create_required_dirs():
"""创建必要的目录"""
os.makedirs('dist/data/xhs/json/media', exist_ok=True)
def copy_required_files():
"""复制必要的文件"""
# 复制配置文件
if os.path.exists('config'):
shutil.copytree('config', 'dist/config', dirs_exist_ok=True)
# 复制其他必要文件
files_to_copy = [
'main.py',
# 添加其他需要复制的文件
]
for file in files_to_copy:
if os.path.exists(file):
shutil.copy2(file, 'dist/')
def main():
# 清理旧的构建文件
clean_build_dirs()
# 运行PyInstaller
PyInstaller.__main__.run([
'build_exe.spec',
'--clean',
'--noconfirm'
])
# 创建必要的目录
create_required_dirs()
# 复制必要的文件
copy_required_files()
print("打包完成!")
if __name__ == "__main__":
main()

159
check_downloads.py Normal file
View File

@ -0,0 +1,159 @@
import os
import mysql.connector
from urllib.parse import urlparse
import requests
import time
# 数据库配置
db_config = {
'user': 'root',
'password': 'zaq12wsx@9Xin',
'host': '183.11.229.79',
'port': 3316,
'database': '9xin',
'auth_plugin': 'mysql_native_password'
}
def download_file(url, save_path):
"""下载文件"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, stream=True, timeout=10)
response.raise_for_status()
os.makedirs(os.path.dirname(save_path), exist_ok=True)
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True
except Exception as e:
print(f'下载文件失败 {url}: {str(e)}')
return False
def retry_download(note_id, image_list, video_url):
"""重试下载失败的媒体文件"""
media_dir = f'./data/xhs/json/media/{note_id}'
download_success = True
# 重试下载图片
if image_list:
image_urls = image_list.split(',')
for i, url in enumerate(image_urls):
url = url.strip()
if not url:
continue
ext = os.path.splitext(urlparse(url).path)[1] or '.jpg'
image_path = os.path.join(media_dir, f'image_{i+1}{ext}')
if not os.path.exists(image_path) or os.path.getsize(image_path) == 0:
print(f'重试下载图片 {note_id} - {i+1}')
if not download_file(url, image_path):
download_success = False
time.sleep(0.5) # 添加延时避免请求过快
# 重试下载视频
if video_url and video_url.strip():
video_url = video_url.strip()
ext = os.path.splitext(urlparse(video_url).path)[1] or '.mp4'
video_path = os.path.join(media_dir, f'video{ext}')
if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
print(f'重试下载视频 {note_id}')
if not download_file(video_url, video_path):
download_success = False
return download_success
def check_media_files():
"""检查媒体文件下载状态并更新数据库,对失败的记录进行重试下载"""
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
# 确保download_flag字段存在
try:
cursor.execute("""
ALTER TABLE xhs_notes
ADD COLUMN IF NOT EXISTS download_flag BOOLEAN DEFAULT FALSE
""")
conn.commit()
except Exception as e:
print(f"添加download_flag字段时出错: {e}")
# 获取所有记录
cursor.execute("""
SELECT note_id, image_list, video_url, download_flag
FROM xhs_notes
""")
records = cursor.fetchall()
update_query = """
UPDATE xhs_notes
SET download_flag = %s
WHERE note_id = %s
"""
total = len(records)
completed = 0
print(f"开始检查 {total} 条记录的下载状态...")
for record in records:
note_id = record['note_id']
is_complete = True
media_dir = f'./data/xhs/json/media/{note_id}'
# 检查图片和视频是否完整
if record['image_list']:
image_urls = record['image_list'].split(',')
for i, url in enumerate(image_urls):
if url.strip():
ext = os.path.splitext(urlparse(url).path)[1] or '.jpg'
image_path = os.path.join(media_dir, f'image_{i+1}{ext}')
if not os.path.exists(image_path) or os.path.getsize(image_path) == 0:
is_complete = False
break
if record['video_url'] and record['video_url'].strip():
url = record['video_url'].strip()
ext = os.path.splitext(urlparse(url).path)[1] or '.mp4'
video_path = os.path.join(media_dir, f'video{ext}')
if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
is_complete = False
# 如果下载不完整,尝试重新下载
if not is_complete:
print(f"发现未完成下载的记录: {note_id},开始重试下载...")
is_complete = retry_download(
note_id,
record['image_list'],
record['video_url']
)
# 更新数据库状态
if is_complete != record['download_flag']:
cursor.execute(update_query, (is_complete, note_id))
status = "完成" if is_complete else "未完成"
print(f"更新记录 {note_id} 的下载状态为: {status}")
completed += 1
if completed % 10 == 0:
print(f"进度: {completed}/{total}")
conn.commit()
print("检查和重试下载完成!")
except Exception as e:
print(f"发生错误: {e}")
finally:
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
check_media_files()

View File

@ -1,24 +1,26 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# 基础配置
PLATFORM = "xhs"
KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔
KEYWORDS = """“宝宝你是一个油痘肌”""" # 关键词搜索配置,以英文逗号分隔
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
COOKIES = ""
# 具体值参见media_platform.xxx.field下的枚举值暂时只支持小红书
SORT_TYPE = "popularity_descending"
# 具体值参见media_platform.xxx.field下的枚举值暂时只支持抖音
PUBLISH_TIME_TYPE = 0
CRAWLER_TYPE = "search" # 爬取类型search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)
CRAWLER_TYPE = (
"search" # 爬取类型search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)
)
# 是否开启 IP 代理
ENABLE_IP_PROXY = False
@ -33,7 +35,7 @@ IP_PROXY_PROVIDER_NAME = "kuaidaili"
# 设置False会打开一个浏览器
# 小红书如果一直扫码登录不通过,打开浏览器手动过一下滑动验证码
# 抖音如果一直提示失败,打开浏览器看下是否扫码登录之后出现了手机号验证,如果出现了手动过一下再试。
HEADLESS = False
HEADLESS = True
# 是否保存登录状态
SAVE_LOGIN_STATE = True
@ -48,7 +50,7 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
START_PAGE = 1
# 爬取视频/帖子的数量控制
CRAWLER_MAX_NOTES_COUNT = 200
CRAWLER_MAX_NOTES_COUNT = 1
# 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 1
@ -57,13 +59,12 @@ MAX_CONCURRENCY_NUM = 1
ENABLE_GET_IMAGES = False
# 是否开启爬评论模式, 默认开启爬评论
ENABLE_GET_COMMENTS = True
ENABLE_GET_COMMENTS = False
# 爬取一级评论的数量控制(单视频/帖子)
CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10
# 是否开启爬二级评论模式, 默认不开启爬二级评论
# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段
ENABLE_GET_SUB_COMMENTS = False
@ -85,15 +86,12 @@ XHS_SPECIFIED_NOTE_URL_LIST = [
# 指定抖音需要爬取的ID列表
DY_SPECIFIED_ID_LIST = [
"7280854932641664319",
"7202432992642387233"
"7202432992642387233",
# ........................
]
# 指定快手平台需要爬取的ID列表
KS_SPECIFIED_ID_LIST = [
"3xf8enb8dbj6uig",
"3x6zz972bchmvqe"
]
KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig", "3x6zz972bchmvqe"]
# 指定B站平台需要爬取的视频bvid列表
BILI_SPECIFIED_ID_LIST = [
@ -116,9 +114,7 @@ WEIBO_CREATOR_ID_LIST = [
]
# 指定贴吧需要爬取的帖子列表
TIEBA_SPECIFIED_ID_LIST = [
]
TIEBA_SPECIFIED_ID_LIST = []
# 指定贴吧名称列表,爬取该贴吧下的帖子
TIEBA_NAME_LIST = [
@ -167,8 +163,8 @@ ENABLE_GET_WORDCLOUD = False
# 自定义词语及其分组
# 添加规则xx:yy 其中xx为自定义添加的词组yy为将xx该词组分到的组名。
CUSTOM_WORDS = {
'零几': '年份', # 将“零几”识别为一个整体
'高频词': '专业术语' # 示例自定义词
"零几": "年份", # 将“零几”识别为一个整体
"高频词": "专业术语", # 示例自定义词
}
# 停用(禁用)词文件路径

3
config/xhs_config.py Normal file
View File

@ -0,0 +1,3 @@
SEARCH_KEYWORDS = [
"熬夜"
]

View File

@ -11,9 +11,9 @@ const fetchAds = async () => {
return [
{
id: 1,
imageUrl: 'https://nm.zizhi1.com/static/img/40097e36a617f58db1dd132b5841cb1e.ad_pla1.webp',
landingUrl: 'https://nanmicoder.github.io/MediaCrawler/%E4%BD%9C%E8%80%85%E4%BB%8B%E7%BB%8D.html',
text: '⚡️【广告位招租】⚡️投放广告请联系微信:yzglan'
imageUrl: 'https://github.com/NanmiCoder/MediaCrawler/raw/main/docs/static/images/auto_test.png',
landingUrl: 'https://item.jd.com/10124939676219.html',
text: '给好朋友虫师新书站台推荐 - 基于Python的自动化测试框架设计'
}
]
}

141
docs/project_workflow.md Normal file
View File

@ -0,0 +1,141 @@
# 小红书内容智能分析系统
## 项目概述
本项目旨在构建一个自动化的小红书内容采集和智能分析系统。通过爬虫采集、多模态内容处理和AI分析将小红书的图文视频内容转化为结构化的知识输出。
## 系统架构
### 1. 数据采集层
#### 1.1 内容爬取
- 使用 MediaCrawler 爬虫框架
- 根据指定关键词抓取小红书笔记
- 将原始数据保存为 JSON 格式
- 包含笔记文本、图片URL、视频URL等信息
#### 1.2 数据存储
- 将 JSON 数据导入 MySQL 数据库
- 建立规范的数据表结构
- 实现数据的持久化存储和管理
#### 1.3 媒体文件下载
- 从数据库读取媒体文件URL
- 下载笔记关联的图片和视频
- 按笔记ID分类存储在本地文件系统
### 2. 内容处理层
#### 2.1 视频处理
- 使用 Faster-Whisper 模型
- 将视频音频转换为文字
- 支持中文语音识别
- 保存字幕文本
#### 2.2 图像处理
- 使用 ChatGPT-4-Vision 模型
- 分析图片内容
- 提取图片中的关键信息
- 生成图片描述文本
### 3. 智能分析层
#### 3.1 内容理解
- 使用 ChatGPT 处理文本内容
- 整合视频字幕和图片描述
- 生成内容摘要
- 提取关键信息点
#### 3.2 知识图谱
- 基于内容分析生成思维导图
- 展示主题间的逻辑关系
- 可视化知识结构
## 技术栈
- 爬虫框架MediaCrawler
- 数据库MySQL
- 音频处理Faster-Whisper
- 图像识别ChatGPT-4-Vision
- 自然语言处理ChatGPT
- 编程语言Python
## 工作流程图
![工作流程图](static/images/fig1.png)
## 预期成果
1. 自动化的内容采集系统
2. 结构化的多模态数据存储
3. 智能化的内容理解和分析
4. 可视化的知识展示
## 应用场景
- 内容创作参考
- 市场趋势分析
- 用户行为研究
- 知识管理系统
## 后续优化方向
1. 提高爬虫效率和稳定性
2. 优化媒体文件存储结构
3. 提升AI模型处理精度
4. 增强可视化展示效果
5. 添加用户交互界面
## 风险分析
### 1. 法律合规风险
#### 1.1 违反网络安全法风险
- 根据《中华人民共和国网络安全法》第四十四条规定,任何个人和组织不得窃取或者以其他非法方式获取个人信息
- 在爬取过程中必须避免收集用户个人隐私信息
- 确保数据采集和使用符合相关法律法规
#### 1.2 侵犯知识产权风险
- 需注意平台内容的版权问题
- 避免大规模复制和传播他人原创内容
- 不得将爬取的内容用于商业牟利
#### 1.3 违反平台服务条款风险
- 违反平台规则可能面临账号封禁
- 过度爬取可能导致IP封锁
- 严重违规可能引发平台法律诉讼
### 2. 技术风险
#### 2.1 反爬虫机制
- 平台可能部署各种反爬虫措施
- IP被封禁影响采集效率
- 需要不断更新技术方案应对
#### 2.2 数据质量风险
- 采集数据可能不完整或有误
- 多媒体内容下载失败
- 数据格式变化导致解析错误
### 3. 使用建议
#### 3.1 合规使用
- 仅采集公开可见的内容
- 避免采集用户个人信息
- 采集频率保持合理范围
- 遵守平台的robots.txt规则
#### 3.2 技术防范
- 使用代理IP分散请求
- 控制请求频率和并发数
- 做好异常处理和日志记录
- 定期备份重要数据
### 4. 案例警示
根据[GitHub上的中国爬虫违法违规案例汇总](https://github.com/HiddenStrawberry/Crawler_Illegal_Cases_In_China),以下行为可能带来严重法律后果:
- 爬取和贩卖个人隐私数据
- 破解验证码并提供服务
- 未经授权爬取并复制商业数据
- 大规模爬取导致目标网站服务中断
### 5. 合规建议
1. 项目启动前进行法律可行性评估
2. 建立数据安全管理制度
3. 保留完整的操作日志记录
4. 定期进行合规性自查
5. 如有必要可咨询法律专家

BIN
docs/project_workflow.pdf Normal file

Binary file not shown.

BIN
docs/project_workflow_1.pdf Normal file

Binary file not shown.

BIN
docs/static/images/auto_test.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 221 KiB

BIN
docs/static/images/fig1.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

BIN
docs/static/images/新建 BMP 图像.bmp vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

98
flv.md Normal file
View File

@ -0,0 +1,98 @@
### CAAC无人机飞行执照申报指南
#### 1. 无人机执照分类
根据中国民用航空局CAAC规定无人机飞行执照主要分为以下几种类型
1. **超视距驾驶员执照(机长执照)**
- 允许持证人在视觉范围之外操作无人机。
- 适用于远距离和高难度任务作业半径通常大于500米或人机相对高度大于120米。
2. **视距内驾驶员执照**
- 允许在视觉范围内操作无人机。
- 适用于短距离和低难度任务作业半径通常小于或等于500米人机相对高度小于或等于120米。
3. **教员证**
- 具备培训和指导他人学习无人机操作的能力。
- 要求丰富的飞行经验和优秀的教学能力。
此外,根据无人机的类型,执照还可细分为多旋翼无人机执照和固定翼无人机执照等。
---
#### 2. 申报执照的基本条件
1. **年龄要求**申请人需年满16周岁。
2. **身体要求**:通过体检,确保身体健康,具备适应无人机飞行的基本身体条件。
3. **背景审查**:无重大违法记录。
---
#### 3. 申报执照的具体步骤
1. **参加培训课程**
- 培训内容包括:
- **理论知识**:飞行法规、无人机飞行原理、航空气象等。
- **实际飞行技能**:无人机起飞、降落、空中控制及应急操作。
- 正规的培训课程是执照申报的必要条件。
2. **完成理论考试**
- 由CAAC组织考试内容涵盖培训所学理论知识。
3. **完成实际操作考核**
- 包括无人机操控技能考核。
- 要求达到规定标准。
4. **申请执照**
- 提交相关材料,包括身份证明、体检证明、考试合格证明等。
- 通过审核后即可获得CAAC颁发的无人机飞行执照。
---
#### 4. 是否需要参加培训班?
是的。申请人必须通过正规培训班,以学习必要的理论知识和操作技能。培训课程的内容通常包括:
- 无人机飞行操作
- 安全措施与应急处理
- 飞行法规与规则
---
#### 5. 机构推荐
以下是提供CAAC无人机执照培训的知名机构
1. **翼飞鸿天**
- 拥有民航局和AOPA认证的教员学员考试通过率高。
2. **鲲鹏堂**
- 成立于2012年提供多种无人机类型的培训实操场地广阔。
3. **通航无人机**
- 提供专业培训,设施先进,适合不同层次学员。
4. **能飞无人机学院**
- 中南地区指定考试中心,课程种类多。
其他机构包括撼动、华悦、新航道、三足、青华航宇等,均具备资质并提供系统化培训。
---
#### 6. 培训费用参考
- **超视距驾驶员执照**10,000元15,800元。
- **视距内驾驶员执照**8,500元12,800元。
- **固定翼与垂直起降固定翼执照**16,000元20,000元。
- **直升机执照**16,000元20,000元。
> **建议**:选择培训机构时,重点考虑其师资力量、资质和成功案例,确保获得高质量培训。
---
#### 7. 注意事项
- 确保申请材料真实、完整。
- 提前安排体检,避免申请过程中的不必要延误。
- 选择正规培训机构,以保证顺利通过考试并获取执照。
---
如有进一步问题可咨询CAAC官网或直接联系相关培训机构获取详细信息。

BIN
flv.pdf Normal file

Binary file not shown.

BIN
input.xlsx Normal file

Binary file not shown.

231
integrate_xhs_crawler.py Normal file
View File

@ -0,0 +1,231 @@
import json
import os
from data.xhs.json.import_xhs_notes import connect_to_database, create_table, check_record_exists
# from check_downloads import check_media_files
from mysql.connector import Error
import time
import asyncio
import subprocess
from datetime import datetime
import random
async def _run_crawler(keyword):
"""运行爬虫的异步实现"""
try:
process = await asyncio.create_subprocess_exec(
'python', 'main.py',
'--platform', 'xhs',
'--lt', 'qrcode',
'--keywords', keyword,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=1024*1024
)
# 读取输出流
async def read_stream(stream):
buffer = ""
while True:
chunk = await stream.read(8192)
if not chunk:
break
text = chunk.decode('utf-8', errors='ignore')
buffer += text
# 处理输出
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
if line.strip():
print(f"爬虫进度: {line.strip()}")
# 同时处理标准输出和错误输出
await asyncio.gather(
read_stream(process.stdout),
read_stream(process.stderr)
)
await process.wait()
return process.returncode == 0
except Exception as e:
print(f"爬虫执行错误: {str(e)}")
return False
def load_search_keywords():
"""从sheet_notes文件夹加载搜索关键词"""
keywords_dict = {}
json_dir = './data/xhs/json/sheet_notes'
for json_file in os.listdir(json_dir):
if not json_file.endswith('.json'):
continue
sheet_name = os.path.splitext(json_file)[0]
with open(os.path.join(json_dir, json_file), 'r', encoding='utf-8') as f:
keywords = json.load(f)
# 修护.json从第12个元素开始
# if sheet_name == '修护':
# keywords = keywords[11:]
keywords_dict[sheet_name] = keywords
return keywords_dict
def insert_note_data(connection, data, sheet_name):
"""插入笔记数据到数据库"""
insert_query = """
INSERT INTO xhs_notes (
note_id, type, title, description, video_url, time,
last_update_time, user_id, nickname, avatar,
liked_count, collected_count, comment_count, share_count,
ip_location, image_list, tag_list, last_modify_ts,
note_url, source_keyword, sheet_name, download_flag
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
"""
try:
cursor = connection.cursor()
inserted_count = 0
skipped_count = 0
for item in data:
note_id = item.get('note_id')
# 检查记录是否已存在
if check_record_exists(cursor, note_id):
skipped_count += 1
continue
values = (
note_id,
item.get('type'),
item.get('title'),
item.get('desc'),
item.get('video_url'),
item.get('time'),
item.get('last_update_time'),
item.get('user_id'),
item.get('nickname'),
item.get('avatar'),
item.get('liked_count'),
item.get('collected_count'),
item.get('comment_count'),
item.get('share_count'),
item.get('ip_location'),
item.get('image_list'),
item.get('tag_list'),
item.get('last_modify_ts'),
item.get('note_url'),
item.get('source_keyword'),
sheet_name,
False # download_flag 默认为False
)
cursor.execute(insert_query, values)
inserted_count += 1
connection.commit()
print(f'成功插入 {inserted_count} 条新数据')
print(f'跳过 {skipped_count} 条已存在的数据')
except Error as e:
print(f'插入数据时出错: {e}')
connection.rollback()
def search_xhs_notes(keyword):
"""搜索小红书笔记"""
try:
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 运行爬虫
success = loop.run_until_complete(_run_crawler(keyword))
if not success:
print(f"爬虫执行失败: {keyword}")
return None
# 读取爬虫结果
json_path = f'./data/xhs/json/search_contents_{datetime.now().strftime("%Y-%m-%d")}.json'
if not os.path.exists(json_path):
print(f"找不到爬虫结果文<EFBFBD><EFBFBD>: {json_path}")
return None
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 为每条记录添加来源关键词
for item in data:
item['source_keyword'] = keyword
return data
finally:
loop.close()
except Exception as e:
print(f"搜索过程发生错误: {str(e)}")
return None
def keyword_exists_in_db(connection, keyword):
"""检查关键词是否已存在于数据库中"""
query = "SELECT COUNT(*) FROM xhs_notes WHERE source_keyword = %s"
cursor = connection.cursor()
cursor.execute(query, (keyword,))
result = cursor.fetchone()
return result[0] > 0
def main():
# 连接数据库
connection = connect_to_database()
if connection is None:
return
try:
# 创建表格
create_table(connection)
# 加载搜索关键词
keywords_dict = load_search_keywords()
# 对每个sheet的关键词进行搜索
for sheet_name, keywords in keywords_dict.items():
print(f'开始处理 {sheet_name} 的关键词...')
for keyword in keywords:
# 检查关键词是否已存在
if keyword_exists_in_db(connection, keyword):
print(f'关键词已存在,跳过: {keyword}')
continue
print(f'搜索关键词: {keyword}')
# 搜索小红书笔记
search_results = search_xhs_notes(keyword)
if search_results:
# 将搜索结果保存到数据库
insert_note_data(connection, search_results, sheet_name)
else:
print(f"未获取到搜索结果: {keyword}")
# 添加延时避免请求过快随机延时10-30秒
time.sleep(random.uniform(10, 30))
print(f'{sheet_name} 的关键词处理完成')
# 下载所有媒体文件
# print('开始下载媒体文件...')
# check_media_files()
except Exception as e:
print(f'处理过程中出错: {e}')
finally:
if connection.is_connected():
connection.close()
print('数据库连接已关闭')
if __name__ == "__main__":
main()

3
launcher.bat Normal file
View File

@ -0,0 +1,3 @@
@echo off
cd /d %~dp0
start "" "小红书内容采集器.exe"

View File

@ -261,8 +261,8 @@ class WeiboCrawler(AbstractCrawler):
callback=weibo_store.batch_update_weibo_notes
)
note_ids = [note_item.get("mlog", {}).get("id") for note_item in all_notes_list if
note_item.get("mlog", {}).get("id")]
note_ids = [note_item.get("mblog", {}).get("id") for note_item in all_notes_list if
note_item.get("mblog", {}).get("id")]
await self.batch_get_notes_comments(note_ids)
else:

View File

@ -1,12 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
@ -31,13 +31,13 @@ from .help import get_search_id, sign
class XiaoHongShuClient(AbstractApiClient):
def __init__(
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
):
self.proxies = proxies
self.timeout = timeout
@ -61,20 +61,22 @@ class XiaoHongShuClient(AbstractApiClient):
Returns:
"""
encrypt_params = await self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
encrypt_params = await self.playwright_page.evaluate(
"([url, data]) => window._webmsxyw(url,data)", [url, data]
)
local_storage = await self.playwright_page.evaluate("() => window.localStorage")
signs = sign(
a1=self.cookie_dict.get("a1", ""),
b1=local_storage.get("b1", ""),
x_s=encrypt_params.get("X-s", ""),
x_t=str(encrypt_params.get("X-t", ""))
x_t=str(encrypt_params.get("X-t", "")),
)
headers = {
"X-S": signs["x-s"],
"X-T": signs["x-t"],
"x-S-Common": signs["x-s-common"],
"X-B3-Traceid": signs["x-b3-traceid"]
"X-B3-Traceid": signs["x-b3-traceid"],
}
self.headers.update(headers)
return self.headers
@ -92,20 +94,18 @@ class XiaoHongShuClient(AbstractApiClient):
"""
# return response.text
return_response = kwargs.pop('return_response', False)
return_response = kwargs.pop("return_response", False)
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
response = await client.request(method, url, timeout=self.timeout, **kwargs)
if response.status_code == 471 or response.status_code == 461:
# someday someone maybe will bypass captcha
verify_type = response.headers['Verifytype']
verify_uuid = response.headers['Verifyuuid']
verify_type = response.headers["Verifytype"]
verify_uuid = response.headers["Verifyuuid"]
raise Exception(
f"出现验证码请求失败Verifytype: {verify_type}Verifyuuid: {verify_uuid}, Response: {response}")
f"出现验证码请求失败Verifytype: {verify_type}Verifyuuid: {verify_uuid}, Response: {response}"
)
if return_response:
return response.text
@ -129,10 +129,11 @@ class XiaoHongShuClient(AbstractApiClient):
"""
final_uri = uri
if isinstance(params, dict):
final_uri = (f"{uri}?"
f"{urlencode(params)}")
final_uri = f"{uri}?" f"{urlencode(params)}"
headers = await self._pre_headers(final_uri)
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
return await self.request(
method="GET", url=f"{self._host}{final_uri}", headers=headers
)
async def post(self, uri: str, data: dict, **kwargs) -> Dict:
"""
@ -145,15 +146,22 @@ class XiaoHongShuClient(AbstractApiClient):
"""
headers = await self._pre_headers(uri, data)
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=headers, **kwargs)
json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
return await self.request(
method="POST",
url=f"{self._host}{uri}",
data=json_str,
headers=headers,
**kwargs,
)
async def get_note_media(self, url: str) -> Union[bytes, None]:
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request("GET", url, timeout=self.timeout)
if not response.reason_phrase == "OK":
utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
utils.logger.error(
f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}"
)
return None
else:
return response.content
@ -172,7 +180,9 @@ class XiaoHongShuClient(AbstractApiClient):
if note_card.get("items"):
ping_flag = True
except Exception as e:
utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
utils.logger.error(
f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again..."
)
ping_flag = False
return ping_flag
@ -190,11 +200,13 @@ class XiaoHongShuClient(AbstractApiClient):
self.cookie_dict = cookie_dict
async def get_note_by_keyword(
self, keyword: str,
search_id: str = get_search_id(),
page: int = 1, page_size: int = 20,
sort: SearchSortType = SearchSortType.GENERAL,
note_type: SearchNoteType = SearchNoteType.ALL
self,
keyword: str,
search_id: str = get_search_id(),
page: int = 1,
page_size: int = 20,
sort: SearchSortType = SearchSortType.GENERAL,
note_type: SearchNoteType = SearchNoteType.ALL,
) -> Dict:
"""
根据关键词搜索笔记
@ -215,11 +227,13 @@ class XiaoHongShuClient(AbstractApiClient):
"page_size": page_size,
"search_id": search_id,
"sort": sort.value,
"note_type": note_type.value
"note_type": note_type.value,
}
return await self.post(uri, data)
async def get_note_by_id(self, note_id: str, xsec_source: str, xsec_token: str) -> Dict:
async def get_note_by_id(
self, note_id: str, xsec_source: str, xsec_token: str
) -> Dict:
"""
获取笔记详情API
Args:
@ -238,7 +252,7 @@ class XiaoHongShuClient(AbstractApiClient):
"image_formats": ["jpg", "webp", "avif"],
"extra": {"need_body_topic": 1},
"xsec_source": xsec_source,
"xsec_token": xsec_token
"xsec_token": xsec_token,
}
uri = "/api/sns/web/v1/feed"
res = await self.post(uri, data)
@ -246,14 +260,19 @@ class XiaoHongShuClient(AbstractApiClient):
res_dict: Dict = res["items"][0]["note_card"]
return res_dict
# 爬取频繁了可能会出现有的笔记能有结果有的没有
utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
utils.logger.error(
f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}"
)
return dict()
async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
async def get_note_comments(
self, note_id: str, xsec_token: str, cursor: str = ""
) -> Dict:
"""
获取一级评论的API
Args:
note_id: 笔记ID
xsec_token: 验证token
cursor: 分页游标
Returns:
@ -264,16 +283,25 @@ class XiaoHongShuClient(AbstractApiClient):
"note_id": note_id,
"cursor": cursor,
"top_comment_id": "",
"image_formats": "jpg,webp,avif"
"image_formats": "jpg,webp,avif",
"xsec_token": xsec_token,
}
return await self.get(uri, params)
async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""):
async def get_note_sub_comments(
self,
note_id: str,
root_comment_id: str,
xsec_token: str,
num: int = 10,
cursor: str = "",
):
"""
获取指定父评论下的子评论的API
Args:
note_id: 子评论的帖子ID
root_comment_id: 根评论ID
xsec_token: 验证token
num: 分页数量
cursor: 分页游标
@ -286,16 +314,25 @@ class XiaoHongShuClient(AbstractApiClient):
"root_comment_id": root_comment_id,
"num": num,
"cursor": cursor,
"image_formats": "jpg,webp,avif",
"top_comment_id": "",
"xsec_token": xsec_token,
}
return await self.get(uri, params)
async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
max_count: int = 10) -> List[Dict]:
async def get_note_all_comments(
self,
note_id: str,
xsec_token: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
max_count: int = 10,
) -> List[Dict]:
"""
获取指定笔记下的所有一级评论该方法会一直查找一个帖子下的所有评论信息
Args:
note_id: 笔记ID
xsec_token: 验证token
crawl_interval: 爬取一次笔记的延迟单位
callback: 一次笔记爬取结束后
max_count: 一次笔记爬取的最大评论数量
@ -306,39 +343,54 @@ class XiaoHongShuClient(AbstractApiClient):
comments_has_more = True
comments_cursor = ""
while comments_has_more and len(result) < max_count:
comments_res = await self.get_note_comments(note_id, comments_cursor)
comments_res = await self.get_note_comments(
note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor
)
comments_has_more = comments_res.get("has_more", False)
comments_cursor = comments_res.get("cursor", "")
if "comments" not in comments_res:
utils.logger.info(
f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}"
)
break
comments = comments_res["comments"]
if len(result) + len(comments) > max_count:
comments = comments[:max_count - len(result)]
comments = comments[: max_count - len(result)]
if callback:
await callback(note_id, comments)
await asyncio.sleep(crawl_interval)
result.extend(comments)
sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback)
sub_comments = await self.get_comments_all_sub_comments(
comments=comments,
xsec_token=xsec_token,
crawl_interval=crawl_interval,
callback=callback,
)
result.extend(sub_comments)
return result
async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[Dict]:
async def get_comments_all_sub_comments(
self,
comments: List[Dict],
xsec_token: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
) -> List[Dict]:
"""
获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息
Args:
comments: 评论列表
xsec_token: 验证token
crawl_interval: 爬取一次评论的延迟单位
callback: 一次评论爬取结束后
Returns:
"""
if not config.ENABLE_GET_SUB_COMMENTS:
utils.logger.info(
f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled"
)
return []
result = []
@ -356,12 +408,19 @@ class XiaoHongShuClient(AbstractApiClient):
sub_comment_cursor = comment.get("sub_comment_cursor")
while sub_comment_has_more:
comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor)
comments_res = await self.get_note_sub_comments(
note_id=note_id,
root_comment_id=root_comment_id,
xsec_token=xsec_token,
num=10,
cursor=sub_comment_cursor,
)
sub_comment_has_more = comments_res.get("has_more", False)
sub_comment_cursor = comments_res.get("cursor", "")
if "comments" not in comments_res:
utils.logger.info(
f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}")
f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}"
)
break
comments = comments_res["comments"]
if callback:
@ -377,21 +436,23 @@ class XiaoHongShuClient(AbstractApiClient):
eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
"""
uri = f"/user/profile/{user_id}"
html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', html_content, re.M)
html_content = await self.request(
"GET", self._domain + uri, return_response=True, headers=self.headers
)
match = re.search(
r"<script>window.__INITIAL_STATE__=(.+)<\/script>", html_content, re.M
)
if match is None:
return {}
info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
info = json.loads(match.group(1).replace(":undefined", ":null"), strict=False)
if info is None:
return {}
return info.get('user').get('userPageData')
return info.get("user").get("userPageData")
async def get_notes_by_creator(
self, creator: str,
cursor: str,
page_size: int = 30
self, creator: str, cursor: str, page_size: int = 30
) -> Dict:
"""
获取博主的笔记
@ -408,12 +469,16 @@ class XiaoHongShuClient(AbstractApiClient):
"user_id": creator,
"cursor": cursor,
"num": page_size,
"image_formats": "jpg,webp,avif"
"image_formats": "jpg,webp,avif",
}
return await self.get(uri, data)
async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[Dict]:
async def get_all_notes_by_creator(
self,
user_id: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
) -> List[Dict]:
"""
获取指定用户下的所有发过的帖子该方法会一直查找一个用户下的所有帖子信息
Args:
@ -431,19 +496,22 @@ class XiaoHongShuClient(AbstractApiClient):
notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
if not notes_res:
utils.logger.error(
f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data."
)
break
notes_has_more = notes_res.get("has_more", False)
notes_cursor = notes_res.get("cursor", "")
if "notes" not in notes_res:
utils.logger.info(
f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}"
)
break
notes = notes_res["notes"]
utils.logger.info(
f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}"
)
if callback:
await callback(notes)
await asyncio.sleep(crawl_interval)
@ -460,13 +528,17 @@ class XiaoHongShuClient(AbstractApiClient):
"""
uri = f"/api/sns/web/short_url"
data = {
"original_url": f"{self._domain}/discovery/item/{note_id}"
}
data = {"original_url": f"{self._domain}/discovery/item/{note_id}"}
return await self.post(uri, data=data, return_response=True)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def get_note_by_id_from_html(self, note_id: str, xsec_source: str, xsec_token: str) -> Dict:
async def get_note_by_id_from_html(
self,
note_id: str,
xsec_source: str,
xsec_token: str,
enable_cookie: bool = False,
) -> Optional[Dict]:
"""
通过解析网页版的笔记详情页HTML获取笔记详情, 该接口可能会出现失败的情况这里尝试重试3次
copy from https://github.com/ReaJason/xhs/blob/eb1c5a0213f6fbb592f0a2897ee552847c69ea2d/xhs/core.py#L217-L259
@ -475,10 +547,12 @@ class XiaoHongShuClient(AbstractApiClient):
note_id:
xsec_source:
xsec_token:
enable_cookie:
Returns:
"""
def camel_to_underscore(key):
return re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower()
@ -493,18 +567,30 @@ class XiaoHongShuClient(AbstractApiClient):
dict_new[new_key] = transform_json_keys(json.dumps(value))
elif isinstance(value, list):
dict_new[new_key] = [
transform_json_keys(json.dumps(item))
if (item and isinstance(item, dict))
else item
(
transform_json_keys(json.dumps(item))
if (item and isinstance(item, dict))
else item
)
for item in value
]
else:
dict_new[new_key] = value
return dict_new
url = "https://www.xiaohongshu.com/explore/" + note_id + f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
html = await self.request(method="GET", url=url, return_response=True, headers=self.headers)
url = (
"https://www.xiaohongshu.com/explore/"
+ note_id
+ f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
)
copy_headers = self.headers.copy()
if not enable_cookie:
del copy_headers["Cookie"]
html = await self.request(
method="GET", url=url, return_response=True, headers=copy_headers
)
def get_note_dict(html):
state = re.findall(r"window.__INITIAL_STATE__=({.*})</script>", html)[
0
@ -518,32 +604,4 @@ class XiaoHongShuClient(AbstractApiClient):
try:
return get_note_dict(html)
except:
href = re.findall(r'href="(.*?)"', html)[0]
href = unescape(href)
utils.logger.info(
f"[XiaoHongShuClient.get_note_by_id_from_html] 出现验证码: {href}, 请手动验证"
)
await self.playwright_page.goto(href)
# 等待用户完成操作页面重定向
if await self.check_redirect():
utils.logger.info(
f"[XiaoHongShuClient.get_note_by_id_from_html] 用户完成验证, 重定向到笔记详情页"
)
html = await self.playwright_page.content()
return get_note_dict(html)
else:
raise DataFetchError(html)
@retry(
stop=stop_after_attempt(100),
wait=wait_fixed(5),
retry=retry_if_result(lambda value: value is False),
)
async def check_redirect(self):
url = self.playwright_page.url
if url.startswith("https://www.xiaohongshu.com/explore"):
return True
return False
return None

View File

@ -1,12 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
@ -15,8 +15,7 @@ import random
from asyncio import Task
from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
from playwright.async_api import BrowserContext, BrowserType, Page, async_playwright
from tenacity import RetryError
import config
@ -48,28 +47,33 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def start(self) -> None:
playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
ip_proxy_info
)
async with async_playwright() as playwright:
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium,
None,
self.user_agent,
headless=config.HEADLESS
chromium, None, self.user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# add a cookie attribute webId to avoid the appearance of a sliding captcha on the webpage
await self.browser_context.add_cookies([{
'name': "webId",
'value': "xxx123", # any value
'domain': ".xiaohongshu.com",
'path': "/"
}])
await self.browser_context.add_cookies(
[
{
"name": "webId",
"value": "xxx123", # any value
"domain": ".xiaohongshu.com",
"path": "/",
}
]
)
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url)
@ -81,10 +85,12 @@ class XiaoHongShuCrawler(AbstractCrawler):
login_phone="", # input your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES
cookie_str=config.COOKIES,
)
await login_obj.begin()
await self.xhs_client.update_cookies(browser_context=self.browser_context)
await self.xhs_client.update_cookies(
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
@ -103,33 +109,48 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def search(self) -> None:
"""Search for notes and retrieve their comment information."""
utils.logger.info("[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords")
utils.logger.info(
"[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords"
)
xhs_limit_count = 20 # xhs limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < xhs_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = xhs_limit_count
start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword)
utils.logger.info(f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}")
utils.logger.info(
f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}"
)
page = 1
search_id = get_search_id()
while (page - start_page + 1) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
while (
page - start_page + 1
) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(f"[XiaoHongShuCrawler.search] Skip page {page}")
page += 1
continue
try:
utils.logger.info(f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}")
note_id_list: List[str] = []
utils.logger.info(
f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}"
)
note_ids: List[str] = []
xsec_tokens: List[str] = []
notes_res = await self.xhs_client.get_note_by_keyword(
keyword=keyword,
search_id=search_id,
page=page,
sort=SearchSortType(config.SORT_TYPE) if config.SORT_TYPE != '' else SearchSortType.GENERAL,
sort=(
SearchSortType(config.SORT_TYPE)
if config.SORT_TYPE != ""
else SearchSortType.GENERAL
),
)
utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}")
if not notes_res or not notes_res.get('has_more', False):
utils.logger.info(
f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}"
)
if not notes_res or not notes_res.get("has_more", False):
utils.logger.info("No more content!")
break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
@ -138,30 +159,39 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_id=post_item.get("id"),
xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"),
semaphore=semaphore
semaphore=semaphore,
)
for post_item in notes_res.get("items", {})
if post_item.get('model_type') not in ('rec_query', 'hot_query')
if post_item.get("model_type") not in ("rec_query", "hot_query")
]
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail:
await xhs_store.update_xhs_note(note_detail)
await self.get_notice_media(note_detail)
note_id_list.append(note_detail.get("note_id"))
note_ids.append(note_detail.get("note_id"))
xsec_tokens.append(note_detail.get("xsec_token"))
page += 1
utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
await self.batch_get_note_comments(note_id_list)
utils.logger.info(
f"[XiaoHongShuCrawler.search] Note details: {note_details}"
)
await self.batch_get_note_comments(note_ids, xsec_tokens)
except DataFetchError:
utils.logger.error("[XiaoHongShuCrawler.search] Get note detail error")
utils.logger.error(
"[XiaoHongShuCrawler.search] Get note detail error"
)
break
async def get_creators_and_notes(self) -> None:
"""Get creator's notes and retrieve their comment information."""
utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
utils.logger.info(
"[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators"
)
for user_id in config.XHS_CREATOR_ID_LIST:
# get creator detail info from web html content
createor_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id)
createor_info: Dict = await self.xhs_client.get_creator_info(
user_id=user_id
)
if createor_info:
await xhs_store.save_creator(user_id, creator=createor_info)
@ -169,11 +199,15 @@ class XiaoHongShuCrawler(AbstractCrawler):
all_notes_list = await self.xhs_client.get_all_notes_by_creator(
user_id=user_id,
crawl_interval=random.random(),
callback=self.fetch_creator_notes_detail
callback=self.fetch_creator_notes_detail,
)
note_ids = [note_item.get("note_id") for note_item in all_notes_list]
await self.batch_get_note_comments(note_ids)
note_ids = []
xsec_tokens = []
for note_item in all_notes_list:
note_ids.append(note_item.get("note_id"))
xsec_tokens.append(note_item.get("xsec_token"))
await self.batch_get_note_comments(note_ids, xsec_tokens)
async def fetch_creator_notes_detail(self, note_list: List[Dict]):
"""
@ -185,7 +219,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_id=post_item.get("note_id"),
xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"),
semaphore=semaphore
semaphore=semaphore,
)
for post_item in note_list
]
@ -205,73 +239,133 @@ class XiaoHongShuCrawler(AbstractCrawler):
get_note_detail_task_list = []
for full_note_url in config.XHS_SPECIFIED_NOTE_URL_LIST:
note_url_info: NoteUrlInfo = parse_note_info_from_note_url(full_note_url)
utils.logger.info(f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}")
utils.logger.info(
f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}"
)
crawler_task = self.get_note_detail_async_task(
note_id=note_url_info.note_id,
xsec_source=note_url_info.xsec_source,
xsec_token=note_url_info.xsec_token,
semaphore=asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
semaphore=asyncio.Semaphore(config.MAX_CONCURRENCY_NUM),
)
get_note_detail_task_list.append(crawler_task)
need_get_comment_note_ids = []
xsec_tokens = []
note_details = await asyncio.gather(*get_note_detail_task_list)
for note_detail in note_details:
if note_detail:
need_get_comment_note_ids.append(note_detail.get("note_id", ""))
xsec_tokens.append(note_detail.get("xsec_token", ""))
await xhs_store.update_xhs_note(note_detail)
await self.batch_get_note_comments(need_get_comment_note_ids)
await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens)
async def get_note_detail_async_task(
self,
note_id: str,
xsec_source: str,
xsec_token: str,
semaphore: asyncio.Semaphore,
) -> Optional[Dict]:
"""Get note detail
async def get_note_detail_async_task(self, note_id: str, xsec_source: str, xsec_token: str, semaphore: asyncio.Semaphore) -> \
Optional[Dict]:
"""Get note detail"""
Args:
note_id:
xsec_source:
xsec_token:
semaphore:
Returns:
Dict: note detail
"""
note_detail_from_html, note_detail_from_api = None, None
async with semaphore:
try:
note_detail: Dict = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token)
# note_detail: Dict = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
if not note_detail:
# 尝试直接获取网页版笔记详情携带cookie
note_detail_from_html: Optional[Dict] = (
await self.xhs_client.get_note_by_id_from_html(
note_id, xsec_source, xsec_token, enable_cookie=True
)
)
if not note_detail_from_html:
# 如果网页版笔记详情获取失败则尝试不使用cookie获取
note_detail_from_html = (
await self.xhs_client.get_note_by_id_from_html(
note_id, xsec_source, xsec_token, enable_cookie=False
)
)
utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error, note_id: {note_id}")
return None
note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})
return note_detail
f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error, note_id: {note_id}"
)
if not note_detail_from_html:
# 如果网页版笔记详情获取失败则尝试API获取
note_detail_from_api: Optional[Dict] = (
await self.xhs_client.get_note_by_id(
note_id, xsec_source, xsec_token
)
)
note_detail = note_detail_from_html or note_detail_from_api
if note_detail:
note_detail.update(
{"xsec_token": xsec_token, "xsec_source": xsec_source}
)
return note_detail
except DataFetchError as ex:
utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}")
utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}"
)
return None
except KeyError as ex:
utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}")
f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}"
)
return None
async def batch_get_note_comments(self, note_list: List[str]):
async def batch_get_note_comments(
self, note_list: List[str], xsec_tokens: List[str]
):
"""Batch get note comments"""
if not config.ENABLE_GET_COMMENTS:
utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
utils.logger.info(
f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return
utils.logger.info(
f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}")
f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
for note_id in note_list:
task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id)
for index, note_id in enumerate(note_list):
task = asyncio.create_task(
self.get_comments(
note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore
),
name=note_id,
)
task_list.append(task)
await asyncio.gather(*task_list)
async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore):
async def get_comments(
self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore
):
"""Get note comments with keyword filtering and quantity limitation"""
async with semaphore:
utils.logger.info(f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}")
utils.logger.info(
f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}"
)
await self.xhs_client.get_note_all_comments(
note_id=note_id,
xsec_token=xsec_token,
crawl_interval=random.random(),
callback=xhs_store.batch_update_xhs_note_comments,
max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES
max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
)
@staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
def format_proxy_info(
ip_proxy_info: IpInfoModel,
) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx"""
playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
@ -285,8 +379,12 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient:
"""Create xhs client"""
utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
utils.logger.info(
"[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ..."
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
xhs_client_obj = XiaoHongShuClient(
proxies=httpx_proxy,
headers={
@ -294,7 +392,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
"Cookie": cookie_str,
"Origin": "https://www.xiaohongshu.com",
"Referer": "https://www.xiaohongshu.com",
"Content-Type": "application/json;charset=UTF-8"
"Content-Type": "application/json;charset=UTF-8",
},
playwright_page=self.context_page,
cookie_dict=cookie_dict,
@ -302,33 +400,35 @@ class XiaoHongShuCrawler(AbstractCrawler):
return xhs_client_obj
async def launch_browser(
self,
chromium: BrowserType,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True
self,
chromium: BrowserType,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
"""Launch browser and create browser context"""
utils.logger.info("[XiaoHongShuCrawler.launch_browser] Begin create browser context ...")
utils.logger.info(
"[XiaoHongShuCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE:
# feat issue #14
# we will save login state to avoid login every time
user_data_dir = os.path.join(os.getcwd(), "browser_data",
config.USER_DATA_DIR % config.PLATFORM) # type: ignore
user_data_dir = os.path.join(
os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir,
accept_downloads=True,
headless=headless,
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
user_agent=user_agent,
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
return browser_context
@ -339,7 +439,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def get_notice_media(self, note_detail: Dict):
if not config.ENABLE_GET_IMAGES:
utils.logger.info(f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled")
utils.logger.info(
f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled"
)
return
await self.get_note_images(note_detail)
await self.get_notice_video(note_detail)
@ -356,8 +458,8 @@ class XiaoHongShuCrawler(AbstractCrawler):
image_list: List[Dict] = note_item.get("image_list", [])
for img in image_list:
if img.get('url_default') != '':
img.update({'url': img.get('url_default')})
if img.get("url_default") != "":
img.update({"url": img.get("url_default")})
if not image_list:
return

77
playwright.md Normal file
View File

@ -0,0 +1,77 @@
# Playwright 爬虫技术分析
## Playwright 简介
Playwright 是一个由微软开发的自动化浏览器测试工具,它可以:
1. 自动控制 Chromium、Firefox 和 WebKit 浏览器
2. 模拟真实用户操作
3. 执行 JavaScript 代码
4. 获取浏览器上下文和 Cookie
## 使用 Playwright 爬取小红书的步骤
根据代码分析,主要步骤如下:
1. **初始化浏览器环境**
```python
async with async_playwright() as playwright:
browser_context = await self.launch_browser(
playwright.chromium,
proxy=None,
headless=config.HEADLESS
)
```
2. **登录获取 Cookie**
```python
# 扫码登录
login_obj = XiaoHongShuLogin(
login_type=config.LOGIN_TYPE,
browser_context=self.browser_context,
context_page=self.context_page
)
await login_obj.begin()
```
3. **获取加密参数**
- 通过执行 JS 获取 X-s 等签名参数
- 保留登录成功后的上下文环境
- 避免了复杂的 JS 逆向过程
4. **发送请求获取数据**
```python
# 搜索笔记
await self.xhs_client.search_notes(
keyword=keyword,
page=page,
sort=SearchSortType.GENERAL
)
# 获取笔记详情
await self.xhs_client.get_note_by_id(note_id)
```
5. **数据存储**
- 支持存储到 MySQL
- 支持导出为 CSV
- 支持导出为 JSON
## 核心优势
1. 降低逆向难度
- 不需要复现复杂的加密算法
- 直接获取浏览器中的参数
2. 更真实的请求环境
- 完整的浏览器环境
- 真实的 Cookie 和请求头
3. 更稳定的爬取
- 自动处理反爬验证
- 支持 IP 代理池
- 支持登录态缓存
这种方式通过 Playwright 模拟真实浏览器环境,大大降低了爬虫开发难度,是一种非常实用的爬虫技术方案。
需要注意的是,使用时要遵守平台规则,合理控制爬取频率,仅用于学习研究用途。

40
process_xhs_note.py Normal file
View File

@ -0,0 +1,40 @@
import pandas as pd
import json
import os
def excel_to_json():
# 创建输出文件夹
output_dir = 'sheet_notes'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 读取Excel文件
excel_file = 'input.xlsx'
# 获取所有sheet名称
xl = pd.ExcelFile(excel_file)
sheet_names = xl.sheet_names
# 处理每个sheet
for sheet_name in sheet_names:
# 读取当前sheet跳过第一行使用第二行作为列名
df = pd.read_excel(excel_file,
sheet_name=sheet_name,
header=1)
# 获取"笔记标题"列的内容
if '笔记标题' in df.columns:
# 将笔记标题转换为列表,并去除空值
notes = df['笔记标题'].dropna().tolist()
# 保存为JSON文件
output_file = os.path.join(output_dir, f'{sheet_name}.json')
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(notes, f, ensure_ascii=False, indent=4)
print(f'已保存 {sheet_name} 的笔记标题到 {output_file}')
else:
print(f'警告: {sheet_name} 中没有找到"笔记标题"')
if __name__ == '__main__':
excel_to_json()

81
project.md Normal file
View File

@ -0,0 +1,81 @@
# MediaCrawler 项目技术分析
## 总体架构
项目使用了 Playwright 作为核心技术来模拟浏览器行为,主要步骤包括:
1. 使用 Playwright 启动浏览器并保持登录状态
2. 通过执行 JS 获取加密参数
3. 使用 httpx 发送请求获取数据
4. 数据存储到 MySQL/CSV/JSON
## 各平台实现细节
### 1. 抖音
核心技术:
- 使用 Playwright 模拟浏览器环境
- 通过执行 JS 获取 X-Bogus 等加密参数
- 支持搜索、指定视频、创作者主页爬取
- 支持评论和二级评论获取
### 2. 小红书
核心技术:
- 使用 Playwright 获取登录态
- 通过 JS 注入获取 X-s 等签名参数
- 支持笔记搜索、指定笔记、用户主页爬取
- 支持评论数据抓取
### 3. 快手
核心技术:
- 使用 Playwright 维持登录状态
- 通过 JS 执行获取 _signature 等参数
- 支持视频搜索和指定视频爬取
- 支持评论数据抓取
### 4. B站
核心技术:
- Cookie 登录方式
- wbi 签名参数构造
- 支持视频搜索和指定视频爬取
- 支持评论数据抓取
### 5. 微博
核心技术:
- 使用 Playwright 维持登录态
- 通过 JS 获取加密参数
- 支持微博搜索和指定微博爬取
- 支持评论数据抓取
### 6. 贴吧
核心技术:
- Cookie 登录方式
- 通过解析 HTML 获取数据
- 支持帖子搜索和指定帖子爬取
- 支持评论数据抓取
### 7. 知乎
核心技术:
- 使用 Playwright 维持登录态
- 通过 JS 获取 x-zse-96 等参数
- 支持回答搜索和指定回答爬取
- 支持评论数据抓取
## 项目亮点
1. 使用抽象类设计,代码结构清晰
2. 支持 IP 代理池
3. 支持多账号登录
4. 支持生成评论词云图
5. 支持多种数据存储方式
6. 使用 Playwright 降低逆向难度
这个项目通过巧妙使用 Playwright 来获取加密参数,大大降低了爬虫的开发难度,是一个非常实用的自媒体平台数据采集工具。
#63f85aee0000000013015174

BIN
project.pdf Normal file

Binary file not shown.

View File

@ -1,18 +1,5 @@
httpx==0.24.0
Pillow==9.5.0
playwright==1.42.0
tenacity==8.2.2
opencv-python
aiomysql==0.2.0
redis~=4.6.0
pydantic==2.5.2
aiofiles~=23.2.1
fastapi==0.110.2
uvicorn==0.29.0
python-dotenv==1.0.1
jieba==0.42.1
wordcloud==1.9.3
matplotlib==3.9.0
requests==2.32.3
parsel==1.9.1
pyexecjs==1.5.1
aiohttp
aiofiles
mysql-connector-python
PyQt6
pyinstaller

1121
xhs_crawler_gui.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,116 @@
# 向量化小红书笔记流程及注意事项
## 正向过程(笔记内容向量化)
```mermaid
flowchart TD
A[开始] --> B[连接MySQL数据库]
B --> C[查询xhs_notes表<br>type='normal'的笔记]
C --> D[初始化FAISS向量存储]
subgraph 笔记处理循环
E[获取单条笔记数据<br>note_id, title, description] --> F[合并标题和描述]
F --> G[文本分割<br>chunk_size=120<br>overlap=20]
G --> H[创建新的向量存储new_vs]
end
subgraph FAISS向量存储内部结构
H --> I1[FAISS索引<br>存储向量数据]
H --> I2[DocStore内部字典<br>key: vector_id自动生成<br>value: Document对象]
end
subgraph 向量存储处理
I2 --> J[遍历DocStore字典<br>获取vector_id和文档内容]
J --> K[生成content_hash]
K --> L{检查content_hash<br>是否存在}
L -->|不存在| M[存入vector_store表<br>关联字段:<br>note_id<br>vector_id<br>content<br>content_hash]
L -->|存在| N[跳过]
M --> O[合并到主向量存储]
N --> O
end
O --> P[保存最终向量存储<br>到本地文件]
P --> Q[结束]
subgraph 数据关系说明
R[xhs_notes表] -.->|note_id关联| S[vector_store表]
S -.->|vector_id关联| T[FAISS DocStore]
T -.->|自动生成| U[UUID格式的vector_id]
end
```
1. vector_id 的生成过程:
- 当使用 FAISS.from_texts() 创建新的向量存储时FAISS 会自动为每个文本片段生成一个 UUID 格式的 vector_id
这个 vector_id 存储在 FAISS 的 DocStore 内部字典中作为 key
代码中通过 new_vs.docstore._dict.items() 可以获取这些 vector_id
2. 数据结构关系:
```json
new_vs.docstore._dict = {
"vector_id_1": Document(page_content="文本内容1"),
"vector_id_2": Document(page_content="文本内容2"),
...
}
```
3. 存储过程:
- FAISS 索引存储实际的向量数据
- DocStore 存储 vector_id 和文本内容的映射
MySQL 中的 vector_store 表存储 vector_id、note_id 和文本内容的关联关系
这样,通过 vector_id 就可以:
- 在 FAISS 中找到对应的向量
- 在 DocStore 中找到原始文本
- 在数据库中找到对应的笔记 ID
## 反向过程(通过用户输入的文本,检索出对应的笔记)
```mermaid
flowchart TD
A[开始] --> B[加载FAISS向量存储]
B --> C[向量化用户查询]
C --> D[执行相似度搜索<br>k=10条结果]
subgraph 向量搜索结果处理
D --> E[遍历搜索结果<br>doc, score]
E --> F[计算文本内容的<br>content_hash]
F --> G[查询vector_store表<br>通过content_hash<br>获取note_id]
end
subgraph 笔记数据获取
G --> H{note_id是否<br>已处理?}
H -->|否| I[查询xhs_notes表<br>获取笔记详情]
H -->|是| J[跳过]
I --> K[获取清洗后的<br>笔记内容]
K --> L[构建完整笔记数据]
end
subgraph 数据整合
L --> M[收集上下文内容<br>用于GPT]
L --> N[收集完整笔记数据<br>用于前端展示]
end
M --> O[调用GPT生成回答]
N --> P[返回GPT回答和笔记数据]
P --> Q[结束]
subgraph 数据结构关系
R[FAISS向量库] -.->|相似度搜索| S[文本内容]
S -.->|content_hash| T[vector_store表]
T -.->|note_id| U[xhs_notes表]
end
subgraph 笔记数据结构
V[完整笔记数据包含:]
V --> V1[note_id]
V --> V2[标题]
V --> V3[描述]
V --> V4[收藏数]
V --> V5[评论数]
V --> V6[分享数]
V --> V7[清洗后内容]
end
```