xhs_crawler/cache/local_cache.py

132 lines
3.7 KiB
Python
Raw Normal View History

# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Name : 程序员阿江-Relakkes
# @Time : 2024/6/2 11:05
# @Desc : 本地缓存
import asyncio
import time
2024-06-02 11:57:13 +00:00
from typing import Any, Dict, List, Optional, Tuple
2024-06-02 11:57:13 +00:00
from cache.abs_cache import AbstractCache
2024-06-02 11:57:13 +00:00
class ExpiringLocalCache(AbstractCache):
def __init__(self, cron_interval: int = 10):
"""
初始化本地缓存
:param cron_interval: 定时清楚cache的时间间隔
:return:
"""
self._cron_interval = cron_interval
self._cache_container: Dict[str, Tuple[Any, float]] = {}
self._cron_task: Optional[asyncio.Task] = None
# 开启定时清理任务
self._schedule_clear()
def __del__(self):
"""
析构函数清理定时任务
:return:
"""
if self._cron_task is not None:
self._cron_task.cancel()
def get(self, key: str) -> Optional[Any]:
"""
从缓存中获取键的值
:param key:
:return:
"""
value, expire_time = self._cache_container.get(key, (None, 0))
if value is None:
return None
# 如果键已过期则删除键并返回None
if expire_time < time.time():
del self._cache_container[key]
return None
return value
def set(self, key: str, value: Any, expire_time: int) -> None:
"""
将键的值设置到缓存中
:param key:
:param value:
:param expire_time:
:return:
"""
self._cache_container[key] = (value, time.time() + expire_time)
2024-06-02 11:57:13 +00:00
def keys(self, pattern: str) -> List[str]:
"""
获取所有符合pattern的key
:param pattern: 匹配模式
:return:
"""
if pattern == '*':
return list(self._cache_container.keys())
# 本地缓存通配符暂时将*替换为空
if '*' in pattern:
pattern = pattern.replace('*', '')
return [key for key in self._cache_container.keys() if pattern in key]
def _schedule_clear(self):
"""
开启定时清理任务,
:return:
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._cron_task = loop.create_task(self._start_clear_cron())
def _clear(self):
"""
根据过期时间清理缓存
:return:
"""
for key, (value, expire_time) in self._cache_container.items():
if expire_time < time.time():
del self._cache_container[key]
async def _start_clear_cron(self):
"""
开启定时清理任务
:return:
"""
while True:
self._clear()
await asyncio.sleep(self._cron_interval)
if __name__ == '__main__':
cache = ExpiringLocalCache(cron_interval=2)
cache.set('name', '程序员阿江-Relakkes', 3)
print(cache.get('key'))
2024-06-02 11:57:13 +00:00
print(cache.keys("*"))
time.sleep(4)
print(cache.get('key'))
del cache
time.sleep(1)
2024-06-02 11:57:13 +00:00
print("done")