# 项目文档 本项目是一个基于小红书笔记数据的AI助手系统,包含数据采集、处理、向量化存储和检索等功能。 ## 系统架构 ```mermaid graph TD A[用户查询] --> B[AI助手] B --> C[向量检索] C --> D[数据库] D --> E[笔记数据] B --> F[GPT分析] E --> F F --> G[结果展示] ``` ## 核心脚本说明 ### 主要应用脚本 #### assistant.py - **功能**: 主要的AI助手应用程序 - **核心特性**: - OpenAI API 集成 - 向量检索功能 - Gradio UI界面 - 数据库交互 - **关键代码**: ```python async def process_query(query): conn = None try: logger.info(f"开始处理查询: {query}") vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True) results = vs.similarity_search_with_score(query, k=10) conn = await get_db_connection() logger.info("数据库连接成功") notes_data = [] seen_note_ids = set() for doc, score in results: content_hash = get_content_hash(doc.page_content) async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute("SELECT note_id FROM vector_store WHERE content_hash = %s", (content_hash,)) result = await cursor.fetchone() if not result or result['note_id'] in seen_note_ids: continue note_id = result['note_id'] seen_note_ids.add(note_id) note = await get_note_details(conn, note_id) clean_content = await get_clean_note_content(conn, note_id) note_data = { 'id': note_id, 'title': note.get('title', '无标题'), 'description': note.get('description', '暂无描述'), 'clean_content': clean_content } notes_data.append(note_data) return notes_data finally: if conn is not None: await conn.close() ``` - **处理流程**: ``` mermaid sequenceDiagram participant User participant Assistant participant VectorDB participant GPT participant MySQL User->>Assistant: 输入查询 Assistant->>VectorDB: 向量检索 VectorDB->>MySQL: 获取笔记 MySQL-->>Assistant: 返回笔记数据 Assistant->>GPT: 生成回答 GPT-->>User: 展示结果 ``` #### ai_assistant.py - **功能**: AI助手的阿里云实现版本 - **核心特性**: - 支持阿里云 DashScope API - 笔记内容分析和展示 - Markdown 格式化输出 - **关键代码**: ```python async def process_query(query): """处理用户查询""" conn = None try: logger.info(f"开始处理查询: {query}") # 加载向量存储 vs_path = "./raw_vs" logger.info(f"正在检查向量存储路径: {vs_path}") if not os.path.exists(vs_path): logger.error(f"向量存储路径不存在: {vs_path}") return "错误:向量存储不存在", [] logger.info("正在加载向量存储...") vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True) # 搜索相关向量 logger.info("开始向量搜索...") results = vs.similarity_search_with_score(query, k=10) logger.info(f"找到 {len(results)} 条相关结果") try: conn = await get_db_connection() if not conn: logger.error("数据库连接失败") return "数据库连接失败", [] logger.info("数据库连接成功") context_notes = [] notes_data = [] # 存储完整的笔记数据 # 获取相似度分数大于0.5的前5条笔记 seen_note_ids = set() for doc, similarity_score in results: # MAX_INNER_PRODUCT 策略下,分数越大表示越相似 # 如果相似度分数小于0.5,跳过该结果 if similarity_score < 0.5: continue try: content_hash = get_content_hash(doc.page_content) logger.info(f"处理content_hash: {content_hash}, 相似度分数: {similarity_score}") async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute(""" SELECT note_id FROM vector_store WHERE content_hash = %s """, (content_hash,)) result = await cursor.fetchone() if not result or result['note_id'] in seen_note_ids: continue note_id = result['note_id'] seen_note_ids.add(note_id) # 获取笔记详情和清洗内容 note = await get_note_details(conn, note_id) clean_content = await get_clean_note_content(conn, note_id) if not note or not clean_content: continue # 构建完整的笔记数据 note_data = { 'id': note_id, 'title': note.get('title', '无标题'), 'description': note.get('description', '暂无描述'), 'collected_count': note.get('collected_count', 0), 'comment_count': note.get('comment_count', 0), 'share_count': note.get('share_count', 0), 'clean_content': clean_content } notes_data.append(note_data) context_notes.append(f"标题:{note['title']}\n内容:{note['description']}") if len(notes_data) >= 5: # 仍然限制最多5条 break except Exception as e: logger.error(f"处理笔记时出错: {str(e)}") continue # 即使没有找到符合条件的笔记也继续执行 logger.info(f"找到 {len(notes_data)} 条符合相似度要求的笔记") if not notes_data: logger.warning("未获取到任何有效的笔记内容") # return "未找到相关笔记内容", [] # 准备GPT提示 logger.info("准备调用GPT...") system_prompt = """你是一位专业的化妆行业教师,专门帮助产品经理理解和分析小红书笔记。你的回答分下面3种情况: 1、如果用户的输入与化妆品无关,不要参考相关笔记内容,正常与客户交流。 2、如果用户的输入与化妆品相关,而且找到相关笔记内容,参考相关笔记内容,给出回答。 3、如果用户的输入与化妆品相关,但是没有找到相关笔记内容,请结合上下文和历史记录,给出回答。 回答要突出重点,并结合化妆品行业的专业知识。\n""" + f"相关笔记内容:\n" + "\n\n".join(context_notes) user_prompt = f"{query}" logger.info("调用GPT获取回答...") gpt_response = await chat_with_gpt(system_prompt, user_prompt) return gpt_response, notes_data except Exception as e: logger.error(f"处理数据时出错: {str(e)}") return f"处理数据时出错: {str(e)}", [] finally: if conn is not None: try: await conn.close() logger.info("数据库连接已关闭") except Exception as e: logger.error(f"关闭数据库连接时出错: {str(e)}") ``` - **处理流程**: ```mermaid graph TD A[用户查询] --> B[AI助手] B --> C[向量检索] C --> D[数据库] D --> E[笔记数据] B --> F[GPT分析] E --> F F --> G[结果展示] ``` ### 数据处理脚本 #### process_raw_notes.py - **功能**: 处理原始笔记数据 - **主要功能**: - 文本分割 - 向量化存储 - 数据库存储 - **关键代码**: ```python async def process_xhs_notes(): """处理小红书笔记数据""" conn = None try: logger.info("正在尝试连接数据库...") # 数据库连接配置 conn = await aiomysql.connect( host='183.11.229.79', port=3316, user='root', password='zaq12wsx@9Xin', db='9Xin', autocommit=True ) logger.info("数据库连接成功") if conn is None: raise Exception("数据库连接失败") async with conn.cursor(aiomysql.DictCursor) as cursor: # 获取所有type为'normal'的笔记 await cursor.execute(""" SELECT id, note_id, title, description FROM xhs_notes WHERE type = 'normal' """) notes = await cursor.fetchall() # 使用text_splitter处理每条笔记 text_splitter = RecursiveCharacterTextSplitter( chunk_size=120, chunk_overlap=20, length_function=len, is_separator_regex=False, ) vs_path = "./raw_vs" if not os.path.exists(vs_path): os.makedirs(vs_path) # 初始化向量存储 if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')): vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True) else: vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT) # 使用tqdm显示处理进度 for note in tqdm(notes, desc="处理小红书笔记"): # 合标题和描述 content = f"{note['title']}\n{note['description']}" # 分割文本 texts = text_splitter.split_text(content) if texts: # 创建新的向量存储 new_vs = FAISS.from_texts(texts, embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT) # 将向量数据保存到数据库 async with conn.cursor() as insert_cursor: for vector_id, document in new_vs.docstore._dict.items(): content = document.page_content content_hash = get_content_hash(content) # 检查是否已存在相同的content_hash await insert_cursor.execute(""" SELECT id FROM vector_store WHERE content_hash = %s """, (content_hash,)) if not await insert_cursor.fetchone(): # 插入新的向量数据 await insert_cursor.execute(""" INSERT INTO vector_store (note_id, vector_id, content, content_hash) VALUES (%s, %s, %s, %s) """, (note['id'], vector_id, content, content_hash)) # 合并向量存储 vs.merge_from(new_vs) # 保存最终的向量存储 vs.save_local(vs_path) except aiomysql.Error as e: logger.error(f"MySQL错误: {e}") raise except Exception as e: logger.error(f"处理小红书笔记时出错: {e}") raise finally: # 安全关闭连接 try: if conn is not None: logger.info("正在关闭数据库连接...") await conn.close() logger.info("数据库连接已关闭") except Exception as e: logger.error(f"关闭数据库连接时出错: {e}") ``` - **处理流程**: ```mermaid flowchart TD A[读取原始笔记] --> B[文本分割] B --> C[向量化] C --> D[存储向量] D --> E[更新数据库] ``` #### process_clean_notes.py - **功能**: 清洗和处理笔记数据 - **主要功能**: - GPT内容分析 - 思维导图生成 - 内容结构化存储 - **关键代码**: ```python async def process_clean_notes(): """处理笔记并生成清洗后的向量存储""" conn = None try: logger.info("正在尝试连接数据库...") # 数据库连接配置 conn = await aiomysql.connect( host='183.11.229.79', port=3316, user='root', password='zaq12wsx@9Xin', db='9Xin', autocommit=True ) logger.info("数据库连接成功") if conn is None: raise Exception("数据库连接失败") async with conn.cursor(aiomysql.DictCursor) as cursor: # 获取所有normal类型的笔记 await cursor.execute(""" SELECT id, note_id, title, description FROM xhs_notes WHERE type = 'normal' """) notes = await cursor.fetchall() # 初始化向量存储 vs_path = "./clean_faiss" if not os.path.exists(vs_path): os.makedirs(vs_path) if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')): vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True) else: vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT) # 处理每条笔记 for note in tqdm(notes, desc="处理笔记"): content = f"{note['title']}\n{note['description']}" # 获取GPT分析结果 gpt_response = await interact_with_chatgpt(content) if not gpt_response: continue # 处理每种类型的内容 for content_type, content in gpt_response.items(): # 创建新的向量存储 new_vs = FAISS.from_texts([content], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT) # 保存到数据库 async with conn.cursor() as insert_cursor: for vector_id, document in new_vs.docstore._dict.items(): content = document.page_content content_hash = get_content_hash(content) # 检查是否已存在 await insert_cursor.execute(""" SELECT id FROM clean_note_store WHERE content_hash = %s AND content_type = %s """, (content_hash, content_type)) if not await insert_cursor.fetchone(): await insert_cursor.execute(""" INSERT INTO clean_note_store (note_id, vector_id, content_type, content, content_hash) VALUES (%s, %s, %s, %s, %s) """, (note['id'], vector_id, content_type, content, content_hash)) # 合并向量存储 vs.merge_from(new_vs) # 保存最终的向量存储 vs.save_local(vs_path) except aiomysql.Error as e: logger.error(f"MySQL错误: {e}") raise except Exception as e: logger.error(f"处理笔记时出错: {e}") raise finally: # 安全关闭连接 try: if conn is not None: logger.info("正在关闭数据库连接...") await conn.close() logger.info("数据库连接已关闭") except Exception as e: logger.error(f"关闭数据库连接时出错: {e}") ``` - **处理流程**: ```mermaid flowchart TD A[读取笔记] --> B[GPT分析] B --> C[生成导读] B --> D[生成思维导图] B --> E[生成摘要] C & D & E --> F[存储结果] ``` ### 数据库相关脚本 #### 数据库结构 ```mermaid erDiagram xhs_notes ||--o{ clean_note_store : contains xhs_notes ||--o{ vector_store : has xhs_notes { int id PK string note_id string title string description int user_id string nickname } clean_note_store { int id PK int note_id FK string content_type text content string vector_id } vector_store { int id PK int note_id FK string vector_id text content string content_hash } ``` #### src/mcp/server.py - **功能**: MCP服务端 - **主要功能**: - 消息队列管理 - 任务调度分发 - 资源监控 - 服务注册发现 ### 工具脚本 #### check_downloads.py - **功能**: 检查媒体文件下载状态 - **主要功能**: - 文件完整性检查 - 断点续传 - 数据库状态更新 #### import_xhs_notes.py - **功能**: 导入小红书笔记数据 - **主要功能**: - JSON 数据解析 - 数据库导入 - 错误处理 #### mermaid.py - **功能**: Mermaid 图表生成工具 - **主要功能**: - 图表渲染 - Gradio UI 集成 - HTML 嵌入支持 ### 测试脚本 #### test_qw.py - **功能**: 通义千问 API 测试 - **主要功能**: - API 连接测试 - 模型响应测试 #### src/test_db_client.py - **功能**: MCP客户端测试 - **主要功能**: - 连接测试 - 查询测试 - 资源列表测试 ## 配置文件 #### config.py - **功能**: 全局配置文件 - **主要配置**: - API密钥 - 模型路径 - 服务器URL ## 使用说明 1. 首先确保所有依赖已安装: ```bash pip install -r requirements.txt ``` 2. 配置数据库连接信息和API密钥 3. 运行数据处理脚本进行初始化: ```bash python process_raw_notes.py python process_clean_notes.py ``` 4. 启动AI助手应用: ```bash python assistant.py ``` ## 注意事项 - 确保数据库服务正常运行 - 检查API密钥配置 - 确保模型文件路径正确 - 注意数据安全和隐私保护