Advertisement
gubichas

Untitled

Mar 12th, 2025
107
0
18 hours
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.14 KB | None | 0 0
  1. import argparse
  2. from pathlib import Path
  3. import pandas as pd
  4. import asyncio
  5. from datetime import datetime
  6. import logging
  7.  
  8. # Импорт утилит для работы с БД и загрузки SQL-файлов (предполагается, что они уже реализованы)
  9. from utility.db_tools.async_from_ch_to_postgres import async_load_from_ch_to_postgres, async_click_con
  10. from utility.db_tools.from_ch_to_postgres import load_from_ch_to_postgres, psql_conn
  11. from utility.db_tools.postgres_tools.load import db_load, insert_copy, pg_engine
  12. from utility.db_tools.load_sql_file import load_sql_file
  13. from utility.api_connector_tools import valid_date
  14. from video_utils import get_from_ch, get_from_ch_using_pd, get_from_pg_using_pd
  15.  
  16. logger = logging.getLogger('new_pipeline')
  17.  
  18. # Названия и схемы целевых таблиц в Postgres (можно адаптировать под нужды)
  19. NEW_DSP_SHOWS_TABLE = 'new_video_dsp_shows'
  20. NEW_DSP_CLICKS_TABLE = 'new_video_dsp_clicks'
  21. NEW_DSP_SHOWS_SCHEMA = 'dwh'
  22. NEW_DSP_CLICKS_SCHEMA = 'dwh'
  23.  
  24. # Если используется константа пути к проекту, например:
  25. from __init__ import PROJECT_PATH
  26.  
  27. def load_new_shows_data(date_day: str) -> pd.DataFrame:
  28.     """
  29.    Загрузка и агрегация данных по показам (shows) из новых таблиц.
  30.    Здесь:
  31.      - dt заменяет show_date
  32.      - request_ts_ms и event_ts_ms используются вместо show_ts
  33.      - Отсутствующие поля (stat_id, segments и т.п.) заменяются заглушками (0 или пустыми значениями)
  34.    """
  35.     new_shows_query = f"""
  36.    -- Подзапрос для данных из unified.mdsp_request_cloud (информация о запросе)
  37.    WITH req AS (
  38.        SELECT
  39.            dt AS date_day,
  40.            bid_request_id,
  41.            site_id AS pad_id,
  42.            tag_id AS block_id,
  43.            0 AS adplace_id,         -- Заглушка, т.к. stat_id нет в новых таблицах
  44.            0 AS statid,             -- Заглушка
  45.            0 AS test_id,            -- Заглушка
  46.            -- Если regions не пустой, проверяем первый элемент, иначе возвращаем 0 (False)
  47.            if(arrayExists(x -> x = 1, regions), toUInt8(regions[1] == 1), 0) AS fl_russia
  48.        FROM unified.mdsp_request_cloud
  49.        WHERE dt = toDate('{date_day}')
  50.        -- Дополнительно можно добавить фильтр по часу (toHour(request_ts_ms))
  51.    ),
  52.    -- Подзапрос для данных из unified.mdsp_event_cloud (информация о событии)
  53.    evt AS (
  54.        SELECT
  55.            dt AS date_day,
  56.            bid_request_id,
  57.            request_site_id AS pad_id,
  58.            request_tag_id AS block_id,
  59.            campaign_id,
  60.            -- Пример получения категории через dictGet, если она доступна
  61.            dictGetString('DSP_ProductCategory', 'name',
  62.                dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id))
  63.            ) AS category
  64.        FROM unified.mdsp_event_cloud
  65.        WHERE dt = toDate('{date_day}')
  66.          AND event_type = 10  -- Предположим, что event_type = 10 соответствует показам (shows)
  67.    )
  68.    SELECT
  69.        r.date_day,
  70.        429675720 AS user_id,  -- Жестко заданное значение user_id
  71.        r.pad_id,
  72.        r.block_id,
  73.        e.category,
  74.        r.adplace_id,
  75.        r.statid,
  76.        r.test_id,
  77.        e.campaign_id,
  78.        r.fl_russia,
  79.        count(*) AS real_shows,
  80.        0 AS revenue  -- Заглушка для revenue, если нет соответствующих данных
  81.    FROM evt e
  82.    JOIN req r
  83.      ON e.bid_request_id = r.bid_request_id
  84.     AND e.pad_id = r.pad_id
  85.     AND e.block_id = r.block_id
  86.    GROUP BY
  87.        r.date_day,
  88.        e.campaign_id,
  89.        r.pad_id,
  90.        r.block_id,
  91.        e.category,
  92.        r.adplace_id,
  93.        r.statid,
  94.        r.test_id,
  95.        r.fl_russia
  96.    """
  97.     logger.info("Выполняется запрос для загрузки новых данных по показам (shows)")
  98.     shows_df = get_from_ch_using_pd(new_shows_query)
  99.     return shows_df
  100.  
  101. def load_new_clicks_data(date_day: str) -> pd.DataFrame:
  102.     """
  103.    Загрузка и агрегация данных по кликам (clicks) из новых таблиц.
  104.    Здесь:
  105.      - dt используется вместо click_date
  106.      - Используем unified.mdsp_request_cloud и unified.mdsp_event_cloud
  107.      - Поля stat_id, crosstraffic_id и т.п. заменяются заглушками
  108.    """
  109.     new_clicks_query = f"""
  110.    WITH req AS (
  111.        SELECT
  112.            dt AS date_day,
  113.            bid_request_id,
  114.            site_id AS pad_id,
  115.            tag_id AS block_id,
  116.            0 AS adplace_id,         -- Заглушка
  117.            0 AS statid,             -- Заглушка
  118.            0 AS test_id,            -- Заглушка
  119.            0 AS crosstraffic_id,    -- Заглушка
  120.            if(arrayExists(x -> x = 1, regions), toUInt8(regions[1] == 1), 0) AS fl_russia
  121.        FROM unified.mdsp_request_cloud
  122.        WHERE dt = toDate('{date_day}')
  123.    ),
  124.    evt AS (
  125.        SELECT
  126.            dt AS date_day,
  127.            bid_request_id,
  128.            request_site_id AS pad_id,
  129.            request_tag_id AS block_id,
  130.            campaign_id,
  131.            ad_group_id AS dsp_flight_id,
  132.            dictGetString('DSP_ProductCategory', 'name',
  133.                dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id))
  134.            ) AS category,
  135.            dictGetUInt64('DSP_Flight', 'product_category_id', toUInt64(ad_group_id)) AS product_category_id,
  136.            dictGetUInt64('ATD_Campaign', 'income_source_id',
  137.                dictGetUInt64('ATD_DspCampaign', 'atd_oid', campaign_id)
  138.            ) AS income_source_id
  139.        FROM unified.mdsp_event_cloud
  140.        WHERE dt = toDate('{date_day}')
  141.          AND event_type = 6  -- Предположим, что event_type = 6 соответствует кликам
  142.    )
  143.    SELECT
  144.        r.date_day,
  145.        429675720 AS user_id,
  146.        e.campaign_id,
  147.        r.pad_id,
  148.        e.dsp_flight_id,
  149.        r.block_id,
  150.        e.category,
  151.        r.adplace_id,
  152.        r.statid,
  153.        r.test_id,
  154.        r.crosstraffic_id,
  155.        r.fl_russia,
  156.        e.product_category_id,
  157.        e.income_source_id,
  158.        count(*) AS clicks
  159.    FROM req r
  160.    JOIN evt e
  161.      ON r.bid_request_id = e.bid_request_id
  162.     AND r.pad_id = e.pad_id
  163.     AND r.block_id = e.block_id
  164.    GROUP BY
  165.        r.date_day,
  166.        e.campaign_id,
  167.        r.pad_id,
  168.        e.dsp_flight_id,
  169.        r.block_id,
  170.        e.category,
  171.        r.adplace_id,
  172.        r.statid,
  173.        r.test_id,
  174.        r.crosstraffic_id,
  175.        r.fl_russia,
  176.        e.product_category_id,
  177.        e.income_source_id
  178.    """
  179.     logger.info("Выполняется запрос для загрузки новых данных по кликам (clicks)")
  180.     clicks_df = get_from_ch_using_pd(new_clicks_query)
  181.     return clicks_df
  182.  
  183. async def main(date_str: str):
  184.     # Очищаем целевые таблицы в Postgres
  185.     logger.info(f"Очищаем таблицы {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE} и {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
  186.     pg_engine.execute(f"TRUNCATE {NEW_DSP_CLICKS_SCHEMA}.{NEW_DSP_CLICKS_TABLE}")
  187.     pg_engine.execute(f"TRUNCATE {NEW_DSP_SHOWS_SCHEMA}.{NEW_DSP_SHOWS_TABLE}")
  188.  
  189.     # Загрузка новых данных по показам и кликам за заданный день
  190.     shows_df = load_new_shows_data(date_str)
  191.     logger.info(f"Загружено {shows_df.shape[0]} строк по показам (shows) за {date_str}")
  192.  
  193.     clicks_df = load_new_clicks_data(date_str)
  194.     logger.info(f"Загружено {clicks_df.shape[0]} строк по кликам (clicks) за {date_str}")
  195.  
  196.     # Загрузка агрегированных данных в целевые таблицы Postgres
  197.     db_load(pg_engine, shows_df, day=date_str, schema=NEW_DSP_SHOWS_SCHEMA, table=NEW_DSP_SHOWS_TABLE, method=insert_copy)
  198.     logger.info("Данные по показам успешно загружены в Postgres")
  199.  
  200.     db_load(pg_engine, clicks_df, day=date_str, schema=NEW_DSP_CLICKS_SCHEMA, table=NEW_DSP_CLICKS_TABLE, method=insert_copy)
  201.     logger.info("Данные по кликам успешно загружены в Postgres")
  202.  
  203. if __name__ == '__main__':
  204.     parser = argparse.ArgumentParser()
  205.     parser.add_argument('--date', type=lambda s: datetime.strptime(s, '%Y-%m-%d').date(), required=True,
  206.                         help="Дата в формате YYYY-MM-DD")
  207.     args = parser.parse_args()
  208.     # Запускаем асинхронный пайплайн
  209.     asyncio.run(main(args.date.strftime('%Y-%m-%d')))
  210.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement