Advertisement
sesquiipedalian

Untitled

Nov 17th, 2024
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.37 KB | None | 0 0
  1. from datetime import datetime, timedelta
  2. from airflow import DAG
  3. from airflow.operators.python_operator import PythonOperator
  4. from airflow.models import Variable
  5. import pandas as pd
  6. from clickhouse_driver import Client
  7. import requests
  8. import traceback
  9.  
  10.  
  11. # Основная функция для выполнения запроса к ClickHouse
  12. def query_clickhouse():
  13.     # Получение переменных из Airflow
  14.     user = Variable.get("user_tns_moderation")
  15.     password = Variable.get("pass_tns_moderation")
  16.     host = 'antifraud-prod-bm-ch09.el.wb.ru'
  17.     port = '9000'
  18.     client = Client(host=host, port=port, user=user, password=password)
  19.  
  20.     # Ваш SQL-запрос
  21.     query = """
  22.    WITH
  23.  
  24.    -- Исторические данные: за последний месяц минус последние 4 часа
  25.    historical_data AS (
  26.        SELECT
  27.            SrId,
  28.            UserId,
  29.            OrderUId,
  30.            CreatedDateTime,
  31.            UpdatedDateTime,
  32.            PaymentType,
  33.            GoodsName,
  34.            SupplierId,
  35.            NmId,
  36.            StatusId,
  37.            IsPaid
  38.        FROM datalake.OrdersFromOrdo
  39.        WHERE
  40.            NOT empty(PaymentType)
  41.            AND StatusId != 4 -- исключаем статус с багом TNS-646
  42.            AND PaymentType IN ('WPG', 'QRC', 'QRS', 'MPM', 'CSH', 'WAI', 'CSP', 'CRD',
  43.                                'CRE', 'CRS', 'CIN', 'IWB', 'BAL', 'APG', 'GPG', 'ACC',
  44.                                'PWL', 'CWL', 'BPC', 'PDL', 'CWB')
  45.            AND NmId NOT IN (157080896, 16020241, 109108940, 209785740, 107004643)
  46.            AND UpdatedDateTime >= toDate(subtractMonths(subtractHours(now(), 4), 1))
  47.            AND UpdatedDateTime < toDate(subtractHours(now(), 4))
  48.            AND CreatedDateTime >= toDate(subtractMonths(subtractHours(now(), 4), 1))
  49.            AND CreatedDateTime < toDate(subtractHours(now(), 4))
  50.    ),
  51.  
  52.    -- Недавние данные: последние 4 часа
  53.    recent_data AS (
  54.        SELECT
  55.            SrId,
  56.            UserId,
  57.            OrderUId,
  58.            CreatedDateTime,
  59.            UpdatedDateTime,
  60.            PaymentType,
  61.            GoodsName,
  62.            SupplierId,
  63.            NmId,
  64.            StatusId,
  65.            IsPaid
  66.        FROM datalake.OrdersFromOrdo
  67.        WHERE
  68.            NOT empty(PaymentType)
  69.            AND StatusId != 4 -- исключаем статус с багом TNS-646
  70.            AND PaymentType IN ('WPG', 'QRC', 'QRS', 'MPM', 'CSH', 'WAI', 'CSP', 'CRD',
  71.                                'CRE', 'CRS', 'CIN', 'IWB', 'BAL', 'APG', 'GPG', 'ACC',
  72.                                'PWL', 'CWL', 'BPC', 'PDL', 'CWB')
  73.            AND NmId NOT IN (157080896, 16020241, 109108940, 209785740, 107004643)
  74.            AND UpdatedDateTime >= subtractHours(now(), 4)
  75.            AND CreatedDateTime >= subtractHours(now(), 4)
  76.    ),
  77.  
  78.    -- Продавцы, купившие тариф
  79.    users_with_tariff AS (
  80.        SELECT DISTINCT toInt64(userID) AS UserId
  81.        FROM datalake.SellerTariffOptions
  82.        WHERE slug = 'protectionAgainstDistantOrders'
  83.    ),
  84.  
  85.    -- Продаваемость на исторических данных
  86.    historical_metrics AS (
  87.        SELECT
  88.            rd.NmId,
  89.            rd.GoodsName,
  90.            rd.SupplierId,
  91.            COUNT(DISTINCT rd.SrId) AS total_receipts,
  92.            COUNT(DISTINCT rd.OrderUId) AS total_orders,
  93.            (COUNT(DISTINCT rd.SrId) / COUNT(DISTINCT rd.OrderUId)) AS saleability
  94.        FROM historical_data rd
  95.        INNER JOIN users_with_tariff uwt ON toInt64(rd.UserId) = uwt.UserId
  96.        GROUP BY
  97.            rd.NmId,
  98.            rd.GoodsName,
  99.            rd.SupplierId
  100.    ),
  101.  
  102.    -- Метрики на исторических данных
  103.    metrics_stats AS (
  104.        SELECT
  105.            NmId,
  106.            GoodsName,
  107.            SupplierId,
  108.            avg(saleability) AS mean_saleability,
  109.            stddevPop(saleability) AS std_saleability
  110.        FROM historical_metrics
  111.        GROUP BY
  112.            NmId,
  113.            GoodsName,
  114.            SupplierId
  115.    ),
  116.  
  117.    -- Продаваемость на недавних данных
  118.    recent_saleability AS (
  119.        SELECT
  120.            rd.NmId,
  121.            rd.GoodsName,
  122.            rd.SupplierId,
  123.            rd.UserId,
  124.            rd.OrderUId,
  125.            any(rd.CreatedDateTime) as CreatedDateTime,
  126.            COUNT(DISTINCT rd.SrId) AS total_ordered
  127.        FROM recent_data rd
  128.        INNER JOIN users_with_tariff uwt ON toInt64(rd.UserId) = uwt.UserId
  129.        GROUP BY
  130.            rd.NmId,
  131.            rd.GoodsName,
  132.            rd.SupplierId,
  133.            rd.UserId,
  134.            rd.OrderUId,
  135.            rd.CreatedDateTime
  136.    )
  137.  
  138.    -- Принтим аномалии на основе трех отклонений
  139.    SELECT
  140.        rs.UserId,
  141.        rs.OrderUId,
  142.        rs.CreatedDateTime,
  143.        rs.NmId,
  144.        rs.GoodsName,
  145.        rs.SupplierId,
  146.        1 AS is_tariff,
  147.        rs.total_ordered,
  148.        ms.mean_saleability,
  149.        ms.std_saleability,
  150.        (ms.mean_saleability + 3 * ms.std_saleability) AS threshold
  151.    FROM recent_saleability rs
  152.    JOIN metrics_stats ms
  153.        ON rs.NmId = ms.NmId
  154.        AND rs.SupplierId = ms.SupplierId
  155.    WHERE 1=1
  156.        AND rs.total_ordered > (ms.mean_saleability + 3 * ms.std_saleability)
  157.        AND ms.mean_saleability + 3 * ms.std_saleability > 4
  158.    ORDER BY rs.NmId
  159.    LIMIT 500
  160.    SETTINGS max_memory_usage = '999G';
  161.    """
  162.  
  163.     result = client.execute(query)
  164.     df = pd.DataFrame(result)
  165.  
  166.     if df.empty:
  167.         return None
  168.  
  169.     df.columns = [
  170.         'user_id', 'order_uid', 'created_date', 'nm_id',
  171.         'goods_name', 'supplier_name', 'is_tariff',
  172.         'total_ordered', 'mean_month',
  173.         'std_month', 'threshold'
  174.     ]
  175.  
  176.     return df
  177.  
  178.  
  179. # Создаётся сообщение
  180. def create_message(df):
  181.     header = "⚠️ Обнаружены аномалии в продажах заказов за последние 4 часа"
  182.     table = df.to_string(index=False)
  183.     full_message = f"{header}\n\n{table}"
  184.     return full_message
  185.  
  186.  
  187. # Отправляю сообщение в Mattermost
  188. def send_message(text):
  189.     MATTERMOST_URL = 'https://band.wb.ru/api/v4'
  190.     BOT_TOKEN = Variable.get("analytics_alerts_bot_tkn")
  191.     CHANNEL_ID = "1dpuxh841f898mpaoqq6bwgd1y" # тестовый канал
  192.  
  193.     headers = {
  194.         'Authorization': f'Bearer {BOT_TOKEN}',
  195.         'Content-Type': 'application/json',
  196.     }
  197.  
  198.     url = f'{MATTERMOST_URL}/posts'
  199.     payload = {
  200.         'channel_id': CHANNEL_ID,
  201.         'message': text
  202.     }
  203.     response = requests.post(url, json=payload, headers=headers)
  204.     if response.status_code != 201:
  205.         raise Exception(f"Error to send message: {response.text}")
  206.  
  207.  
  208. # Алерт на случай ошибки
  209. def send_alert(context) -> None:
  210.     MATTERMOST_URL = 'https://band.wb.ru/api/v4'
  211.     BOT_TOKEN = Variable.get("analytics_alerts_bot_tkn")
  212.     CHANNEL_ID = "1dpuxh841f898mpaoqq6bwgd1y"  # тестовый канал
  213.  
  214.     headers = {
  215.         'Authorization': f'Bearer {BOT_TOKEN}',
  216.         'Content-Type': 'application/json',
  217.     }
  218.  
  219.     error_message = f"Error in DAG sharp_orders_monitoring: {context.get('exception')}"
  220.  
  221.     url = f'{MATTERMOST_URL}/posts'
  222.     payload = {
  223.         'channel_id': CHANNEL_ID,
  224.         'message': error_message
  225.     }
  226.     requests.post(url, json=payload, headers=headers)
  227.  
  228.  
  229. # Основная задача DAG
  230. def main_task():
  231.     df = query_clickhouse()
  232.     if df is not None:
  233.         message = create_message(df)
  234.         send_message(message)
  235.     else:
  236.         print("No anomalies found.")
  237.  
  238.  
  239.  
  240. # Аргументы для DAG
  241. default_args = {
  242.     'owner': 'kemal.azamat',
  243.     'start_date': datetime(2024, 11, 18, 12, 00),
  244.     'retries': 1,
  245.     'retry_delay': timedelta(minutes=5),
  246.     'on_failure_callback': send_alert
  247. }
  248.  
  249. # Определяем сам DAG
  250. dag = DAG(
  251.     'sharp_orders_monitoring',
  252.     default_args=default_args,
  253.     description='Sharp orders monitoring',
  254.     schedule_interval=timedelta(hours=4),
  255.     catchup=False
  256. )
  257.  
  258. # Определение задачи для выполнения DAG
  259. run_sharp_monitoring = PythonOperator(
  260.     task_id='main_task',
  261.     python_callable=main_task,
  262.     dag=dag,
  263. )
  264.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement