19 KiB
19 KiB
项目文档
本项目是一个基于小红书笔记数据的AI助手系统,包含数据采集、处理、向量化存储和检索等功能。
系统架构
graph TD
A[用户查询] --> B[AI助手]
B --> C[向量检索]
C --> D[数据库]
D --> E[笔记数据]
B --> F[GPT分析]
E --> F
F --> G[结果展示]
核心脚本说明
主要应用脚本
assistant.py
- 功能: 主要的AI助手应用程序
- 核心特性:
- OpenAI API 集成
- 向量检索功能
- Gradio UI界面
- 数据库交互
- 关键代码:
async def process_query(query):
conn = None
try:
logger.info(f"开始处理查询: {query}")
vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
results = vs.similarity_search_with_score(query, k=10)
conn = await get_db_connection()
logger.info("数据库连接成功")
notes_data = []
seen_note_ids = set()
for doc, score in results:
content_hash = get_content_hash(doc.page_content)
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute("SELECT note_id FROM vector_store WHERE content_hash = %s", (content_hash,))
result = await cursor.fetchone()
if not result or result['note_id'] in seen_note_ids:
continue
note_id = result['note_id']
seen_note_ids.add(note_id)
note = await get_note_details(conn, note_id)
clean_content = await get_clean_note_content(conn, note_id)
note_data = {
'id': note_id,
'title': note.get('title', '无标题'),
'description': note.get('description', '暂无描述'),
'clean_content': clean_content
}
notes_data.append(note_data)
return notes_data
finally:
if conn is not None:
await conn.close()
- 处理流程:
sequenceDiagram
participant User
participant Assistant
participant VectorDB
participant GPT
participant MySQL
User->>Assistant: 输入查询
Assistant->>VectorDB: 向量检索
VectorDB->>MySQL: 获取笔记
MySQL-->>Assistant: 返回笔记数据
Assistant->>GPT: 生成回答
GPT-->>User: 展示结果
ai_assistant.py
- 功能: AI助手的阿里云实现版本
- 核心特性:
- 支持阿里云 DashScope API
- 笔记内容分析和展示
- Markdown 格式化输出
- 关键代码:
async def process_query(query):
"""处理用户查询"""
conn = None
try:
logger.info(f"开始处理查询: {query}")
# 加载向量存储
vs_path = "./raw_vs"
logger.info(f"正在检查向量存储路径: {vs_path}")
if not os.path.exists(vs_path):
logger.error(f"向量存储路径不存在: {vs_path}")
return "错误:向量存储不存在", []
logger.info("正在加载向量存储...")
vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
# 搜索相关向量
logger.info("开始向量搜索...")
results = vs.similarity_search_with_score(query, k=10)
logger.info(f"找到 {len(results)} 条相关结果")
try:
conn = await get_db_connection()
if not conn:
logger.error("数据库连接失败")
return "数据库连接失败", []
logger.info("数据库连接成功")
context_notes = []
notes_data = [] # 存储完整的笔记数据
# 获取相似度分数大于0.5的前5条笔记
seen_note_ids = set()
for doc, similarity_score in results:
# MAX_INNER_PRODUCT 策略下,分数越大表示越相似
# 如果相似度分数小于0.5,跳过该结果
if similarity_score < 0.5:
continue
try:
content_hash = get_content_hash(doc.page_content)
logger.info(f"处理content_hash: {content_hash}, 相似度分数: {similarity_score}")
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute("""
SELECT note_id FROM vector_store
WHERE content_hash = %s
""", (content_hash,))
result = await cursor.fetchone()
if not result or result['note_id'] in seen_note_ids:
continue
note_id = result['note_id']
seen_note_ids.add(note_id)
# 获取笔记详情和清洗内容
note = await get_note_details(conn, note_id)
clean_content = await get_clean_note_content(conn, note_id)
if not note or not clean_content:
continue
# 构建完整的笔记数据
note_data = {
'id': note_id,
'title': note.get('title', '无标题'),
'description': note.get('description', '暂无描述'),
'collected_count': note.get('collected_count', 0),
'comment_count': note.get('comment_count', 0),
'share_count': note.get('share_count', 0),
'clean_content': clean_content
}
notes_data.append(note_data)
context_notes.append(f"标题:{note['title']}\n内容:{note['description']}")
if len(notes_data) >= 5: # 仍然限制最多5条
break
except Exception as e:
logger.error(f"处理笔记时出错: {str(e)}")
continue
# 即使没有找到符合条件的笔记也继续执行
logger.info(f"找到 {len(notes_data)} 条符合相似度要求的笔记")
if not notes_data:
logger.warning("未获取到任何有效的笔记内容")
# return "未找到相关笔记内容", []
# 准备GPT提示
logger.info("准备调用GPT...")
system_prompt = """你是一位专业的化妆行业教师,专门帮助产品经理理解和分析小红书笔记。你的回答分下面3种情况:
1、如果用户的输入与化妆品无关,不要参考相关笔记内容,正常与客户交流。
2、如果用户的输入与化妆品相关,而且找到相关笔记内容,参考相关笔记内容,给出回答。
3、如果用户的输入与化妆品相关,但是没有找到相关笔记内容,请结合上下文和历史记录,给出回答。
回答要突出重点,并结合化妆品行业的专业知识。\n""" + f"相关笔记内容:\n" + "\n\n".join(context_notes)
user_prompt = f"{query}"
logger.info("调用GPT获取回答...")
gpt_response = await chat_with_gpt(system_prompt, user_prompt)
return gpt_response, notes_data
except Exception as e:
logger.error(f"处理数据时出错: {str(e)}")
return f"处理数据时出错: {str(e)}", []
finally:
if conn is not None:
try:
await conn.close()
logger.info("数据库连接已关闭")
except Exception as e:
logger.error(f"关闭数据库连接时出错: {str(e)}")
- 处理流程:
graph TD
A[用户查询] --> B[AI助手]
B --> C[向量检索]
C --> D[数据库]
D --> E[笔记数据]
B --> F[GPT分析]
E --> F
F --> G[结果展示]
数据处理脚本
process_raw_notes.py
- 功能: 处理原始笔记数据
- 主要功能:
- 文本分割
- 向量化存储
- 数据库存储
- 关键代码:
async def process_xhs_notes():
"""处理小红书笔记数据"""
conn = None
try:
logger.info("正在尝试连接数据库...")
# 数据库连接配置
conn = await aiomysql.connect(
host='183.11.229.79',
port=3316,
user='root',
password='zaq12wsx@9Xin',
db='9Xin',
autocommit=True
)
logger.info("数据库连接成功")
if conn is None:
raise Exception("数据库连接失败")
async with conn.cursor(aiomysql.DictCursor) as cursor:
# 获取所有type为'normal'的笔记
await cursor.execute("""
SELECT id, note_id, title, description
FROM xhs_notes
WHERE type = 'normal'
""")
notes = await cursor.fetchall()
# 使用text_splitter处理每条笔记
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=120,
chunk_overlap=20,
length_function=len,
is_separator_regex=False,
)
vs_path = "./raw_vs"
if not os.path.exists(vs_path):
os.makedirs(vs_path)
# 初始化向量存储
if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
else:
vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
# 使用tqdm显示处理进度
for note in tqdm(notes, desc="处理小红书笔记"):
# 合标题和描述
content = f"{note['title']}\n{note['description']}"
# 分割文本
texts = text_splitter.split_text(content)
if texts:
# 创建新的向量存储
new_vs = FAISS.from_texts(texts, embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
# 将向量数据保存到数据库
async with conn.cursor() as insert_cursor:
for vector_id, document in new_vs.docstore._dict.items():
content = document.page_content
content_hash = get_content_hash(content)
# 检查是否已存在相同的content_hash
await insert_cursor.execute("""
SELECT id FROM vector_store
WHERE content_hash = %s
""", (content_hash,))
if not await insert_cursor.fetchone():
# 插入新的向量数据
await insert_cursor.execute("""
INSERT INTO vector_store
(note_id, vector_id, content, content_hash)
VALUES (%s, %s, %s, %s)
""", (note['id'], vector_id, content, content_hash))
# 合并向量存储
vs.merge_from(new_vs)
# 保存最终的向量存储
vs.save_local(vs_path)
except aiomysql.Error as e:
logger.error(f"MySQL错误: {e}")
raise
except Exception as e:
logger.error(f"处理小红书笔记时出错: {e}")
raise
finally:
# 安全关闭连接
try:
if conn is not None:
logger.info("正在关闭数据库连接...")
await conn.close()
logger.info("数据库连接已关闭")
except Exception as e:
logger.error(f"关闭数据库连接时出错: {e}")
- 处理流程:
flowchart TD
A[读取原始笔记] --> B[文本分割]
B --> C[向量化]
C --> D[存储向量]
D --> E[更新数据库]
process_clean_notes.py
- 功能: 清洗和处理笔记数据
- 主要功能:
- GPT内容分析
- 思维导图生成
- 内容结构化存储
- 关键代码:
async def process_clean_notes():
"""处理笔记并生成清洗后的向量存储"""
conn = None
try:
logger.info("正在尝试连接数据库...")
# 数据库连接配置
conn = await aiomysql.connect(
host='183.11.229.79',
port=3316,
user='root',
password='zaq12wsx@9Xin',
db='9Xin',
autocommit=True
)
logger.info("数据库连接成功")
if conn is None:
raise Exception("数据库连接失败")
async with conn.cursor(aiomysql.DictCursor) as cursor:
# 获取所有normal类型的笔记
await cursor.execute("""
SELECT id, note_id, title, description
FROM xhs_notes
WHERE type = 'normal'
""")
notes = await cursor.fetchall()
# 初始化向量存储
vs_path = "./clean_faiss"
if not os.path.exists(vs_path):
os.makedirs(vs_path)
if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
else:
vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
# 处理每条笔记
for note in tqdm(notes, desc="处理笔记"):
content = f"{note['title']}\n{note['description']}"
# 获取GPT分析结果
gpt_response = await interact_with_chatgpt(content)
if not gpt_response:
continue
# 处理每种类型的内容
for content_type, content in gpt_response.items():
# 创建新的向量存储
new_vs = FAISS.from_texts([content], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
# 保存到数据库
async with conn.cursor() as insert_cursor:
for vector_id, document in new_vs.docstore._dict.items():
content = document.page_content
content_hash = get_content_hash(content)
# 检查是否已存在
await insert_cursor.execute("""
SELECT id FROM clean_note_store
WHERE content_hash = %s AND content_type = %s
""", (content_hash, content_type))
if not await insert_cursor.fetchone():
await insert_cursor.execute("""
INSERT INTO clean_note_store
(note_id, vector_id, content_type, content, content_hash)
VALUES (%s, %s, %s, %s, %s)
""", (note['id'], vector_id, content_type, content, content_hash))
# 合并向量存储
vs.merge_from(new_vs)
# 保存最终的向量存储
vs.save_local(vs_path)
except aiomysql.Error as e:
logger.error(f"MySQL错误: {e}")
raise
except Exception as e:
logger.error(f"处理笔记时出错: {e}")
raise
finally:
# 安全关闭连接
try:
if conn is not None:
logger.info("正在关闭数据库连接...")
await conn.close()
logger.info("数据库连接已关闭")
except Exception as e:
logger.error(f"关闭数据库连接时出错: {e}")
- 处理流程:
flowchart TD
A[读取笔记] --> B[GPT分析]
B --> C[生成导读]
B --> D[生成思维导图]
B --> E[生成摘要]
C & D & E --> F[存储结果]
数据库相关脚本
数据库结构
erDiagram
xhs_notes ||--o{ clean_note_store : contains
xhs_notes ||--o{ vector_store : has
xhs_notes {
int id PK
string note_id
string title
string description
int user_id
string nickname
}
clean_note_store {
int id PK
int note_id FK
string content_type
text content
string vector_id
}
vector_store {
int id PK
int note_id FK
string vector_id
text content
string content_hash
}
src/mcp/server.py
- 功能: MCP服务端
- 主要功能:
- 消息队列管理
- 任务调度分发
- 资源监控
- 服务注册发现
工具脚本
check_downloads.py
- 功能: 检查媒体文件下载状态
- 主要功能:
- 文件完整性检查
- 断点续传
- 数据库状态更新
import_xhs_notes.py
- 功能: 导入小红书笔记数据
- 主要功能:
- JSON 数据解析
- 数据库导入
- 错误处理
mermaid.py
- 功能: Mermaid 图表生成工具
- 主要功能:
- 图表渲染
- Gradio UI 集成
- HTML 嵌入支持
测试脚本
test_qw.py
- 功能: 通义千问 API 测试
- 主要功能:
- API 连接测试
- 模型响应测试
src/test_db_client.py
- 功能: MCP客户端测试
- 主要功能:
- 连接测试
- 查询测试
- 资源列表测试
配置文件
config.py
- 功能: 全局配置文件
- 主要配置:
- API密钥
- 模型路径
- 服务器URL
使用说明
- 首先确保所有依赖已安装:
pip install -r requirements.txt
-
配置数据库连接信息和API密钥
-
运行数据处理脚本进行初始化:
python process_raw_notes.py
python process_clean_notes.py
- 启动AI助手应用:
python assistant.py
注意事项
- 确保数据库服务正常运行
- 检查API密钥配置
- 确保模型文件路径正确
- 注意数据安全和隐私保护