xhs_server/README.md
2024-12-16 10:31:07 +08:00

19 KiB
Raw Permalink Blame History

项目文档

本项目是一个基于小红书笔记数据的AI助手系统,包含数据采集、处理、向量化存储和检索等功能。

系统架构

graph TD
A[用户查询] --> B[AI助手]
B --> C[向量检索]
C --> D[数据库]
D --> E[笔记数据]
B --> F[GPT分析]
E --> F
F --> G[结果展示]

核心脚本说明

主要应用脚本

assistant.py

  • 功能: 主要的AI助手应用程序
  • 核心特性:
    • OpenAI API 集成
    • 向量检索功能
    • Gradio UI界面
    • 数据库交互
  • 关键代码:
async def process_query(query):
    conn = None
    try:
        logger.info(f"开始处理查询: {query}")
        vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
        results = vs.similarity_search_with_score(query, k=10)
        
        conn = await get_db_connection()
        logger.info("数据库连接成功")
        
        notes_data = []
        seen_note_ids = set()
        for doc, score in results:
            content_hash = get_content_hash(doc.page_content)
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute("SELECT note_id FROM vector_store WHERE content_hash = %s", (content_hash,))
                result = await cursor.fetchone()
                
                if not result or result['note_id'] in seen_note_ids:
                    continue
                
                note_id = result['note_id']
                seen_note_ids.add(note_id)
                note = await get_note_details(conn, note_id)
                clean_content = await get_clean_note_content(conn, note_id)
                
                note_data = {
                    'id': note_id,
                    'title': note.get('title', '无标题'),
                    'description': note.get('description', '暂无描述'),
                    'clean_content': clean_content
                }
                notes_data.append(note_data)
        
        return notes_data
    finally:
        if conn is not None:
            await conn.close()
  • 处理流程:
sequenceDiagram
participant User
participant Assistant
participant VectorDB
participant GPT
participant MySQL
User->>Assistant: 输入查询
Assistant->>VectorDB: 向量检索
VectorDB->>MySQL: 获取笔记
MySQL-->>Assistant: 返回笔记数据
Assistant->>GPT: 生成回答
GPT-->>User: 展示结果

ai_assistant.py

  • 功能: AI助手的阿里云实现版本
  • 核心特性:
    • 支持阿里云 DashScope API
    • 笔记内容分析和展示
    • Markdown 格式化输出
  • 关键代码:
async def process_query(query):
    """处理用户查询"""
    conn = None
    try:
        logger.info(f"开始处理查询: {query}")
        
        # 加载向量存储
        vs_path = "./raw_vs"
        logger.info(f"正在检查向量存储路径: {vs_path}")
        if not os.path.exists(vs_path):
            logger.error(f"向量存储路径不存在: {vs_path}")
            return "错误:向量存储不存在", []
            
        logger.info("正在加载向量存储...")
        vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
        
        # 搜索相关向量
        logger.info("开始向量搜索...")
        results = vs.similarity_search_with_score(query, k=10)
        logger.info(f"找到 {len(results)} 条相关结果")
        
        try:
            conn = await get_db_connection()
            if not conn:
                logger.error("数据库连接失败")
                return "数据库连接失败", []
            
            logger.info("数据库连接成功")
            context_notes = []
            notes_data = []  # 存储完整的笔记数据
            
            # 获取相似度分数大于0.5的前5条笔记
            seen_note_ids = set()
            for doc, similarity_score in results:
                # MAX_INNER_PRODUCT 策略下,分数越大表示越相似
                # 如果相似度分数小于0.5,跳过该结果
                if similarity_score < 0.5:
                    continue
                    
                try:
                    content_hash = get_content_hash(doc.page_content)
                    logger.info(f"处理content_hash: {content_hash}, 相似度分数: {similarity_score}")
                    
                    async with conn.cursor(aiomysql.DictCursor) as cursor:
                        await cursor.execute("""
                            SELECT note_id FROM vector_store 
                            WHERE content_hash = %s
                        """, (content_hash,))
                        result = await cursor.fetchone()
                        
                        if not result or result['note_id'] in seen_note_ids:
                            continue
                            
                        note_id = result['note_id']
                        seen_note_ids.add(note_id)
                        
                        # 获取笔记详情和清洗内容
                        note = await get_note_details(conn, note_id)
                        clean_content = await get_clean_note_content(conn, note_id)
                        
                        if not note or not clean_content:
                            continue
                        
                        # 构建完整的笔记数据
                        note_data = {
                            'id': note_id,
                            'title': note.get('title', '无标题'),
                            'description': note.get('description', '暂无描述'),
                            'collected_count': note.get('collected_count', 0),
                            'comment_count': note.get('comment_count', 0),
                            'share_count': note.get('share_count', 0),
                            'clean_content': clean_content
                        }
                        
                        notes_data.append(note_data)
                        context_notes.append(f"标题:{note['title']}\n内容:{note['description']}")
                        
                        if len(notes_data) >= 5:  # 仍然限制最多5条
                            break
                            
                except Exception as e:
                    logger.error(f"处理笔记时出错: {str(e)}")
                    continue
            
            # 即使没有找到符合条件的笔记也继续执行
            logger.info(f"找到 {len(notes_data)} 条符合相似度要求的笔记")
            
            if not notes_data:
                logger.warning("未获取到任何有效的笔记内容")
                # return "未找到相关笔记内容", []
            
            # 准备GPT提示
            logger.info("准备调用GPT...")
            system_prompt = """你是一位专业的化妆行业教师专门帮助产品经理理解和分析小红书笔记。你的回答分下面3种情况
            1、如果用户的输入与化妆品无关不要参考相关笔记内容正常与客户交流。
            2、如果用户的输入与化妆品相关而且找到相关笔记内容参考相关笔记内容给出回答。
            3、如果用户的输入与化妆品相关但是没有找到相关笔记内容请结合上下文和历史记录给出回答。
            回答要突出重点,并结合化妆品行业的专业知识。\n"""  + f"相关笔记内容:\n" + "\n\n".join(context_notes)
            user_prompt = f"{query}"
            logger.info("调用GPT获取回答...")
            gpt_response = await chat_with_gpt(system_prompt, user_prompt)
            
            return gpt_response, notes_data
            
        except Exception as e:
            logger.error(f"处理数据时出错: {str(e)}")
            return f"处理数据时出错: {str(e)}", []
            
    finally:
        if conn is not None:
            try:
                await conn.close()
                logger.info("数据库连接已关闭")
            except Exception as e:
                logger.error(f"关闭数据库连接时出错: {str(e)}")

  • 处理流程:
graph TD
A[用户查询] --> B[AI助手]
B --> C[向量检索]
C --> D[数据库]
D --> E[笔记数据]
B --> F[GPT分析]
E --> F
F --> G[结果展示]

数据处理脚本

process_raw_notes.py

  • 功能: 处理原始笔记数据
  • 主要功能:
    • 文本分割
    • 向量化存储
    • 数据库存储
  • 关键代码:
async def process_xhs_notes():
    """处理小红书笔记数据"""
    conn = None
    try:
        logger.info("正在尝试连接数据库...")
        # 数据库连接配置
        conn = await aiomysql.connect(
            host='183.11.229.79',
            port=3316,
            user='root',
            password='zaq12wsx@9Xin',
            db='9Xin',
            autocommit=True
        )
        logger.info("数据库连接成功")
        
        if conn is None:
            raise Exception("数据库连接失败")

        async with conn.cursor(aiomysql.DictCursor) as cursor:
            # 获取所有type为'normal'的笔记
            await cursor.execute("""
                SELECT id, note_id, title, description 
                FROM xhs_notes 
                WHERE type = 'normal'
            """)
            notes = await cursor.fetchall()
            
            # 使用text_splitter处理每条笔记
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=120,
                chunk_overlap=20,
                length_function=len,
                is_separator_regex=False,
            )
            
            vs_path = "./raw_vs"
            if not os.path.exists(vs_path):
                os.makedirs(vs_path)
                
            # 初始化向量存储
            if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
                vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
            else:
                vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
            
            # 使用tqdm显示处理进度
            for note in tqdm(notes, desc="处理小红书笔记"):
                # 合标题和描述
                content = f"{note['title']}\n{note['description']}"
                
                # 分割文本
                texts = text_splitter.split_text(content)
                
                if texts:
                    # 创建新的向量存储
                    new_vs = FAISS.from_texts(texts, embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
                    
                    # 将向量数据保存到数据库
                    async with conn.cursor() as insert_cursor:
                        for vector_id, document in new_vs.docstore._dict.items():
                            content = document.page_content
                            content_hash = get_content_hash(content)
                            
                            # 检查是否已存在相同的content_hash
                            await insert_cursor.execute("""
                                SELECT id FROM vector_store 
                                WHERE content_hash = %s
                            """, (content_hash,))
                            
                            if not await insert_cursor.fetchone():
                                # 插入新的向量数据
                                await insert_cursor.execute("""
                                    INSERT INTO vector_store 
                                    (note_id, vector_id, content, content_hash) 
                                    VALUES (%s, %s, %s, %s)
                                """, (note['id'], vector_id, content, content_hash))
                    
                    # 合并向量存储
                    vs.merge_from(new_vs)
                    
            # 保存最终的向量存储
            vs.save_local(vs_path)
            
    except aiomysql.Error as e:
        logger.error(f"MySQL错误: {e}")
        raise
    except Exception as e:
        logger.error(f"处理小红书笔记时出错: {e}")
        raise
    finally:
        # 安全关闭连接
        try:
            if conn is not None:
                logger.info("正在关闭数据库连接...")
                await conn.close()
                logger.info("数据库连接已关闭")
        except Exception as e:
            logger.error(f"关闭数据库连接时出错: {e}")
  • 处理流程:
flowchart TD
A[读取原始笔记] --> B[文本分割]
B --> C[向量化]
C --> D[存储向量]
D --> E[更新数据库]

process_clean_notes.py

  • 功能: 清洗和处理笔记数据
  • 主要功能:
    • GPT内容分析
    • 思维导图生成
    • 内容结构化存储
  • 关键代码:
async def process_clean_notes():
    """处理笔记并生成清洗后的向量存储"""
    conn = None
    try:
        logger.info("正在尝试连接数据库...")
        # 数据库连接配置
        conn = await aiomysql.connect(
            host='183.11.229.79',
            port=3316,
            user='root',
            password='zaq12wsx@9Xin',
            db='9Xin',
            autocommit=True
        )
        logger.info("数据库连接成功")
        
        if conn is None:
            raise Exception("数据库连接失败")

        async with conn.cursor(aiomysql.DictCursor) as cursor:
            # 获取所有normal类型的笔记
            await cursor.execute("""
                SELECT id, note_id, title, description 
                FROM xhs_notes 
                WHERE type = 'normal'
            """)
            notes = await cursor.fetchall()
            
            # 初始化向量存储
            vs_path = "./clean_faiss"
            if not os.path.exists(vs_path):
                os.makedirs(vs_path)
                
            if os.path.isfile(os.path.join(vs_path, 'index.faiss')) and os.path.isfile(os.path.join(vs_path, 'index.pkl')):
                vs = FAISS.load_local(vs_path, embed_model, allow_dangerous_deserialization=True)
            else:
                vs = FAISS.from_texts(["初始化向量存储"], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
            
            # 处理每条笔记
            for note in tqdm(notes, desc="处理笔记"):
                content = f"{note['title']}\n{note['description']}"
                
                # 获取GPT分析结果
                gpt_response = await interact_with_chatgpt(content)
                if not gpt_response:
                    continue
                
                # 处理每种类型的内容
                for content_type, content in gpt_response.items():
                    # 创建新的向量存储
                    new_vs = FAISS.from_texts([content], embed_model, distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
                    
                    # 保存到数据库
                    async with conn.cursor() as insert_cursor:
                        for vector_id, document in new_vs.docstore._dict.items():
                            content = document.page_content
                            content_hash = get_content_hash(content)
                            
                            # 检查是否已存在
                            await insert_cursor.execute("""
                                SELECT id FROM clean_note_store 
                                WHERE content_hash = %s AND content_type = %s
                            """, (content_hash, content_type))
                            
                            if not await insert_cursor.fetchone():
                                await insert_cursor.execute("""
                                    INSERT INTO clean_note_store 
                                    (note_id, vector_id, content_type, content, content_hash) 
                                    VALUES (%s, %s, %s, %s, %s)
                                """, (note['id'], vector_id, content_type, content, content_hash))
                    
                    # 合并向量存储
                    vs.merge_from(new_vs)
                    
            # 保存最终的向量存储
            vs.save_local(vs_path)
            
    except aiomysql.Error as e:
        logger.error(f"MySQL错误: {e}")
        raise
    except Exception as e:
        logger.error(f"处理笔记时出错: {e}")
        raise
    finally:
        # 安全关闭连接
        try:
            if conn is not None:
                logger.info("正在关闭数据库连接...")
                await conn.close()
                logger.info("数据库连接已关闭")
        except Exception as e:
            logger.error(f"关闭数据库连接时出错: {e}")
  • 处理流程:
flowchart TD
A[读取笔记] --> B[GPT分析]
B --> C[生成导读]
B --> D[生成思维导图]
B --> E[生成摘要]
C & D & E --> F[存储结果]

数据库相关脚本

数据库结构

erDiagram
xhs_notes ||--o{ clean_note_store : contains
xhs_notes ||--o{ vector_store : has
xhs_notes {
int id PK
string note_id
string title
string description
int user_id
string nickname
}
clean_note_store {
int id PK
int note_id FK
string content_type
text content
string vector_id
}
vector_store {
int id PK
int note_id FK
string vector_id
text content
string content_hash
}

src/mcp/server.py

  • 功能: MCP服务端
  • 主要功能:
    • 消息队列管理
    • 任务调度分发
    • 资源监控
    • 服务注册发现

工具脚本

check_downloads.py

  • 功能: 检查媒体文件下载状态
  • 主要功能:
    • 文件完整性检查
    • 断点续传
    • 数据库状态更新

import_xhs_notes.py

  • 功能: 导入小红书笔记数据
  • 主要功能:
    • JSON 数据解析
    • 数据库导入
    • 错误处理

mermaid.py

  • 功能: Mermaid 图表生成工具
  • 主要功能:
    • 图表渲染
    • Gradio UI 集成
    • HTML 嵌入支持

测试脚本

test_qw.py

  • 功能: 通义千问 API 测试
  • 主要功能:
    • API 连接测试
    • 模型响应测试

src/test_db_client.py

  • 功能: MCP客户端测试
  • 主要功能:
    • 连接测试
    • 查询测试
    • 资源列表测试

配置文件

config.py

  • 功能: 全局配置文件
  • 主要配置:
    • API密钥
    • 模型路径
    • 服务器URL

使用说明

  1. 首先确保所有依赖已安装:
pip install -r requirements.txt
  1. 配置数据库连接信息和API密钥

  2. 运行数据处理脚本进行初始化:

python process_raw_notes.py
python process_clean_notes.py
  1. 启动AI助手应用:
python assistant.py

注意事项

  • 确保数据库服务正常运行
  • 检查API密钥配置
  • 确保模型文件路径正确
  • 注意数据安全和隐私保护