from bs4 import BeautifulSoup import os import gc import requests from langdetect import detect, LangDetectException from typing import AsyncIterator, Iterator from langchain_core.document_loaders import BaseLoader from langchain_core.documents import Document from langchain.text_splitter import RecursiveCharacterTextSplitter import requests def fetch_documents(ip_address, dataset_id, api_key): url = f'http://{ip_address}/v1/datasets/{dataset_id}/documents' headers = { 'Authorization': f'Bearer {api_key}' } try: response = requests.get(url, headers=headers) response.raise_for_status() # Raise HTTPError for bad responses (4xx and 5xx) return response.json() # Assuming the response is in JSON format except requests.exceptions.HTTPError as http_err: print(f'HTTP error occurred: {http_err}') except Exception as err: print(f'Other error occurred: {err}') return None # 文件扩展名集合 file_extensions = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'} def is_file(url): return any(url.lower().endswith(ext) for ext in file_extensions) def is_webpage(url): try: response = requests.get(url, timeout=5) content_type = response.headers.get('Content-Type', '') return 'text/html' in content_type except requests.RequestException: return False def can_crawl_webpage(url): try: response = requests.get(url, timeout=5) soup = BeautifulSoup(response.text, 'html.parser') # 检查是否有大量JavaScript代码 scripts = soup.find_all('script') if len(scripts) > 10: return False # 检查页面内容是否主要是乱码或图片 text_content = ' '.join([t.get_text() for t in soup.find_all('p')]) try: detected_language = detect(text_content) if detected_language != 'zh-cn': return False except LangDetectException: return False images = soup.find_all('img') if len(images) > len(soup.find_all('p')): return False # Example criteria: Check if there is a meta robots tag with noindex or nofollow meta_robots = soup.find('meta', attrs={'name': 'robots'}) if meta_robots and ('noindex' in meta_robots['content'] or 'nofollow' in meta_robots['content']): return False return True except requests.RequestException: return False def is_cosmetic_related(snippet): keywords = ['化妆品', '美妆', '彩妆', '护肤'] return any(keyword in snippet for keyword in keywords) def parse_search(result): ret_str = "" link = result.get('link') #snippet = result.get('snippet') if is_file(link): ret_str = (f"File: {link}") elif is_webpage(link): if can_crawl_webpage(link):# and is_cosmetic_related(snippet): ret_str = (f"Webpage crawlable: {link}") else: ret_str = (f"Webpage not crawlable: {link}") else: ret_str = (f"Unknown type: {link}") return ret_str async def save_webpage_content(url, engine, search_id, output_dir, pool): try: # 检查网页是否已经保存 async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = "SELECT COUNT(*) FROM saved_webpages WHERE url = %s" await cursor.execute(sql, (url,)) count = await cursor.fetchone() if count[0] > 0: print(f"Webpage already saved: {url}") return response = requests.get(url, timeout=10) response.raise_for_status() # 使用 BeautifulSoup 解析网页内容 soup = BeautifulSoup(response.text, 'html.parser') # 提取有用的文字内容 text_content = ' '.join([t.get_text() for t in soup.find_all('p')]) # 保存网页内容 filename = os.path.join(output_dir, f"{url.split('//')[-1].replace('/', '_')}.html") with open(filename, 'w', encoding='utf-8') as file: file.write(response.text) # 保存提取的文字内容 text_filename = os.path.join(output_dir, f"{url.split('//')[-1].replace('/', '_')}_text.txt") with open(text_filename, 'w', encoding='utf-8') as file: file.write(text_content) # 将保存的网页记录到数据库 async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = "INSERT INTO saved_webpages (url, engine, search_id, content) VALUES (%s, %s, %s, %s)" await cursor.execute(sql, (url, engine, search_id,text_content,)) await conn.commit() print(f"Saved webpage: {filename}") except Exception as e: print(f"Failed to save webpage {url}: {e}") async def download_file(url, engine, search_id, output_dir, pool): try: # 检查文件是否已经下载 async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = "SELECT COUNT(*) FROM downloaded_files WHERE url = %s" await cursor.execute(sql, (url,)) count = await cursor.fetchone() if count[0] > 0: print(f"File already downloaded: {url}") return response = requests.get(url, timeout=10, stream=True) response.raise_for_status() local_filename = url.split('/')[-1] local_filepath = os.path.join(output_dir, local_filename) with open(local_filepath, 'wb') as file: for chunk in response.iter_content(chunk_size=8192): file.write(chunk) # 将下载的文件记录到数据库 async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = "INSERT INTO downloaded_files (url, engine, search_id ) VALUES (%s,%s,%s)" await cursor.execute(sql, (url,engine, search_id,)) await conn.commit() print(f"Downloaded file: {local_filepath}") except Exception as e: print(f"Failed to download file {url}: {e}") async def save_non_crawlable(url,engine, search_id, pool): async with pool.acquire() as conn: async with conn.cursor() as cursor: # 检查黑名单是否已经存在该URL sql = "SELECT COUNT(*) FROM non_crawlable_links WHERE url = %s" await cursor.execute(sql, (url,)) count = await cursor.fetchone() if count[0] == 0: sql = "INSERT INTO non_crawlable_links (url,engine, search_id) VALUES (%s,%s,%s)" await cursor.execute(sql, (url,engine, search_id,)) await conn.commit() print(f"Saved non-crawlable link to database: {url}") else: print(f"Non-crawlable link already exists in database: {url}") async def get_blacklist(pool): blacklist = set() async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = "SELECT url FROM non_crawlable_links" await cursor.execute(sql) results = await cursor.fetchall() blacklist = {result[0] for result in results} return blacklist def libreoffice_to_pdf(file_path, output_dir): if not os.path.exists(file_path): print(f"Error: The file '{file_path}' does not exist.") return False # 创建输出目录 if not os.path.exists(output_dir): os.makedirs(output_dir) # 使用LibreOffice将文件转换为PDF pdf_path = os.path.join(output_dir, os.path.splitext(os.path.basename(file_path))[0] + '.pdf') command = f'soffice --headless --convert-to pdf --outdir "{output_dir}" "{file_path}"' os.system(command) if not os.path.exists(pdf_path): print(f"Error: Failed to convert '{file_path}' to PDF.") return False return pdf_path def pdf_to_images(pdf_path, output_dir): # 将PDF文件逐页转换为图像 page_num = 1 for page in convert_from_path(pdf_path, fmt='png', single_file=False): output_slide_path = os.path.join(output_dir, f'slide_{page_num}.png') page.save(output_slide_path, 'PNG') print(f'Saved slide {page_num} as image at {output_slide_path}') page_num += 1 # 强制清理每页处理后的内存 del page gc.collect() def process_files(base_dir): for root, dirs, files in os.walk(base_dir): for file in files: if file.endswith((".ppt", ".pptx", ".xls", ".xlsx", ".doc", ".docx")): file_path = os.path.join(root, file) output_dir = os.path.join(root, os.path.splitext(file)[0]) if not os.path.exists(output_dir): os.makedirs(output_dir) pdf_path = libreoffice_to_pdf(file_path, output_dir) if pdf_path: pdf_to_images(pdf_path, output_dir) gc.collect() elif file.endswith(".pdf"): pdf_path = os.path.join(root, file) output_dir = os.path.join(root, os.path.splitext(file)[0]) if not os.path.exists(output_dir): os.makedirs(output_dir) pdf_to_images(pdf_path, output_dir) # 清理内存 gc.collect()