Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- from pathlib import Path
- import pandas as pd
- import asyncio
- from datetime import datetime
- import logging
- # Импорт утилит для работы с БД и загрузки SQL-файлов (предполагается, что они уже реализованы)
- from utility.db_tools.async_from_ch_to_postgres import async_load_from_ch_to_postgres, async_click_con
- from utility.db_tools.from_ch_to_postgres import load_from_ch_to_postgres, psql_conn
- from utility.db_tools.postgres_tools.load import db_load, insert_copy, pg_engine
- from utility.db_tools.load_sql_file import load_sql_file
- from utility.api_connector_tools import valid_date
- from video_utils import get_from_ch, get_from_ch_using_pd, get_from_pg_using_pd
- logger = logging.getLogger('new_pipeline')
- # Названия и схемы целевых таблиц в Postgres (можно адаптировать под нужды)
- NEW_DSP_SHOWS_TABLE = 'new_video_dsp_shows'
- NEW_DSP_CLICKS_TABLE = 'new_video_dsp_clicks'
- NEW_DSP_SHOWS_SCHEMA = 'dwh'
- NEW_DSP_CLICKS_SCHEMA = 'dwh'
- # Если используется константа пути к проекту, например:
- from __init__ import PROJECT_PATH
- def load_new_shows_data(date_day: str) -> pd.DataFrame:
- """
- Загрузка и агрегация данных по показам (shows) из новых таблиц.
- Здесь:
- - dt заменяет show_date
- - request_ts_ms и event_ts_ms используются вместо show_ts
- - Отсутствующие поля (stat_id, segments и т.п.) заменяются заглушками (0 или пустыми значениями)
- """
- new_shows_query = f"""
- -- Подзапрос для данных из unified.mdsp_request_cloud (информация о запросе)
- WITH req AS (
- SELECT
- dt AS date_day,
- bid_request_id,
- site_id AS pad_id,
- tag_id AS block_id,
- 0 AS adplace_id, -- Заглушка, т.к. stat_id нет в новых таблицах
- 0 AS statid, -- Заглушка
- 0 AS test_id, -- Заглушка
- -- Если regions не пустой, проверяем первый элемент, иначе возвращаем 0 (False)
- if(arrayExists(x -> x = 1, regions), toUInt8(regions[1] == 1), 0) AS fl_russia
- FROM unified.mdsp_request_cloud
- WHERE dt = toDate('{date_day}')
- -- Дополнительно можно добавить фильтр по часу (toHour(request_ts_ms))
- ),
- -- Подзапрос для данных из unified.mdsp_event_cloud (информация о событии)
- evt AS (
- SELECT
- dt AS date_day,
- bid_request_id,
- request_site_id AS pad_id,
- request_tag_id AS block_id,
- campaign_id,
- -- Пример получения категории через dictGet, если она доступна
- dictGetString('DSP_ProductCategory', 'name',
- dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id))
- ) AS category
- FROM unified.mdsp_event_cloud
- WHERE dt = toDate('{date_day}')
- AND event_type = 10 -- Предположим, что event_type = 10 соответствует показам (shows)
- )
- SELECT
- r.date_day,
- 429675720 AS user_id, -- Жестко заданное значение user_id
- r.pad_id,
- r.block_id,
- e.category,
- r.adplace_id,
- r.statid,
- r.test_id,
- e.campaign_id,
- r.fl_russia,
- count(*) AS real_shows,
- 0 AS revenue -- Заглушка для revenue, если нет соответствующих данных
- FROM evt e
- JOIN req r
- ON e.bid_request_id = r.bid_request_id
- AND e.pad_id = r.pad_id
- AND e.block_id = r.block_id
- GROUP BY
- r.date_day,
- e.campaign_id,
- r.pad_id,
- r.block_id,
- e.category,
- r.adplace_id,
- r.statid,
- r.test_id,
- r.fl_russia
- """
- logger.info("Выполняется запрос для загрузки новых данных по показам (shows)")
- shows_df = get_from_ch_using_pd(new_shows_query)
- return shows_df
- def load_new_clicks_data(date_day: str) -> pd.DataFrame:
- """
- Загрузка и агрегация данных по кликам (clicks) из новых таблиц.
- Здесь:
- - dt используется вместо click_date
- - Используем unified.mdsp_request_cloud и unified.mdsp_event_cloud
- - Поля stat_id, crosstraffic_id и т.п. заменяются заглушками
- """
- new_clicks_query = f"""
- WITH req AS (
- SELECT
- dt AS date_day,
- bid_request_id,
- site_id AS pad_id,
- tag_id AS block_id,
- 0 AS adplace_id, -- Заглушка
- 0 AS statid, -- Заглушка
- 0 AS test_id, -- Заглушка
- 0 AS crosstraffic_id, -- Заглушка
- if(arrayExists(x -> x = 1, regions), toUInt8(regions[1] == 1), 0) AS fl_russia
- FROM unified.mdsp_request_cloud
- WHERE dt = toDate('{date_day}')
- ),
- evt AS (
- SELECT
- dt AS date_day,
- bid_request_id,
- request_site_id AS pad_id,
- request_tag_id AS block_id,
- campaign_id,
- ad_group_id AS dsp_flight_id,
- dictGetString('DSP_ProductCategory', 'name',
- dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id))
- ) AS category,
- dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id)) AS product_category_id,
- dictGetUInt64('ATD_Campaign', 'income_source_id',
- dictGetUInt64('ATD_DspCampaign', 'atd_oid', campaign_id)
- ) AS income_source_id
- FROM unified.mdsp_event_cloud
- WHERE dt = toDate('{date_day}')
- AND event_type = 6 -- Предположим, что event_type = 6 соответствует кликам
- )
- SELECT
- r.date_day,
- 429675720 AS user_id,
- e.campaign_id,
- r.pad_id,
- e.dsp_flight_id,
- r.block_id,
- e.category,
- r.adplace_id,
- r.statid,
- r.test_id,
- r.crosstraffic_id,
- r.fl_russia,
- e.product_category_id,
- e.income_source_id,
- count(*) AS clicks
- FROM req r
- JOIN evt e
- ON r.bid_request_id = e.bid_request_id
- AND r.pad_id = e.pad_id
- AND r.block_id = e.block_id
- GROUP BY
- r.date_day,
- e.campaign_id,
- r.pad_id,
- e.dsp_flight_id,
- r.block_id,
- e.category,
- r.adplace_id,
- r.statid,
- r.test_id,
- r.crosstraffic_id,
- r.fl_russia,
- e.product_category_id,
- e.income_source_id
- """
- logger.info("Выполняется запрос для загрузки новых данных по кликам (clicks)")
- clicks_df = get_from_ch_using_pd(new_clicks_query)
- return clicks_df
- async def main(date_str: str):
- # Очищаем целевые таблицы в Postgres
- logger.info(f"Очищаем таблицы {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE} и {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
- pg_engine.execute(f"TRUNCATE {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE}")
- pg_engine.execute(f"TRUNCATE {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
- # Загрузка новых данных по показам и кликам за заданный день
- shows_df = load_new_shows_data(date_str)
- logger.info(f"Загружено {shows_df.shape[0]} строк по показам (shows) за {date_str}")
- clicks_df = load_new_clicks_data(date_str)
- logger.info(f"Загружено {clicks_df.shape[0]} строк по кликам (clicks) за {date_str}")
- # Загрузка агрегированных данных в целевые таблицы Postgres
- db_load(pg_engine, shows_df, day=date_str, schema=NEW_DSP_SHOWS_SCHEMA, table=NEW_DSP_SHOWS_TABLE, method=insert_copy)
- logger.info("Данные по показам успешно загружены в Postgres")
- db_load(pg_engine, clicks_df, day=date_str, schema=NEW_DSP_CLICKS_SCHEMA, table=NEW_DSP_CLICKS_TABLE, method=insert_copy)
- logger.info("Данные по кликам успешно загружены в Postgres")
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--date', type=lambda s: datetime.strptime(s, '%Y-%m-%d').date(), required=True,
- help="Дата в формате YYYY-MM-DD")
- args = parser.parse_args()
- # Запускаем асинхронный пайплайн
- asyncio.run(main(args.date.strftime('%Y-%m-%d')))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement