250 lines
9.4 KiB
Python
250 lines
9.4 KiB
Python
|
from bs4 import BeautifulSoup
|
||
|
import os
|
||
|
import gc
|
||
|
import requests
|
||
|
from langdetect import detect, LangDetectException
|
||
|
from typing import AsyncIterator, Iterator
|
||
|
|
||
|
from langchain_core.document_loaders import BaseLoader
|
||
|
from langchain_core.documents import Document
|
||
|
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||
|
import requests
|
||
|
|
||
|
def fetch_documents(ip_address, dataset_id, api_key, timeout=10):
    """Fetch the document listing of a dataset over HTTP.

    Issues ``GET http://{ip_address}/v1/datasets/{dataset_id}/documents`` with
    a ``Bearer`` authorization header.

    Args:
        ip_address: Host part interpolated into the request URL.
        dataset_id: Dataset identifier interpolated into the URL path.
        api_key: Token sent as ``Authorization: Bearer {api_key}``.
        timeout: Seconds handed to ``requests.get``. Without a timeout a
            stalled server would block the caller indefinitely.

    Returns:
        The decoded JSON body on success, or ``None`` when the request or the
        JSON decoding failed (the error is printed, not raised).
    """
    url = f'http://{ip_address}/v1/datasets/{dataset_id}/documents'
    headers = {'Authorization': f'Bearer {api_key}'}

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # Turn 4xx / 5xx answers into HTTPError so they are reported below.
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        # Deliberately broad: this helper is best-effort and reports every
        # other failure (connection, timeout, invalid JSON) the same way.
        print(f'Other error occurred: {err}')
    return None
|
||
|
# Set of document file extensions recognised by is_file().
file_extensions = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'}


def is_file(url):
    """Return True when *url* points at one of the known document types.

    The check is case-insensitive. Besides the raw URL, the URL *path* is
    inspected as well, so links carrying a query string or fragment
    (``report.pdf?download=1``) are still recognised.

    Args:
        url: The link to classify. ``None`` and ``""`` are accepted.

    Returns:
        ``True`` if the URL, or its path component, ends with an entry of
        ``file_extensions``; ``False`` otherwise.
    """
    from urllib.parse import urlparse

    if not url:
        return False

    suffixes = tuple(file_extensions)
    lowered = url.lower()
    if lowered.endswith(suffixes):
        return True

    try:
        path = urlparse(lowered).path
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 host).
        return False
    return path.endswith(suffixes)
|
||
|
|
||
|
def is_webpage(url):
    """Return True when *url* answers with an HTML ``Content-Type``.

    Args:
        url: The address to probe with a GET request (5 second timeout).

    Returns:
        ``True`` if the response's ``Content-Type`` header contains
        ``text/html`` (compared case-insensitively); ``False`` otherwise or
        when the request raises ``requests.RequestException``.
    """
    try:
        # stream=True defers the body download: only the headers are needed,
        # and the ``with`` block releases the connection afterwards.
        with requests.get(url, timeout=5, stream=True) as response:
            content_type = response.headers.get('Content-Type', '')
    except requests.RequestException:
        return False
    return 'text/html' in content_type.lower()
|
||
|
|
||
|
def can_crawl_webpage(url):
    """Decide heuristically whether the page at *url* should be crawled.

    The page is fetched (5 second timeout) and rejected when any of the
    following holds:

    * it contains more than 10 ``<script>`` tags;
    * the joined text of its ``<p>`` tags is not detected as ``zh-cn``, or
      language detection fails;
    * it has more ``<img>`` tags than ``<p>`` tags;
    * a ``<meta name="robots">`` tag lists ``noindex`` or ``nofollow``.

    Args:
        url: Address of the page to inspect.

    Returns:
        ``True`` if every check passes, ``False`` otherwise or when the
        request raises ``requests.RequestException``.
    """
    try:
        response = requests.get(url, timeout=5)
    except requests.RequestException:
        return False

    soup = BeautifulSoup(response.text, 'html.parser')

    # Check whether the page carries a large amount of JavaScript.
    if len(soup.find_all('script')) > 10:
        return False

    # Check whether the content is mostly garbled text or images.
    paragraphs = soup.find_all('p')
    text_content = ' '.join(p.get_text() for p in paragraphs)
    try:
        if detect(text_content) != 'zh-cn':
            return False
    except LangDetectException:
        # Raised when detection fails on the extracted text.
        return False

    if len(soup.find_all('img')) > len(paragraphs):
        return False

    # Honour a robots meta tag that forbids indexing or link following.
    meta_robots = soup.find('meta', attrs={'name': 'robots'})
    if meta_robots is not None:
        # .get() avoids a KeyError on a tag without a content attribute;
        # lowercasing catches variants such as "NOINDEX".
        directives = (meta_robots.get('content') or '').lower()
        if 'noindex' in directives or 'nofollow' in directives:
            return False

    return True
|
||
|
|
||
|
def is_cosmetic_related(snippet):
    """Return True when *snippet* mentions a cosmetics-related keyword.

    Args:
        snippet: Text to scan. ``None`` and ``""`` are accepted and yield
            ``False`` instead of raising.

    Returns:
        ``True`` if any of the hard-coded Chinese keywords occurs in the
        snippet, ``False`` otherwise.
    """
    if not snippet:
        return False
    keywords = ('化妆品', '美妆', '彩妆', '护肤')
    return any(keyword in snippet for keyword in keywords)
|
||
|
|
||
|
def parse_search(result):
    """Classify the link of one search result and describe it as a string.

    Args:
        result: Mapping that provides the URL under the ``'link'`` key.

    Returns:
        One of ``"File: <link>"``, ``"Webpage crawlable: <link>"``,
        ``"Webpage not crawlable: <link>"`` or ``"Unknown type: <link>"``.
        A result without a link is reported as unknown instead of raising.
    """
    link = result.get('link')

    # A missing or empty link cannot be classified; skip the network probes.
    if not link:
        return f"Unknown type: {link}"

    if is_file(link):
        return f"File: {link}"

    if not is_webpage(link):
        return f"Unknown type: {link}"

    # NOTE: filtering on the result snippet via is_cosmetic_related() is
    # currently disabled; only crawlability is checked.
    if can_crawl_webpage(link):
        return f"Webpage crawlable: {link}"
    return f"Webpage not crawlable: {link}"
|
||
|
|
||
|
async def save_webpage_content(url, engine, search_id, output_dir, pool):
    """Download a web page, store it on disk and record it in the database.

    Pages whose URL already appears in ``saved_webpages`` are skipped.
    Otherwise the raw HTML is written to ``<output_dir>/<stem>.html``, the
    joined text of all ``<p>`` tags to ``<output_dir>/<stem>_text.txt``, and a
    row is inserted into ``saved_webpages``. ``<stem>`` is the URL without
    its scheme, with ``/`` replaced by ``_``.

    Args:
        url: Address of the page to save.
        engine: Value stored in the ``engine`` column.
        search_id: Value stored in the ``search_id`` column.
        output_dir: Directory receiving the files; created when missing.
        pool: Async connection pool offering ``acquire()`` as an async
            context manager (presumably aiomysql-style; confirm with caller).

    Returns:
        None. Any failure is printed rather than raised.
    """
    import asyncio
    import functools

    try:
        # Check whether this page has already been saved.
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = "SELECT COUNT(*) FROM saved_webpages WHERE url = %s"
                await cursor.execute(sql, (url,))
                count = await cursor.fetchone()
                if count[0] > 0:
                    print(f"Webpage already saved: {url}")
                    return

        # requests is blocking; run it in the default executor so the event
        # loop keeps serving other coroutines during the download.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, functools.partial(requests.get, url, timeout=10)
        )
        response.raise_for_status()

        # Parse the page with BeautifulSoup and extract the paragraph text.
        soup = BeautifulSoup(response.text, 'html.parser')
        text_content = ' '.join(t.get_text() for t in soup.find_all('p'))

        os.makedirs(output_dir, exist_ok=True)
        stem = url.split('//')[-1].replace('/', '_')

        # Save the raw page content.
        filename = os.path.join(output_dir, f"{stem}.html")
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(response.text)

        # Save the extracted text content.
        text_filename = os.path.join(output_dir, f"{stem}_text.txt")
        with open(text_filename, 'w', encoding='utf-8') as file:
            file.write(text_content)

        # Record the saved page in the database.
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = "INSERT INTO saved_webpages (url, engine, search_id, content) VALUES (%s, %s, %s, %s)"
                await cursor.execute(sql, (url, engine, search_id, text_content))
                await conn.commit()

        print(f"Saved webpage: {filename}")
    except Exception as e:
        # Best-effort: one failing page must not abort the calling batch.
        print(f"Failed to save webpage {url}: {e}")
|
||
|
|
||
|
def _stream_to_file(url, local_filepath):
    """Blocking helper: stream *url* into *local_filepath* in 8 KiB chunks."""
    # The ``with`` block closes the streamed response and frees its connection.
    with requests.get(url, timeout=10, stream=True) as response:
        response.raise_for_status()
        with open(local_filepath, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)


async def download_file(url, engine, search_id, output_dir, pool):
    """Download a file to *output_dir* and record it in the database.

    URLs already present in ``downloaded_files`` are skipped. The local file
    name is the text after the last ``/`` of the URL.

    Args:
        url: Address of the file to download.
        engine: Value stored in the ``engine`` column.
        search_id: Value stored in the ``search_id`` column.
        output_dir: Directory receiving the file; created when missing.
        pool: Async connection pool offering ``acquire()`` as an async
            context manager (presumably aiomysql-style; confirm with caller).

    Returns:
        None. Any failure is printed rather than raised.
    """
    import asyncio

    try:
        # Check whether this file has already been downloaded.
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = "SELECT COUNT(*) FROM downloaded_files WHERE url = %s"
                await cursor.execute(sql, (url,))
                count = await cursor.fetchone()
                if count[0] > 0:
                    print(f"File already downloaded: {url}")
                    return

        local_filename = url.split('/')[-1]
        if not local_filename:
            # A URL ending in "/" would otherwise make open() target the
            # directory itself; fail early with a readable message.
            raise ValueError("URL has no file name component")

        os.makedirs(output_dir, exist_ok=True)
        local_filepath = os.path.join(output_dir, local_filename)

        # The streamed download blocks; run it in the default executor so the
        # event loop is not stalled.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _stream_to_file, url, local_filepath)

        # Record the downloaded file in the database.
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = "INSERT INTO downloaded_files (url, engine, search_id ) VALUES (%s,%s,%s)"
                await cursor.execute(sql, (url, engine, search_id))
                await conn.commit()

        print(f"Downloaded file: {local_filepath}")
    except Exception as e:
        # Best-effort: one failing download must not abort the calling batch.
        print(f"Failed to download file {url}: {e}")
|
||
|
|
||
|
async def save_non_crawlable(url, engine, search_id, pool):
    """Add *url* to the ``non_crawlable_links`` table unless already listed.

    Args:
        url: The link to record.
        engine: Value stored in the ``engine`` column.
        search_id: Value stored in the ``search_id`` column.
        pool: Async connection pool offering ``acquire()`` as an async
            context manager.

    Returns:
        None. A message describing the outcome is printed.
    """
    lookup_sql = "SELECT COUNT(*) FROM non_crawlable_links WHERE url = %s"
    insert_sql = "INSERT INTO non_crawlable_links (url,engine, search_id) VALUES (%s,%s,%s)"

    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            # Check whether the blacklist already contains this URL.
            await cursor.execute(lookup_sql, (url,))
            row = await cursor.fetchone()
            if row[0] != 0:
                print(f"Non-crawlable link already exists in database: {url}")
                return

            await cursor.execute(insert_sql, (url, engine, search_id))
            await conn.commit()
            print(f"Saved non-crawlable link to database: {url}")
|
||
|
|
||
|
async def get_blacklist(pool):
    """Load every URL stored in the ``non_crawlable_links`` table.

    Args:
        pool: Async connection pool offering ``acquire()`` as an async
            context manager.

    Returns:
        A ``set`` holding the first column of every fetched row.
    """
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT url FROM non_crawlable_links")
            rows = await cursor.fetchall()
    return {row[0] for row in rows}
|
||
|
|
||
|
def libreoffice_to_pdf(file_path, output_dir):
    """Convert an office document to PDF with headless LibreOffice.

    Runs ``soffice --headless --convert-to pdf --outdir <output_dir>
    <file_path>`` and then checks that the expected PDF exists.

    Args:
        file_path: Path of the document to convert.
        output_dir: Directory receiving the PDF; created when missing.

    Returns:
        The path ``<output_dir>/<basename of file_path without ext>.pdf`` if
        that file exists after the conversion attempt, otherwise ``False``.
    """
    import subprocess

    if not os.path.exists(file_path):
        print(f"Error: The file '{file_path}' does not exist.")
        return False

    # Create the output directory.
    os.makedirs(output_dir, exist_ok=True)

    pdf_name = os.path.splitext(os.path.basename(file_path))[0] + '.pdf'
    pdf_path = os.path.join(output_dir, pdf_name)

    # Convert the file to PDF with LibreOffice. An argument list (no shell)
    # is used so that quotes, "$(...)" or ";" inside a file name cannot be
    # interpreted as shell syntax.
    command = [
        'soffice', '--headless', '--convert-to', 'pdf',
        '--outdir', output_dir, file_path,
    ]
    try:
        subprocess.run(command, check=False)
    except OSError as err:
        # E.g. soffice is not installed; the existence check below reports
        # the failed conversion.
        print(f"Error: Could not launch soffice: {err}")

    if not os.path.exists(pdf_path):
        print(f"Error: Failed to convert '{file_path}' to PDF.")
        return False

    return pdf_path
|
||
|
|
||
|
def pdf_to_images(pdf_path, output_dir):
    """Render every page of a PDF to ``<output_dir>/slide_<n>.png``.

    Pages are numbered from 1 in the order returned by ``convert_from_path``.

    Args:
        pdf_path: Path of the PDF to render.
        output_dir: Directory receiving the PNG files.

    Returns:
        None. One line is printed per saved page.
    """
    # NOTE(review): convert_from_path is not imported in the visible part of
    # this file -- presumably pdf2image.convert_from_path; confirm.
    rendered = convert_from_path(pdf_path, fmt='png', single_file=False)

    # Convert the PDF page by page into image files.
    for page_num, image in enumerate(rendered, start=1):
        output_slide_path = os.path.join(output_dir, f'slide_{page_num}.png')
        image.save(output_slide_path, 'PNG')
        print(f'Saved slide {page_num} as image at {output_slide_path}')

        # Drop the local reference and force a collection after each page.
        del image
        gc.collect()
|
||
|
|
||
|
def process_files(base_dir):
    """Render every office document and PDF below *base_dir* to PNG images.

    For each matching file an output directory named after the file (without
    its extension) is created next to it. Office documents are first
    converted to PDF via ``libreoffice_to_pdf``; the PDF is then rendered by
    ``pdf_to_images``. Extensions are matched case-insensitively, so
    ``REPORT.PDF`` and ``Deck.PPTX`` are processed as well.

    Args:
        base_dir: Root directory walked recursively with ``os.walk``.

    Returns:
        None.
    """
    office_suffixes = (".ppt", ".pptx", ".xls", ".xlsx", ".doc", ".docx")

    for root, _dirs, files in os.walk(base_dir):
        for file in files:
            lowered = file.lower()
            is_office = lowered.endswith(office_suffixes)
            if not is_office and not lowered.endswith(".pdf"):
                continue

            source_path = os.path.join(root, file)
            output_dir = os.path.join(root, os.path.splitext(file)[0])
            os.makedirs(output_dir, exist_ok=True)

            # Office files need a PDF intermediate; PDFs are rendered as-is.
            # libreoffice_to_pdf returns False when the conversion failed.
            if is_office:
                pdf_path = libreoffice_to_pdf(source_path, output_dir)
            else:
                pdf_path = source_path

            if pdf_path:
                pdf_to_images(pdf_path, output_dir)

            # Free memory before moving on to the next document.
            gc.collect()
|
||
|
|