Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- from pathlib import Path
- import pandas as pd
- import asyncio
- from datetime import datetime
- import logging
- # Импорт утилит для работы с БД и загрузки SQL-файлов (предполагается, что они уже реализованы)
- from utility.db_tools.async_from_ch_to_postgres import async_load_from_ch_to_postgres, async_click_con
- from utility.db_tools.from_ch_to_postgres import load_from_ch_to_postgres, psql_conn
- from utility.db_tools.postgres_tools.load import db_load, insert_copy, pg_engine
- from utility.db_tools.load_sql_file import load_sql_file
- from utility.api_connector_tools import valid_date
- from video_utils import get_from_ch, get_from_ch_using_pd, get_from_pg_using_pd
- # Импорт SQL-запросов для ClickHouse из отдельного файла
- from ch_queries import get_new_shows_query, get_new_clicks_query
- logger = logging.getLogger('new_pipeline')
- # Названия и схемы целевых таблиц в Postgres
- NEW_DSP_SHOWS_TABLE = 'new_video_dsp_shows'
- NEW_DSP_CLICKS_TABLE = 'new_video_dsp_clicks'
- NEW_DSP_SHOWS_SCHEMA = 'dwh'
- NEW_DSP_CLICKS_SCHEMA = 'dwh'
- from __init__ import PROJECT_PATH
- def load_new_shows_data(date_day: str) -> pd.DataFrame:
- """
- Загружает и агрегирует данные по показам (shows) из ClickHouse,
- используя запрос из файла ch_queries.py.
- """
- query = get_new_shows_query(date_day)
- logger.info("Выполняется запрос для загрузки новых данных по показам (shows)")
- shows_df = get_from_ch_using_pd(query)
- return shows_df
- def load_new_clicks_data(date_day: str) -> pd.DataFrame:
- """
- Загружает и агрегирует данные по кликам (clicks) из ClickHouse,
- используя запрос из файла ch_queries.py.
- """
- query = get_new_clicks_query(date_day)
- logger.info("Выполняется запрос для загрузки новых данных по кликам (clicks)")
- clicks_df = get_from_ch_using_pd(query)
- return clicks_df
- async def main(date_str: str):
- # Очищаем целевые таблицы в Postgres
- logger.info(f"Очищаем таблицы {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE} и {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
- pg_engine.execute(f"TRUNCATE {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE}")
- pg_engine.execute(f"TRUNCATE {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
- # Загрузка новых данных по показам и кликам за заданный день
- shows_df = load_new_shows_data(date_str)
- logger.info(f"Загружено {shows_df.shape[0]} строк по показам (shows) за {date_str}")
- clicks_df = load_new_clicks_data(date_str)
- logger.info(f"Загружено {clicks_df.shape[0]} строк по кликам (clicks) за {date_str}")
- # Загрузка агрегированных данных в целевые таблицы Postgres
- db_load(pg_engine, shows_df, day=date_str, schema=NEW_DSP_SHOWS_SCHEMA, table=NEW_DSP_SHOWS_TABLE, method=insert_copy)
- logger.info("Данные по показам успешно загружены в Postgres")
- db_load(pg_engine, clicks_df, day=date_str, schema=NEW_DSP_CLICKS_SCHEMA, table=NEW_DSP_CLICKS_TABLE, method=insert_copy)
- logger.info("Данные по кликам успешно загружены в Postgres")
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--date', type=lambda s: datetime.strptime(s, '%Y-%m-%d').date(), required=True,
- help="Дата в формате YYYY-MM-DD")
- args = parser.parse_args()
- asyncio.run(main(args.date.strftime('%Y-%m-%d')))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement