Advertisement
Mochinov

Untitled

Mar 25th, 2024
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.59 KB | None | 0 0
  1. import redis
  2. import polars as pl
  3.  
  4. from clickhouse_driver.dbapi.extras import DictCursor
  5. from forecasts.clickhouse.db import get_native_clickhouse_connection
  6.  
  7.  
  8. class MasterDataCache:
  9.  
  10. def __init__(self, host, port):
  11. """
  12. Initialize the MasterDataCache instance.
  13.  
  14. Args:
  15. host (str): The Redis server host.
  16. port (int): The Redis server port.
  17. """
  18. self.host = host
  19. self.port = port
  20. self.redis_client = None
  21.  
  22. def __enter__(self):
  23. """
  24. Context manager entry point for safely opening Redis connection.
  25. """
  26. try:
  27. self.redis_client = redis.Redis(host=self.host, port=self.port)
  28. except redis.exceptions.RedisError as e:
  29. print(f"Error connecting to Redis: {e}")
  30. raise
  31.  
  32. return self
  33.  
  34. def __exit__(self, exc_type, exc_val, exc_tb):
  35. """
  36. Context manager exit point for safely closing Redis connection.
  37. """
  38. if self.redis_client:
  39. self.redis_client.close()
  40.  
  41.  
  42. def get_master_data_from_source(self, tetris_scenario_id, source):
  43. """
  44. Retrieve master data for a given scenario ID from the cache.
  45.  
  46. Args:
  47. tetris_scenario_id (int): The ID of the scenario.
  48.  
  49. Returns:
  50. DataFrame: The master data for the given scenario ID.
  51. """
  52. if not self.redis_client:
  53. raise ValueError("Redis client is not initialized.")
  54.  
  55. redis_key = f"description_scenario:{tetris_scenario_id}:{source}"
  56. cached_data = self.redis_client.get(redis_key)
  57.  
  58. if cached_data:
  59. return pl.from_arrow(pl.deserialize(cached_data))
  60. else:
  61. master_data = self.load_master_data_from_source(tetris_scenario_id)
  62. self.redis_client.set(redis_key, pl.serialize(master_data.to_arrow()).to_buffer().to_pybytes())
  63. return master_data
  64.  
  65. def load_master_data_from_source(self, tetris_scenario_id, clickhouse_model):
  66. """
  67. Load master data for a given scenario ID from the data source.
  68.  
  69. Args:
  70. tetris_scenario_id (int): The ID of the scenario.
  71.  
  72. Returns:
  73. DataFrame: The master data for the given scenario ID.
  74. """
  75.  
  76. correspondant_data = []
  77. for validator in clickhouse_model.get_master_data_correspondant():
  78. for model in validator.correspondant_models:
  79. field = validator.field
  80. correspondant_field_name = validator.correspondant_field_name
  81. description_field_name = validator.description_name
  82. correspondant_field_desc = validator.correspondant_field_desc
  83.  
  84. correspondant_data.append(
  85. {
  86. "field": field,
  87. "correspondant_field_name": correspondant_field_name,
  88. "correspondant_models": {
  89. "key": f"{model.__tablename__}_{correspondant_field_name}",
  90. "description_field_name": f"{model.__tablename__}_{description_field_name}",
  91. "model": model,
  92. "model_field": correspondant_field_desc,
  93. },
  94. }
  95. )
  96.  
  97.  
  98. with get_native_clickhouse_connection() as conn:
  99. data = []
  100. with conn.cursor(cursor_factory=DictCursor) as cursor:
  101. # desc_name = (
  102. # f'{tablename}_'
  103. # f'{data["field"]}_{data["correspondant_models"]["model_field"]}'
  104. # )
  105. # query_range = ", ".join(
  106. # [
  107. # f"{item}" if not isinstance(item, str) else f"'{item}'"
  108. # for item in data["unique_value"]
  109. # ]
  110. # )
  111. query_desc = (
  112. f'SELECT {data["correspondant_field_name"]} as {data["field"]}, '\
  113. f'{data["correspondant_models"]["model_field"]} as '
  114. f'"{title_fields.get(data["field"])} {TABLES_DESCRIPTION_NAME.get(data["correspondant_models"]["model"].__tablename__, desc_name)}" '
  115. f'FROM {data["correspondant_models"]["model"].__tablename__} '
  116. f'WHERE tetris_tetris_scenario_id == {current_scenario.pk} '
  117. f'and {data["correspondant_field_name"]} in ({query_range})'
  118. )
  119.  
  120.  
  121. # Placeholder method for loading data from source
  122. return pl.DataFrame()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement