# Project Documentation

This project is an AI assistant system built on Xiaohongshu (小红书) note data. It covers data collection, processing, vectorized storage, and retrieval.

## System Architecture

```mermaid
graph TD
    A[User query] --> B[AI assistant]
    B --> C[Vector retrieval]
    C --> D[Database]
    D --> E[Note data]
    B --> F[GPT analysis]
    E --> F
    F --> G[Result display]
```

## Core Scripts

### Main Application Scripts

#### assistant.py
- **Purpose**: Main AI assistant application
- **Key features**:
  - OpenAI API integration
  - Vector retrieval
  - Gradio UI
  - Database interaction
- **Key code** (the `get_content_hash` helper used here is sketched after this section):
```python
async def process_query(query):
    conn = None
    try:
        logger.info(f"开始处理查询: {query}")
        vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
        results = vs.similarity_search_with_score(query, k=10)

        conn = await get_db_connection()
        logger.info("数据库连接成功")

        notes_data = []
        seen_note_ids = set()
        for doc, score in results:
            content_hash = get_content_hash(doc.page_content)
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute("SELECT note_id FROM vector_store WHERE content_hash = %s", (content_hash,))
                result = await cursor.fetchone()

            if not result or result['note_id'] in seen_note_ids:
                continue

            note_id = result['note_id']
            seen_note_ids.add(note_id)
            note = await get_note_details(conn, note_id)
            clean_content = await get_clean_note_content(conn, note_id)

            note_data = {
                'id': note_id,
                'title': note.get('title', '无标题'),
                'description': note.get('description', '暂无描述'),
                'clean_content': clean_content
            }
            notes_data.append(note_data)

        return notes_data
    finally:
        if conn is not None:
            await conn.close()
```
- **Processing flow**:

```mermaid
sequenceDiagram
    participant User
    participant Assistant
    participant VectorDB
    participant GPT
    participant MySQL
    User->>Assistant: Submit query
    Assistant->>VectorDB: Vector search
    VectorDB->>MySQL: Fetch notes
    MySQL-->>Assistant: Return note data
    Assistant->>GPT: Generate answer
    GPT-->>User: Display result
```
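
The `get_content_hash` helper used in `process_query` (and by the processing scripts below) is not shown in the excerpts. A minimal sketch, assuming it is simply a hex digest of the UTF-8 encoded chunk text so that identical chunks map to the same `content_hash` row in `vector_store`; the choice of MD5 is an assumption:

```python
import hashlib

def get_content_hash(content: str) -> str:
    """Return a stable hash for a text chunk (assumed: MD5 hex digest of the UTF-8 bytes)."""
    return hashlib.md5(content.encode("utf-8")).hexdigest()
```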

#### ai_assistant.py
- **Purpose**: Alibaba Cloud implementation of the AI assistant
- **Key features**:
  - Alibaba Cloud DashScope API support
  - Note content analysis and display
  - Markdown-formatted output
- **Key code** (a sketch of the `chat_with_gpt` call is given after this section):
```python
async def process_query(query):
    """Handle a user query."""
    conn = None
    try:
        logger.info(f"开始处理查询: {query}")

        # Load the vector store
        vs_path = "./raw_vs"
        logger.info(f"正在检查向量存储路径: {vs_path}")
        if not os.path.exists(vs_path):
            logger.error(f"向量存储路径不存在: {vs_path}")
            return "错误:向量存储不存在", []

        logger.info("正在加载向量存储...")
        vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)

        # Search for related vectors
        logger.info("开始向量搜索...")
        results = vs.similarity_search_with_score(query, k=10)
        logger.info(f"找到 {len(results)} 条相关结果")

        try:
            conn = await get_db_connection()
            if not conn:
                logger.error("数据库连接失败")
                return "数据库连接失败", []

            logger.info("数据库连接成功")
            context_notes = []
            notes_data = []  # Full note records for display

            # Keep up to 5 notes whose similarity score is at least 0.5
            seen_note_ids = set()
            for doc, similarity_score in results:
                # With the MAX_INNER_PRODUCT strategy, a larger score means more similar.
                # Skip results whose similarity score is below 0.5.
                if similarity_score < 0.5:
                    continue

                try:
                    content_hash = get_content_hash(doc.page_content)
                    logger.info(f"处理content_hash: {content_hash}, 相似度分数: {similarity_score}")

                    async with conn.cursor(aiomysql.DictCursor) as cursor:
                        await cursor.execute("""
                            SELECT note_id FROM vector_store
                            WHERE content_hash = %s
                        """, (content_hash,))
                        result = await cursor.fetchone()

                    if not result or result['note_id'] in seen_note_ids:
                        continue

                    note_id = result['note_id']
                    seen_note_ids.add(note_id)

                    # Fetch the note details and the cleaned content
                    note = await get_note_details(conn, note_id)
                    clean_content = await get_clean_note_content(conn, note_id)

                    if not note or not clean_content:
                        continue

                    # Build the full note record
                    note_data = {
                        'id': note_id,
                        'title': note.get('title', '无标题'),
                        'description': note.get('description', '暂无描述'),
                        'collected_count': note.get('collected_count', 0),
                        'comment_count': note.get('comment_count', 0),
                        'share_count': note.get('share_count', 0),
                        'clean_content': clean_content
                    }

                    notes_data.append(note_data)
                    context_notes.append(f"标题:{note['title']}\n内容:{note['description']}")

                    if len(notes_data) >= 5:  # Still cap at 5 notes
                        break

                except Exception as e:
                    logger.error(f"处理笔记时出错: {str(e)}")
                    continue

            # Continue even if no note met the similarity threshold
            logger.info(f"找到 {len(notes_data)} 条符合相似度要求的笔记")

            if not notes_data:
                logger.warning("未获取到任何有效的笔记内容")
                # return "未找到相关笔记内容", []

            # Prepare the GPT prompt
            logger.info("准备调用GPT...")
            system_prompt = """你是一位专业的化妆行业教师,专门帮助产品经理理解和分析小红书笔记。你的回答分下面3种情况:
1、如果用户的输入与化妆品无关,不要参考相关笔记内容,正常与客户交流。
2、如果用户的输入与化妆品相关,而且找到相关笔记内容,参考相关笔记内容,给出回答。
3、如果用户的输入与化妆品相关,但是没有找到相关笔记内容,请结合上下文和历史记录,给出回答。
回答要突出重点,并结合化妆品行业的专业知识。\n""" + "相关笔记内容:\n" + "\n\n".join(context_notes)
            user_prompt = f"{query}"
            logger.info("调用GPT获取回答...")
            gpt_response = await chat_with_gpt(system_prompt, user_prompt)

            return gpt_response, notes_data

        except Exception as e:
            logger.error(f"处理数据时出错: {str(e)}")
            return f"处理数据时出错: {str(e)}", []

    finally:
        if conn is not None:
            try:
                await conn.close()
                logger.info("数据库连接已关闭")
            except Exception as e:
                logger.error(f"关闭数据库连接时出错: {str(e)}")
```

- **Processing flow**:
```mermaid
graph TD
    A[User query] --> B[AI assistant]
    B --> C[Vector retrieval]
    C --> D[Database]
    D --> E[Note data]
    B --> F[GPT analysis]
    E --> F
    F --> G[Result display]
```
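
`chat_with_gpt` is called above but not shown. A minimal sketch, assuming the assistant talks to DashScope through its OpenAI-compatible endpoint with the `openai` async client and a Qwen model; the model name and environment variable are assumptions, not taken from the source:

```python
import os
from openai import AsyncOpenAI

# Assumed setup: DashScope's OpenAI-compatible endpoint; adjust the key/model to the real config.
client = AsyncOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

async def chat_with_gpt(system_prompt: str, user_prompt: str) -> str:
    """Send the system/user prompts to the chat model and return the answer text."""
    response = await client.chat.completions.create(
        model="qwen-plus",  # hypothetical model name
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content
```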

### Data Processing Scripts

#### process_raw_notes.py
- **Purpose**: Processes raw note data
- **Main functions**:
  - Text splitting
  - Vectorized storage
  - Database persistence
- **Key code** (the `embed_model` setup is sketched after this section):
```python
async def process_xhs_notes():
    """Process Xiaohongshu note data."""
    conn = None
    try:
        logger.info("正在尝试连接数据库...")
        # Database connection settings
        conn = await aiomysql.connect(
            host='183.11.229.79',
            port=3316,
            user='root',
            password='zaq12wsx@9Xin',
            db='9Xin',
            autocommit=True
        )
        logger.info("数据库连接成功")

        if conn is None:
            raise Exception("数据库连接失败")

        async with conn.cursor(aiomysql.DictCursor) as cursor:
            # Fetch all notes whose type is 'normal'
            await cursor.execute("""
                SELECT id, note_id, title, description
                FROM xhs_notes
                WHERE type = 'normal'
            """)
            notes = await cursor.fetchall()

            # Split each note with the text splitter
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=120,
                chunk_overlap=20,
                length_function=len,
                is_separator_regex=False,
            )

            vs_path = "./raw_vs"
            if not os.path.exists(vs_path):
                os.makedirs(vs_path)

            # Initialize the vector store
            if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
                vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
            else:
                vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)

            # Show processing progress with tqdm
            for note in tqdm(notes, desc="处理小红书笔记"):
                # Combine the title and description
                content = f"{note['title']}\n{note['description']}"

                # Split the text
                texts = text_splitter.split_text(content)

                if texts:
                    # Build a new vector store for this note's chunks
                    new_vs = FAISS.from_texts(texts, embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)

                    # Persist the vector records to the database
                    async with conn.cursor() as insert_cursor:
                        for vector_id, document in new_vs.docstore._dict.items():
                            content = document.page_content
                            content_hash = get_content_hash(content)

                            # Check whether this content_hash already exists
                            await insert_cursor.execute("""
                                SELECT id FROM vector_store
                                WHERE content_hash = %s
                            """, (content_hash,))

                            if not await insert_cursor.fetchone():
                                # Insert the new vector record
                                await insert_cursor.execute("""
                                    INSERT INTO vector_store
                                    (note_id, vector_id, content, content_hash)
                                    VALUES (%s, %s, %s, %s)
                                """, (note['id'], vector_id, content, content_hash))

                    # Merge into the main vector store
                    vs.merge_from(new_vs)

            # Save the final vector store
            vs.save_local(vs_path)

    except aiomysql.Error as e:
        logger.error(f"MySQL错误: {e}")
        raise
    except Exception as e:
        logger.error(f"处理小红书笔记时出错: {e}")
        raise
    finally:
        # Close the connection safely
        try:
            if conn is not None:
                logger.info("正在关闭数据库连接...")
                await conn.close()
                logger.info("数据库连接已关闭")
        except Exception as e:
            logger.error(f"关闭数据库连接时出错: {e}")
```
- **Processing flow**:
```mermaid
flowchart TD
    A[Read raw notes] --> B[Split text]
    B --> C[Vectorize]
    C --> D[Store vectors]
    D --> E[Update database]
```
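
The `embed_model` passed to every FAISS call is defined elsewhere. A minimal sketch, assuming a LangChain `HuggingFaceEmbeddings` model with normalized vectors (normalization makes the `MAX_INNER_PRODUCT` distance behave like cosine similarity, which is what the 0.5 score threshold in `ai_assistant.py` implies); the model name is an assumption and should come from `config.py`:

```python
from langchain_community.embeddings import HuggingFaceEmbeddings

# Assumed embedding setup; the real model path is taken from config.py.
embed_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-small-zh-v1.5",           # hypothetical model
    encode_kwargs={"normalize_embeddings": True},  # unit vectors: inner product == cosine similarity
)
```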

#### process_clean_notes.py
- **Purpose**: Cleans and processes note data
- **Main functions**:
  - GPT content analysis
  - Mind-map generation
  - Structured content storage
- **Key code** (a sketch of `interact_with_chatgpt` follows this section):
```python
async def process_clean_notes():
    """Process notes and build the cleaned vector store."""
    conn = None
    try:
        logger.info("正在尝试连接数据库...")
        # Database connection settings
        conn = await aiomysql.connect(
            host='183.11.229.79',
            port=3316,
            user='root',
            password='zaq12wsx@9Xin',
            db='9Xin',
            autocommit=True
        )
        logger.info("数据库连接成功")

        if conn is None:
            raise Exception("数据库连接失败")

        async with conn.cursor(aiomysql.DictCursor) as cursor:
            # Fetch all notes of type 'normal'
            await cursor.execute("""
                SELECT id, note_id, title, description
                FROM xhs_notes
                WHERE type = 'normal'
            """)
            notes = await cursor.fetchall()

            # Initialize the vector store
            vs_path = "./clean_faiss"
            if not os.path.exists(vs_path):
                os.makedirs(vs_path)

            if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
                vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
            else:
                vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)

            # Process each note
            for note in tqdm(notes, desc="处理笔记"):
                content = f"{note['title']}\n{note['description']}"

                # Get the GPT analysis result
                gpt_response = await interact_with_chatgpt(content)
                if not gpt_response:
                    continue

                # Handle each content type in the result
                for content_type, content in gpt_response.items():
                    # Build a new vector store for this content
                    new_vs = FAISS.from_texts([content], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)

                    # Save to the database
                    async with conn.cursor() as insert_cursor:
                        for vector_id, document in new_vs.docstore._dict.items():
                            content = document.page_content
                            content_hash = get_content_hash(content)

                            # Check whether this hash/type pair already exists
                            await insert_cursor.execute("""
                                SELECT id FROM clean_note_store
                                WHERE content_hash = %s AND content_type = %s
                            """, (content_hash, content_type))

                            if not await insert_cursor.fetchone():
                                await insert_cursor.execute("""
                                    INSERT INTO clean_note_store
                                    (note_id, vector_id, content_type, content, content_hash)
                                    VALUES (%s, %s, %s, %s, %s)
                                """, (note['id'], vector_id, content_type, content, content_hash))

                    # Merge into the main vector store
                    vs.merge_from(new_vs)

            # Save the final vector store
            vs.save_local(vs_path)

    except aiomysql.Error as e:
        logger.error(f"MySQL错误: {e}")
        raise
    except Exception as e:
        logger.error(f"处理笔记时出错: {e}")
        raise
    finally:
        # Close the connection safely
        try:
            if conn is not None:
                logger.info("正在关闭数据库连接...")
                await conn.close()
                logger.info("数据库连接已关闭")
        except Exception as e:
            logger.error(f"关闭数据库连接时出错: {e}")
```

- **Processing flow**:
```mermaid
flowchart TD
    A[Read note] --> B[GPT analysis]
    B --> C[Generate reading guide]
    B --> D[Generate mind map]
    B --> E[Generate summary]
    C & D & E --> F[Store results]
```
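
`interact_with_chatgpt` is expected to return a dict mapping a `content_type` to generated text, matching the flow above (reading guide, mind map, summary). A minimal sketch, assuming it reuses the `chat_with_gpt` helper and asks for JSON output; the prompt wording and the keys `guide` / `mindmap` / `summary` are illustrative assumptions:

```python
import json
import logging

logger = logging.getLogger(__name__)

# Hypothetical prompt asking the model for structured JSON output.
ANALYSIS_PROMPT = (
    "请分析下面的小红书笔记,并以JSON返回三个字段:"
    "guide(导读)、mindmap(Mermaid思维导图)、summary(摘要)。"
)

async def interact_with_chatgpt(content: str):
    """Ask the model to analyze a note and return {content_type: content}, or None on failure."""
    try:
        raw = await chat_with_gpt(ANALYSIS_PROMPT, content)  # assumed to be available in this module
        parsed = json.loads(raw)
        # Keep only the expected, non-empty content types.
        return {k: v for k, v in parsed.items() if k in ("guide", "mindmap", "summary") and v}
    except Exception as e:
        logger.error(f"解析GPT分析结果失败: {e}")
        return None
```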

### Database-Related Scripts

#### Database Schema

```mermaid
erDiagram
    xhs_notes ||--o{ clean_note_store : contains
    xhs_notes ||--o{ vector_store : has
    xhs_notes {
        int id PK
        string note_id
        string title
        string description
        int user_id
        string nickname
    }
    clean_note_store {
        int id PK
        int note_id FK
        string content_type
        text content
        string vector_id
    }
    vector_store {
        int id PK
        int note_id FK
        string vector_id
        text content
        string content_hash
    }
```
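
The helpers `get_note_details` and `get_clean_note_content` used by the assistant scripts are not shown. A minimal sketch against the schema above, assuming cleaned content pieces are concatenated per note; any column choices beyond the diagram are assumptions:

```python
import aiomysql

async def get_note_details(conn, note_id):
    """Fetch one row from xhs_notes by primary key."""
    async with conn.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute("SELECT * FROM xhs_notes WHERE id = %s", (note_id,))
        return await cursor.fetchone()

async def get_clean_note_content(conn, note_id):
    """Concatenate the cleaned content pieces stored for a note."""
    async with conn.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(
            "SELECT content_type, content FROM clean_note_store WHERE note_id = %s",
            (note_id,),
        )
        rows = await cursor.fetchall()
    return "\n\n".join(f"【{row['content_type']}】\n{row['content']}" for row in rows)
```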

#### src/mcp/server.py
- **Purpose**: MCP server
- **Main functions**:
  - Message queue management
  - Task scheduling and dispatch
  - Resource monitoring
  - Service registration and discovery

### Utility Scripts

#### check_downloads.py
- **Purpose**: Checks the download status of media files
- **Main functions** (a resumable-download sketch follows this list):
  - File integrity checks
  - Resumable downloads
  - Database status updates
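
Resumable downloading is usually done with an HTTP `Range` request that continues from the bytes already on disk. A minimal sketch of that idea, assuming the `requests` library; it is illustrative only, not the script's actual implementation:

```python
import os
import requests

def resume_download(url: str, dest_path: str, chunk_size: int = 1 << 16) -> int:
    """Continue downloading `url` into `dest_path` from where a previous run stopped."""
    downloaded = os.path.getsize(dest_path) if os.path.exists(dest_path) else 0
    headers = {"Range": f"bytes={downloaded}-"} if downloaded else {}

    with requests.get(url, headers=headers, stream=True, timeout=30) as resp:
        if resp.status_code == 416:  # requested range not satisfiable: file already complete
            return downloaded
        resp.raise_for_status()
        mode = "ab" if resp.status_code == 206 else "wb"  # 206 = server honored the Range header
        with open(dest_path, mode) as f:
            for chunk in resp.iter_content(chunk_size=chunk_size):
                f.write(chunk)
    return os.path.getsize(dest_path)
```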

#### import_xhs_notes.py
- **Purpose**: Imports Xiaohongshu note data
- **Main functions** (an import sketch follows this list):
  - JSON data parsing
  - Database import
  - Error handling
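
A minimal sketch of the import idea, assuming the export is a JSON array of note objects and targeting the `xhs_notes` columns from the schema above; the file layout and the `INSERT IGNORE` choice are assumptions:

```python
import json
import logging

logger = logging.getLogger(__name__)

async def import_notes(conn, json_path: str) -> int:
    """Parse a JSON export and insert the notes into xhs_notes, skipping bad records."""
    with open(json_path, encoding="utf-8") as f:
        notes = json.load(f)

    imported = 0
    async with conn.cursor() as cursor:
        for note in notes:
            try:
                await cursor.execute(
                    """
                    INSERT IGNORE INTO xhs_notes (note_id, title, description, user_id, nickname, type)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    """,
                    (
                        note["note_id"],
                        note.get("title", ""),
                        note.get("description", ""),
                        note.get("user_id"),
                        note.get("nickname", ""),
                        note.get("type", "normal"),
                    ),
                )
                imported += 1
            except Exception as e:
                logger.error(f"导入笔记失败 {note.get('note_id')}: {e}")
    return imported
```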

#### mermaid.py
- **Purpose**: Mermaid diagram generation tool
- **Main functions**:
  - Diagram rendering
  - Gradio UI integration
  - HTML embedding support

### Test Scripts

#### test_qw.py
- **Purpose**: Tongyi Qianwen (Qwen) API tests
- **Main functions**:
  - API connectivity tests
  - Model response tests

#### src/test_db_client.py
- **Purpose**: MCP client tests
- **Main functions**:
  - Connection tests
  - Query tests
  - Resource listing tests

## Configuration Files

### config.py
- **Purpose**: Global configuration file
- **Main settings** (an illustrative layout follows this list):
  - API keys
  - Model paths
  - Server URLs
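
An illustrative layout for `config.py`, covering the three groups of settings above; every name and value here is a placeholder assumption rather than the real file's contents (credentials are best read from the environment instead of being committed):

```python
import os

# API keys (placeholders; read from the environment in practice)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")

# Model and vector-store paths (hypothetical)
EMBED_MODEL_PATH = "./models/bge-small-zh-v1.5"
RAW_VS_PATH = "./raw_vs"
CLEAN_VS_PATH = "./clean_faiss"

# Server URLs / database settings (placeholders)
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
```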

## Usage

1. First, make sure all dependencies are installed:
```bash
pip install -r requirements.txt
```

2. Configure the database connection details and API keys.

3. Run the data processing scripts to initialize the vector stores:
```bash
python process_raw_notes.py
python process_clean_notes.py
```

4. Start the AI assistant application:
```bash
python assistant.py
```

## Notes

- Make sure the database service is running
- Check the API key configuration
- Make sure the model file paths are correct
- Pay attention to data security and privacy protection