Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import redis
- import polars as pl
- from clickhouse_driver.dbapi.extras import DictCursor
- from forecasts.clickhouse.db import get_native_clickhouse_connection
class MasterDataCache:
    """Redis-backed cache for per-scenario master data frames.

    Use as a context manager: the Redis client is created on entry and
    closed on exit. Frames are stored in Redis as Arrow IPC bytes under
    the key ``description_scenario:<tetris_scenario_id>:<source>``.
    """

    def __init__(self, host, port):
        """Store the Redis connection settings without connecting.

        Args:
            host (str): The Redis server host.
            port (int): The Redis server port.
        """
        self.host = host
        self.port = port
        # Created in __enter__; None means "no open connection".
        self.redis_client = None

    def __enter__(self):
        """Open the Redis connection and return this instance.

        Raises:
            redis.exceptions.RedisError: If the client cannot be created or
                the server does not answer the initial ping.
        """
        client = None
        try:
            client = redis.Redis(host=self.host, port=self.port)
            # redis.Redis() connects lazily, so force a round trip here;
            # otherwise a connection failure would only show up on first use
            # and this handler could never report it.
            client.ping()
        except redis.exceptions.RedisError as e:
            print(f"Error connecting to Redis: {e}")
            if client is not None:
                client.close()
            raise
        self.redis_client = client
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Redis connection, if one is open.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        if self.redis_client is not None:
            self.redis_client.close()
            # Drop the closed client so later calls fail with the explicit
            # "not initialized" error instead of using a dead connection.
            self.redis_client = None

    def get_master_data_from_source(self, tetris_scenario_id, source):
        """Return the master data for a scenario, using Redis as a cache.

        On a cache hit the stored bytes are decoded back into a DataFrame.
        On a miss the data is loaded through
        ``load_master_data_from_source``, written to Redis and returned.

        Args:
            tetris_scenario_id (int): The ID of the scenario.
            source: Identifies the data source. It is part of the Redis key
                and is forwarded to ``load_master_data_from_source`` as its
                ``clickhouse_model`` argument.

        Returns:
            DataFrame: The master data for the given scenario ID.

        Raises:
            ValueError: If called outside the context manager, i.e. with no
                open Redis client.
        """
        import io

        if self.redis_client is None:
            raise ValueError("Redis client is not initialized.")

        redis_key = f"description_scenario:{tetris_scenario_id}:{source}"
        cached_data = self.redis_client.get(redis_key)
        if cached_data is not None:
            return pl.read_ipc(io.BytesIO(cached_data))

        # NOTE(review): the loader requires a clickhouse_model and `source`
        # is the only candidate in scope - confirm that callers pass the
        # model here (the previous call omitted the argument and raised
        # TypeError on every cache miss).
        master_data = self.load_master_data_from_source(tetris_scenario_id, source)

        # Serialize via Arrow IPC; polars has no top-level
        # serialize()/deserialize() helpers.
        buffer = io.BytesIO()
        master_data.write_ipc(buffer)
        self.redis_client.set(redis_key, buffer.getvalue())
        return master_data

    def load_master_data_from_source(self, tetris_scenario_id, clickhouse_model):
        """Load the master data for a scenario from ClickHouse.

        For every validator returned by
        ``clickhouse_model.get_master_data_correspondant()`` and every model
        in its ``correspondant_models``, one query selects the key column
        (aliased to the validator's ``field``) and the description column
        from that model's table, restricted to the given scenario.

        Args:
            tetris_scenario_id (int): The ID of the scenario.
            clickhouse_model: Object exposing
                ``get_master_data_correspondant()``.

        Returns:
            DataFrame: All fetched rows, concatenated diagonally (columns
            missing from a lookup are filled with nulls). An empty DataFrame
            when no lookup returns rows.
        """
        correspondant_data = [
            {
                "field": validator.field,
                "correspondant_field_name": validator.correspondant_field_name,
                "correspondant_models": {
                    "key": f"{model.__tablename__}_{validator.correspondant_field_name}",
                    "description_field_name": f"{model.__tablename__}_{validator.description_name}",
                    "model": model,
                    "model_field": validator.correspondant_field_desc,
                },
            }
            for validator in clickhouse_model.get_master_data_correspondant()
            for model in validator.correspondant_models
        ]

        frames = []
        with get_native_clickhouse_connection() as conn:
            with conn.cursor(cursor_factory=DictCursor) as cursor:
                for entry in correspondant_data:
                    target = entry["correspondant_models"]
                    key_column = entry["correspondant_field_name"]
                    key_alias = entry["field"]
                    desc_column = target["model_field"]
                    # NOTE(review): the description column is aliased with
                    # the per-table name built above; the earlier draft used
                    # lookup tables that are not defined in this module.
                    desc_alias = target["description_field_name"]
                    table_name = target["model"].__tablename__

                    # Identifiers come from model metadata and cannot be
                    # bound as parameters; the scenario id is a value, so it
                    # is passed as a query parameter.
                    # NOTE(review): no "IN (...)" value filter is applied -
                    # all rows of the scenario are loaded; confirm this is
                    # the intended scope for the cache.
                    query_desc = (
                        f"SELECT {key_column} as {key_alias}, "
                        f'{desc_column} as "{desc_alias}" '
                        f"FROM {table_name} "
                        "WHERE tetris_tetris_scenario_id == %(scenario_id)s"
                    )
                    cursor.execute(query_desc, {"scenario_id": tetris_scenario_id})
                    rows = cursor.fetchall()
                    if rows:
                        frames.append(pl.DataFrame(rows))

        if not frames:
            return pl.DataFrame()
        # NOTE(review): a diagonal concat requires columns that share a name
        # across lookups to share a dtype - confirm for the models involved.
        return pl.concat(frames, how="diagonal")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement