Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import logging
import queue
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, RLock
from typing import Any, Dict, Optional, TypeVar
# Generic placeholders for the key type and the value type in annotations.
K = TypeVar("K")
V = TypeVar("V")
- class StripedLock:
- """
- Implements lock striping for more granular concurrency control.
- Inspired by ConcurrentHashMap's segment locking in Java.
- """
- def __init__(self, num_stripes: int = 32):
- self.num_stripes = num_stripes
- self.locks = [RLock() for _ in range(num_stripes)]
- def get_lock(self, key: Any) -> Lock:
- # Use hash of key to determine stripe
- stripe = hash(key) % self.num_stripes
- return self.locks[stripe]
class ConcurrentKVStore:
    """
    Thread-safe in-memory key-value store with synchronous and asyncio APIs.

    Features:
    - Lock striping so operations on different keys rarely share a lock
    - Optional read buffer: a periodically refreshed snapshot of the data
      that readers consult without holding a stripe or buffer lock
    - Queue-based request workers (write_queue / read_queue)
    - Read/write counters exposed through get_stats()

    Consistency: in read-optimized mode a write evicts its key from the
    read buffer, so a get that starts after a put has returned does not see
    the value that put replaced.

    Lifecycle: the background workers run on daemon threads, so they do not
    keep the interpreter alive. Call close(), or use the store as a context
    manager, to stop them deterministically.
    """

    # Sentinel placed on a request queue to tell its worker to exit.
    _STOP = object()

    def __init__(self, read_optimized: bool = True):
        """
        Create an empty store and start its background workers.

        Args:
            read_optimized: When True, reads consult the snapshot buffer
                first and a background worker refreshes that snapshot every
                `sync_interval` seconds.
        """
        self.read_optimized = read_optimized
        self.data: Dict["K", "V"] = {}
        self.striped_locks = StripedLock()
        self.stats = defaultdict(int)
        self.stats_lock = Lock()
        # Snapshot of `data` used by the read-optimized path.
        self.read_buffer: Dict["K", "V"] = {}
        self.buffer_lock = Lock()
        self.last_sync = time.time()
        self.sync_interval = 0.1  # 100ms
        # Request queues served by the background workers. Items are
        # (key, value, future) for writes and (key, future) for reads.
        self.write_queue = queue.Queue()
        self.read_queue = queue.Queue()
        # Start background workers
        self.start_background_workers()

    def __enter__(self):
        """Return the store itself so it can be used in a `with` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the background workers when the `with` block ends."""
        self.close()

    def start_background_workers(self):
        """
        Create the executor and start the long-running worker threads.

        The queue workers and the buffer-sync worker loop until close() is
        called, so they run on dedicated daemon threads instead of occupying
        executor slots; the executor stays free for the async put/get calls
        and can be joined normally at interpreter exit.
        """
        self.executor = ThreadPoolExecutor(max_workers=4)
        self._stop_event = threading.Event()
        targets = [self._write_worker, self._read_worker]
        if self.read_optimized:
            targets.append(self._buffer_sync_worker)
        self._workers = [
            threading.Thread(target=target, name=target.__name__, daemon=True)
            for target in targets
        ]
        for worker in self._workers:
            worker.start()

    def close(self):
        """
        Stop the background workers and shut down the executor.

        Requests already queued are processed before the queue workers
        exit. Calling close() more than once is harmless.
        """
        if self._stop_event.is_set():
            return
        self._stop_event.set()
        # The queue workers block in Queue.get(), so wake each with a sentinel.
        self.write_queue.put(self._STOP)
        self.read_queue.put(self._STOP)
        for worker in self._workers:
            worker.join()
        self.executor.shutdown(wait=True)

    def _write_worker(self):
        """Serve (key, value, future) requests from write_queue until stopped."""
        while True:
            request = self.write_queue.get()
            try:
                if request is self._STOP:
                    return
                key, value, result_future = request
                try:
                    self.put_sync(key, value)
                except Exception as exc:
                    result_future.set_exception(exc)
                else:
                    result_future.set_result(True)
            except Exception:
                # A malformed request or an unusable future must not kill
                # the worker; record it and keep serving.
                logging.getLogger(__name__).exception(
                    "Failed to process write request"
                )
            finally:
                self.write_queue.task_done()

    def _read_worker(self):
        """Serve (key, future) requests from read_queue until stopped."""
        while True:
            request = self.read_queue.get()
            try:
                if request is self._STOP:
                    return
                key, result_future = request
                try:
                    value = self.get_sync(key)
                except Exception as exc:
                    result_future.set_exception(exc)
                else:
                    result_future.set_result(value)
            except Exception:
                # Same policy as the write worker: log and keep serving.
                logging.getLogger(__name__).exception(
                    "Failed to process read request"
                )
            finally:
                self.read_queue.task_done()

    def _buffer_sync_worker(self):
        """Refresh the read buffer every `sync_interval` seconds until stopped."""
        # Event.wait() acts as an interruptible sleep: it returns True as
        # soon as close() sets the event, ending the loop without delay.
        while not self._stop_event.wait(self.sync_interval):
            if time.time() - self.last_sync >= self.sync_interval:
                self._sync_read_buffer()

    def _sync_read_buffer(self):
        """Replace the read buffer with a fresh snapshot of the data."""
        with self.buffer_lock:
            self.read_buffer = self.data.copy()
            self.last_sync = time.time()

    def put_sync(self, key: "K", value: "V") -> None:
        """
        Store `value` under `key`, replacing any previous value.

        Args:
            key: Hashable key.
            value: Value to associate with the key.
        """
        with self.striped_locks.get_lock(key):
            if self.read_optimized:
                # Write and evict under buffer_lock so a concurrent snapshot
                # refresh cannot reinstall the value being replaced.
                with self.buffer_lock:
                    self.data[key] = value
                    self.read_buffer.pop(key, None)
            else:
                self.data[key] = value
        with self.stats_lock:
            self.stats['writes'] += 1

    def get_sync(self, key: "K") -> Optional["V"]:
        """
        Return the value stored under `key`, or None when the key is absent.

        Args:
            key: Hashable key.

        Returns:
            The stored value, or None if the key is not present.
        """
        with self.stats_lock:
            self.stats['reads'] += 1
        if self.read_optimized:
            # Snapshot lookup without the stripe or buffer lock. A miss and
            # a stored None both fall through to the main storage.
            value = self.read_buffer.get(key)
            if value is not None:
                return value
        # Fall back to main storage with fine-grained locking
        with self.striped_locks.get_lock(key):
            return self.data.get(key)

    async def put(self, key: "K", value: "V") -> None:
        """
        Store `value` under `key` without blocking the event loop.

        The write runs on the store's executor; the coroutine completes
        once the write has finished.
        """
        future = self.executor.submit(self.put_sync, key, value)
        # A concurrent.futures.Future is not awaitable by itself; wrap it
        # in an asyncio future bound to the running loop.
        await asyncio.wrap_future(future)

    async def get(self, key: "K") -> Optional["V"]:
        """
        Return the value stored under `key` without blocking the event loop.

        Returns:
            The stored value, or None if the key is not present.
        """
        future = self.executor.submit(self.get_sync, key)
        return await asyncio.wrap_future(future)

    def get_stats(self) -> Dict[str, int]:
        """Return a copy of the counters ('reads' and 'writes') recorded so far."""
        with self.stats_lock:
            return dict(self.stats)
# Example usage and testing
def run_concurrent_test():
    """
    Hammer a read-optimized store from several threads and print throughput.

    Four writer threads and four reader threads each perform 10,000
    operations on keys derived from their own thread id. After all threads
    finish, the elapsed time, the total operation count and the rate are
    printed.
    """
    store = ConcurrentKVStore(read_optimized=True)
    num_threads = 4
    operations_per_thread = 10000

    def key_for(i):
        # Keys embed the calling thread's id, so each thread works on its
        # own key space.
        return f"key_{threading.get_ident()}_{i}"

    def writer_task():
        for i in range(operations_per_thread):
            store.put_sync(key_for(i), i)

    def reader_task():
        for i in range(operations_per_thread):
            store.get_sync(key_for(i))

    start_time = time.time()

    # One writer followed by one reader per iteration, in that order.
    threads = [
        threading.Thread(target=task)
        for _ in range(num_threads)
        for task in (writer_task, reader_task)
    ]

    # Launch everything first, then wait for completion.
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()

    duration = time.time() - start_time
    stats = store.get_stats()
    print(f"Test completed in {duration:.2f} seconds")
    print(f"Total operations: {stats['reads'] + stats['writes']}")
    print(f"Operations per second: {(stats['reads'] + stats['writes']) / duration:.2f}")
# Run the concurrency test only when executed as a script, not on import.
if __name__ == "__main__":
    run_concurrent_test()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement