Advertisement
gubichas

Untitled

Mar 12th, 2025
93
0
18 hours
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.87 KB | None | 0 0
  1. import argparse
  2. from pathlib import Path
  3. import pandas as pd
  4. import asyncio
  5. from datetime import datetime
  6. import logging
  7.  
  8. # Импорт утилит для работы с БД и загрузки SQL-файлов (предполагается, что они уже реализованы)
  9. from utility.db_tools.async_from_ch_to_postgres import async_load_from_ch_to_postgres, async_click_con
  10. from utility.db_tools.from_ch_to_postgres import load_from_ch_to_postgres, psql_conn
  11. from utility.db_tools.postgres_tools.load import db_load, insert_copy, pg_engine
  12. from utility.db_tools.load_sql_file import load_sql_file
  13. from utility.api_connector_tools import valid_date
  14. from video_utils import get_from_ch, get_from_ch_using_pd, get_from_pg_using_pd
  15.  
  16. # Импорт SQL-запросов для ClickHouse из отдельного файла
  17. from ch_queries import get_new_shows_query, get_new_clicks_query
  18.  
  19. logger = logging.getLogger('new_pipeline')
  20.  
  21. # Названия и схемы целевых таблиц в Postgres
  22. NEW_DSP_SHOWS_TABLE = 'new_video_dsp_shows'
  23. NEW_DSP_CLICKS_TABLE = 'new_video_dsp_clicks'
  24. NEW_DSP_SHOWS_SCHEMA = 'dwh'
  25. NEW_DSP_CLICKS_SCHEMA = 'dwh'
  26.  
  27. from __init__ import PROJECT_PATH
  28.  
  29. def load_new_shows_data(date_day: str) -> pd.DataFrame:
  30.     """
  31.    Загружает и агрегирует данные по показам (shows) из ClickHouse,
  32.    используя запрос из файла ch_queries.py.
  33.    """
  34.     query = get_new_shows_query(date_day)
  35.     logger.info("Выполняется запрос для загрузки новых данных по показам (shows)")
  36.     shows_df = get_from_ch_using_pd(query)
  37.     return shows_df
  38.  
  39. def load_new_clicks_data(date_day: str) -> pd.DataFrame:
  40.     """
  41.    Загружает и агрегирует данные по кликам (clicks) из ClickHouse,
  42.    используя запрос из файла ch_queries.py.
  43.    """
  44.     query = get_new_clicks_query(date_day)
  45.     logger.info("Выполняется запрос для загрузки новых данных по кликам (clicks)")
  46.     clicks_df = get_from_ch_using_pd(query)
  47.     return clicks_df
  48.  
  49. async def main(date_str: str):
  50.     # Очищаем целевые таблицы в Postgres
  51.     logger.info(f"Очищаем таблицы {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE} и {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
  52.     pg_engine.execute(f"TRUNCATE {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE}")
  53.     pg_engine.execute(f"TRUNCATE {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
  54.  
  55.     # Загрузка новых данных по показам и кликам за заданный день
  56.     shows_df = load_new_shows_data(date_str)
  57.     logger.info(f"Загружено {shows_df.shape[0]} строк по показам (shows) за {date_str}")
  58.  
  59.     clicks_df = load_new_clicks_data(date_str)
  60.     logger.info(f"Загружено {clicks_df.shape[0]} строк по кликам (clicks) за {date_str}")
  61.  
  62.     # Загрузка агрегированных данных в целевые таблицы Postgres
  63.     db_load(pg_engine, shows_df, day=date_str, schema=NEW_DSP_SHOWS_SCHEMA, table=NEW_DSP_SHOWS_TABLE, method=insert_copy)
  64.     logger.info("Данные по показам успешно загружены в Postgres")
  65.  
  66.     db_load(pg_engine, clicks_df, day=date_str, schema=NEW_DSP_CLICKS_SCHEMA, table=NEW_DSP_CLICKS_TABLE, method=insert_copy)
  67.     logger.info("Данные по кликам успешно загружены в Postgres")
  68.  
  69. if __name__ == '__main__':
  70.     parser = argparse.ArgumentParser()
  71.     parser.add_argument('--date', type=lambda s: datetime.strptime(s, '%Y-%m-%d').date(), required=True,
  72.                         help="Дата в формате YYYY-MM-DD")
  73.     args = parser.parse_args()
  74.     asyncio.run(main(args.date.strftime('%Y-%m-%d')))
  75.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement