# web_search/util_old.py

from bs4 import BeautifulSoup
import os
import gc
import requests
from langdetect import detect, LangDetectException
from pdf2image import convert_from_path  # needed by pdf_to_images below
from typing import AsyncIterator, Iterator
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
def fetch_documents(ip_address, dataset_id, api_key):
url = f'http://{ip_address}/v1/datasets/{dataset_id}/documents'
headers = {
'Authorization': f'Bearer {api_key}'
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # Raise HTTPError for bad responses (4xx and 5xx)
return response.json() # Assuming the response is in JSON format
except requests.exceptions.HTTPError as http_err:
print(f'HTTP error occurred: {http_err}')
except Exception as err:
print(f'Other error occurred: {err}')
return None
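# Example call to fetch_documents. The host, dataset id, and API key below are
# placeholders, and the 'data' key is an assumption about the JSON payload shape:
#
#   docs = fetch_documents('127.0.0.1', 'my-dataset-id', 'my-api-key')
#   if docs:
#       for doc in docs.get('data', []):
#           print(doc.get('name'))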
# Set of file extensions treated as downloadable documents
file_extensions = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'}
def is_file(url):
return any(url.lower().endswith(ext) for ext in file_extensions)
def is_webpage(url):
try:
response = requests.get(url, timeout=5)
content_type = response.headers.get('Content-Type', '')
return 'text/html' in content_type
except requests.RequestException:
return False
def can_crawl_webpage(url):
try:
response = requests.get(url, timeout=5)
soup = BeautifulSoup(response.text, 'html.parser')
        # Check whether the page carries a large amount of JavaScript
scripts = soup.find_all('script')
if len(scripts) > 10:
return False
        # Check whether the page content is mostly garbled text or images
text_content = ' '.join([t.get_text() for t in soup.find_all('p')])
try:
detected_language = detect(text_content)
if detected_language != 'zh-cn':
return False
except LangDetectException:
return False
images = soup.find_all('img')
if len(images) > len(soup.find_all('p')):
return False
        # Check for a meta robots tag that disallows indexing or following
        meta_robots = soup.find('meta', attrs={'name': 'robots'})
        robots_content = meta_robots.get('content', '') if meta_robots else ''
        if 'noindex' in robots_content or 'nofollow' in robots_content:
            return False
return True
except requests.RequestException:
return False
def is_cosmetic_related(snippet):
keywords = ['化妆品', '美妆', '彩妆', '护肤']
return any(keyword in snippet for keyword in keywords)
def parse_search(result):
ret_str = ""
link = result.get('link')
#snippet = result.get('snippet')
if is_file(link):
ret_str = (f"File: {link}")
elif is_webpage(link):
if can_crawl_webpage(link):# and is_cosmetic_related(snippet):
ret_str = (f"Webpage crawlable: {link}")
else:
ret_str = (f"Webpage not crawlable: {link}")
else:
ret_str = (f"Unknown type: {link}")
return ret_str
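# parse_search expects a single search hit as a dict with at least a 'link' key
# (a 'snippet' key is only used by the commented-out relevance check above).
# Illustrative calls with placeholder URLs:
#
#   parse_search({'link': 'https://example.com/catalog.pdf'})    # -> "File: ..."
#   parse_search({'link': 'https://example.com/article.html'})   # -> "Webpage ... crawlable: ..."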
async def save_webpage_content(url, engine, search_id, output_dir, pool):
try:
        # Check whether the page has already been saved
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = "SELECT COUNT(*) FROM saved_webpages WHERE url = %s"
await cursor.execute(sql, (url,))
count = await cursor.fetchone()
if count[0] > 0:
print(f"Webpage already saved: {url}")
return
response = requests.get(url, timeout=10)
response.raise_for_status()
        # Parse the page content with BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
        # Extract the useful text content (paragraph text only)
text_content = ' '.join([t.get_text() for t in soup.find_all('p')])
        # Save the raw HTML
filename = os.path.join(output_dir, f"{url.split('//')[-1].replace('/', '_')}.html")
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
        # Save the extracted text content
text_filename = os.path.join(output_dir, f"{url.split('//')[-1].replace('/', '_')}_text.txt")
with open(text_filename, 'w', encoding='utf-8') as file:
file.write(text_content)
        # Record the saved page in the database
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = "INSERT INTO saved_webpages (url, engine, search_id, content) VALUES (%s, %s, %s, %s)"
                await cursor.execute(sql, (url, engine, search_id, text_content))
await conn.commit()
print(f"Saved webpage: {filename}")
except Exception as e:
print(f"Failed to save webpage {url}: {e}")
async def download_file(url, engine, search_id, output_dir, pool):
try:
        # Check whether the file has already been downloaded
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = "SELECT COUNT(*) FROM downloaded_files WHERE url = %s"
await cursor.execute(sql, (url,))
count = await cursor.fetchone()
if count[0] > 0:
print(f"File already downloaded: {url}")
return
response = requests.get(url, timeout=10, stream=True)
response.raise_for_status()
local_filename = url.split('/')[-1]
local_filepath = os.path.join(output_dir, local_filename)
with open(local_filepath, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
        # Record the downloaded file in the database
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = "INSERT INTO downloaded_files (url, engine, search_id ) VALUES (%s,%s,%s)"
await cursor.execute(sql, (url,engine, search_id,))
await conn.commit()
print(f"Downloaded file: {local_filepath}")
except Exception as e:
print(f"Failed to download file {url}: {e}")
async def save_non_crawlable(url, engine, search_id, pool):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
            # Check whether the URL is already on the blacklist
sql = "SELECT COUNT(*) FROM non_crawlable_links WHERE url = %s"
await cursor.execute(sql, (url,))
count = await cursor.fetchone()
if count[0] == 0:
sql = "INSERT INTO non_crawlable_links (url,engine, search_id) VALUES (%s,%s,%s)"
await cursor.execute(sql, (url,engine, search_id,))
await conn.commit()
print(f"Saved non-crawlable link to database: {url}")
else:
print(f"Non-crawlable link already exists in database: {url}")
async def get_blacklist(pool):
blacklist = set()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = "SELECT url FROM non_crawlable_links"
await cursor.execute(sql)
results = await cursor.fetchall()
blacklist = {result[0] for result in results}
return blacklist
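# The async helpers above expect an aiomysql-style pool (pool.acquire(), async
# cursors, %s placeholders, conn.commit()). A minimal sketch of creating such a
# pool, with placeholder credentials and database name:
#
#   import aiomysql
#
#   async def make_pool():
#       return await aiomysql.create_pool(
#           host='127.0.0.1', port=3306,
#           user='web_search', password='change-me',
#           db='web_search',
#       )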
def libreoffice_to_pdf(file_path, output_dir):
if not os.path.exists(file_path):
print(f"Error: The file '{file_path}' does not exist.")
return False
    # Create the output directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
    # Convert the file to PDF with LibreOffice (requires the soffice binary on PATH)
pdf_path = os.path.join(output_dir, os.path.splitext(os.path.basename(file_path))[0] + '.pdf')
command = f'soffice --headless --convert-to pdf --outdir "{output_dir}" "{file_path}"'
os.system(command)
if not os.path.exists(pdf_path):
print(f"Error: Failed to convert '{file_path}' to PDF.")
return False
return pdf_path
def pdf_to_images(pdf_path, output_dir):
    # Convert the PDF file to images, one page at a time
page_num = 1
for page in convert_from_path(pdf_path, fmt='png', single_file=False):
output_slide_path = os.path.join(output_dir, f'slide_{page_num}.png')
page.save(output_slide_path, 'PNG')
print(f'Saved slide {page_num} as image at {output_slide_path}')
page_num += 1
        # Force memory cleanup after each page is processed
del page
gc.collect()
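# pdf_to_images relies on pdf2image's convert_from_path, which requires the
# poppler utilities (pdftoppm) to be installed on the system. Example call with
# placeholder paths:
#
#   pdf_to_images('/tmp/report.pdf', '/tmp/report_pages')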
def process_files(base_dir):
for root, dirs, files in os.walk(base_dir):
for file in files:
if file.endswith((".ppt", ".pptx", ".xls", ".xlsx", ".doc", ".docx")):
file_path = os.path.join(root, file)
output_dir = os.path.join(root, os.path.splitext(file)[0])
if not os.path.exists(output_dir):
os.makedirs(output_dir)
pdf_path = libreoffice_to_pdf(file_path, output_dir)
if pdf_path:
pdf_to_images(pdf_path, output_dir)
gc.collect()
elif file.endswith(".pdf"):
pdf_path = os.path.join(root, file)
output_dir = os.path.join(root, os.path.splitext(file)[0])
if not os.path.exists(output_dir):
os.makedirs(output_dir)
pdf_to_images(pdf_path, output_dir)
                # Free memory
gc.collect()
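# Minimal manual entry point for the file-conversion helpers; the directory
# below is a placeholder and assumed to already contain downloaded files.
if __name__ == '__main__':
    process_files('./downloads')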