Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import redis
- from clickhouse_driver.dbapi.extras import DictCursor
- from config.settings import REDIS_HOST, REDIS_PORT
- from forecasts.clickhouse.db import get_native_clickhouse_connection
- from tetris_optimizer.api.constants import TABLES_DESCRIPTION_FIELDS
- import json
- class MasterDataCache:
- def __init__(self, db=9):
- """
- Initialize the MasterDataCache instance.
- """
- self.host = REDIS_HOST
- self.port = REDIS_PORT
- self.db = db
- self.redis_client = None
- def __enter__(self):
- """
- Context manager entry point for safely opening Redis connection.
- """
- try:
- self.redis_client = redis.Redis(host=self.host, port=self.port, db=self.db)
- except redis.exceptions.RedisError as e:
- print(f"Error connecting to Redis: {e}")
- raise
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Context manager exit point for safely closing Redis connection.
- """
- if self.redis_client:
- self.redis_client.close()
- def serialize_data(self, data):
- """
- Serialize data into a string format.
- Args:
- data (dict): The data to be serialized.
- Returns:
- str: The serialized data.
- """
- serialized_data = json.dumps(data)
- return serialized_data
- def deserialize_data(self, serialized_data):
- """
- Deserialize data from a string format.
- Args:
- serialized_data (str): The serialized data.
- Returns:
- dict: The deserialized data.
- """
- data = json.loads(serialized_data)
- return data
- def get_master_data_from_source(self, tetris_scenario_id):
- """
- Retrieve master data for a given scenario ID from the cache.
- Args:
- tetris_scenario_id (int): The ID of the scenario.
- Returns:
- dict: The master data for the given scenario ID.
- """
- if not self.redis_client:
- raise ValueError("Redis client is not initialized.")
- redis_key = f"description_scenario:{tetris_scenario_id}"
- cached_data = self.redis_client.get(redis_key)
- if cached_data:
- return self.deserialize_data(cached_data)
- else:
- master_data = self.load_master_data_from_source(tetris_scenario_id)
- serialized_data = self.serialize_data(master_data)
- self.redis_client.set(redis_key, serialized_data)
- return master_data
- def load_master_data_from_source(self, tetris_scenario_id):
- """
- Load master data for a given scenario ID from the data source.
- Args:
- tetris_scenario_id (int): The ID of the scenario.
- Returns:
- dict: The master data for the given scenario ID.
- """
- data = {}
- with get_native_clickhouse_connection() as conn:
- with conn.cursor(cursor_factory=DictCursor) as cursor:
- for table, fields in TABLES_DESCRIPTION_FIELDS.items():
- id_field, desc_field = list(fields.items())[0]
- query = (
- f'SELECT {id_field}, {desc_field} '
- f'FROM {table} '
- f'WHERE tetris_scenario_id == {tetris_scenario_id} '
- )
- cursor.execute(query)
- results = cursor.fetchall()
- table_dict = {}
- if results:
- for row in results:
- id_value = row[id_field]
- desc_value = row[desc_field]
- table_dict[str(id_value)] = desc_value
- else:
- table_dict = None
- data[table] = table_dict
- return data
- def update_master_data(self, tetris_scenario_id):
- """
- Update master data for a given scenario ID.
- Args:
- tetris_scenario_id (int): The ID of the scenario.
- """
- if not self.redis_client:
- raise ValueError("Redis client is not initialized.")
- redis_key = f"description_scenario:{tetris_scenario_id}"
- new_data = {}
- with get_native_clickhouse_connection() as conn:
- with conn.cursor(cursor_factory=DictCursor) as cursor:
- for table, fields in TABLES_DESCRIPTION_FIELDS.items():
- id_field, desc_field = list(fields.items())[0]
- query = (
- f'SELECT {id_field}, {desc_field} '
- f'FROM {table} '
- f'WHERE tetris_scenario_id == {tetris_scenario_id} '
- )
- cursor.execute(query)
- results = cursor.fetchall()
- table_dict = {}
- if results:
- for row in results:
- id_value = row[id_field]
- desc_value = row[desc_field]
- table_dict[str(id_value)] = desc_value
- else:
- table_dict = None
- new_data[table] = table_dict
- serialized_data = self.serialize_data(new_data)
- self.redis_client.set(redis_key, serialized_data)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement