Advertisement
Pandaaaa906

Untitled

May 19th, 2023
882
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 0.97 KB | None | 0 0
  1.  
  2. class ZSetCacheFilter:
  3.     def __init__(self, cache_key: str, timeout: Union[float, int] = None, cache_name: str = "default"):
  4.         self.cache: Redis = get_redis_connection(cache_name)
  5.         self.cache_key = cache_key
  6.         self.timeout = timeout
  7.  
  8.     @property
  9.     def now(self):
  10.         return time()
  11.  
  12.     def add(self, item):
  13.         self.cache.zadd(self.cache_key, {item: self.now})
  14.  
  15.     def remove(self, item, *items):
  16.         self.cache.zrem(self.cache_key, item, *items)
  17.  
  18.     def clean(self, max_time: Union[float, int] = None):
  19.         if max_time is None:
  20.             max_time = self.now - self.timeout
  21.         self.cache.zremrangebyscore(self.cache_key, 0, max_time)
  22.  
  23.     def __contains__(self, item):
  24.         score = self.cache.zscore(self.cache_key, item)
  25.         if score is None:
  26.             return False
  27.         if score < self.now - self.timeout:
  28.             return False
  29.         return True
  30.  
  31.     def __enter__(self):
  32.         return self
  33.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement