Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import logging
import traceback
from datetime import datetime, timedelta

import pandas as pd
import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from clickhouse_driver import Client
# Main function: runs the anomaly query against ClickHouse
def query_clickhouse() -> "pd.DataFrame | None":
    """Run the order-anomaly query on ClickHouse and return the rows found.

    The ClickHouse user and password are read from the Airflow Variables
    ``user_tns_moderation`` and ``pass_tns_moderation``; host and port are
    fixed in the function body.

    The query builds, for buyers present in ``datalake.SellerTariffOptions``
    with slug ``protectionAgainstDistantOrders``:

    * a historical ratio ``COUNT(DISTINCT SrId) / COUNT(DISTINCT OrderUId)``
      per (NmId, GoodsName, SupplierId) over roughly the last month,
      excluding the current day window cut at ``now() - 4 hours``;
    * the number of distinct ``SrId`` per order created and updated in the
      last 4 hours;

    and returns the recent orders whose count exceeds
    ``mean + 3 * std`` of the historical ratio, provided that threshold is
    itself greater than 4. At most 500 rows are returned.

    Returns:
        A DataFrame with the columns ``user_id, order_uid, created_date,
        nm_id, goods_name, supplier_name, is_tariff, total_ordered,
        mean_month, std_month, threshold``, or ``None`` when the query
        returns no rows.

    Raises:
        Whatever ``Variable.get`` or ``Client.execute`` raise; the
        connection is closed before the exception propagates.
    """
    # Credentials come from Airflow Variables
    user = Variable.get("user_tns_moderation")
    password = Variable.get("pass_tns_moderation")
    host = 'antifraud-prod-bm-ch09.el.wb.ru'
    port = '9000'
    client = Client(host=host, port=port, user=user, password=password)

    # NOTE(review): `historical_metrics` yields one row per
    # (NmId, GoodsName, SupplierId) and `metrics_stats` groups by the same
    # three columns, so every group holds a single value: avg() is that value
    # and stddevPop() is always 0. The "3 sigma" threshold therefore equals
    # the plain historical ratio -- confirm whether this is intended.
    query = """
        WITH
        -- Исторические данные: за последний месяц минус последние 4 часа
        historical_data AS (
            SELECT
                SrId,
                UserId,
                OrderUId,
                CreatedDateTime,
                UpdatedDateTime,
                PaymentType,
                GoodsName,
                SupplierId,
                NmId,
                StatusId,
                IsPaid
            FROM datalake.OrdersFromOrdo
            WHERE
                NOT empty(PaymentType)
                AND StatusId != 4 -- исключаем статус с багом TNS-646
                AND PaymentType IN ('WPG', 'QRC', 'QRS', 'MPM', 'CSH', 'WAI', 'CSP', 'CRD',
                                    'CRE', 'CRS', 'CIN', 'IWB', 'BAL', 'APG', 'GPG', 'ACC',
                                    'PWL', 'CWL', 'BPC', 'PDL', 'CWB')
                AND NmId NOT IN (157080896, 16020241, 109108940, 209785740, 107004643)
                AND UpdatedDateTime >= toDate(subtractMonths(subtractHours(now(), 4), 1))
                AND UpdatedDateTime < toDate(subtractHours(now(), 4))
                AND CreatedDateTime >= toDate(subtractMonths(subtractHours(now(), 4), 1))
                AND CreatedDateTime < toDate(subtractHours(now(), 4))
        ),
        -- Недавние данные: последние 4 часа
        recent_data AS (
            SELECT
                SrId,
                UserId,
                OrderUId,
                CreatedDateTime,
                UpdatedDateTime,
                PaymentType,
                GoodsName,
                SupplierId,
                NmId,
                StatusId,
                IsPaid
            FROM datalake.OrdersFromOrdo
            WHERE
                NOT empty(PaymentType)
                AND StatusId != 4 -- исключаем статус с багом TNS-646
                AND PaymentType IN ('WPG', 'QRC', 'QRS', 'MPM', 'CSH', 'WAI', 'CSP', 'CRD',
                                    'CRE', 'CRS', 'CIN', 'IWB', 'BAL', 'APG', 'GPG', 'ACC',
                                    'PWL', 'CWL', 'BPC', 'PDL', 'CWB')
                AND NmId NOT IN (157080896, 16020241, 109108940, 209785740, 107004643)
                AND UpdatedDateTime >= subtractHours(now(), 4)
                AND CreatedDateTime >= subtractHours(now(), 4)
        ),
        -- Продавцы, купившие тариф
        users_with_tariff AS (
            SELECT DISTINCT toInt64(userID) AS UserId
            FROM datalake.SellerTariffOptions
            WHERE slug = 'protectionAgainstDistantOrders'
        ),
        -- Продаваемость на исторических данных
        historical_metrics AS (
            SELECT
                rd.NmId,
                rd.GoodsName,
                rd.SupplierId,
                COUNT(DISTINCT rd.SrId) AS total_receipts,
                COUNT(DISTINCT rd.OrderUId) AS total_orders,
                (COUNT(DISTINCT rd.SrId) / COUNT(DISTINCT rd.OrderUId)) AS saleability
            FROM historical_data rd
            INNER JOIN users_with_tariff uwt ON toInt64(rd.UserId) = uwt.UserId
            GROUP BY
                rd.NmId,
                rd.GoodsName,
                rd.SupplierId
        ),
        -- Метрики на исторических данных
        metrics_stats AS (
            SELECT
                NmId,
                GoodsName,
                SupplierId,
                avg(saleability) AS mean_saleability,
                stddevPop(saleability) AS std_saleability
            FROM historical_metrics
            GROUP BY
                NmId,
                GoodsName,
                SupplierId
        ),
        -- Продаваемость на недавних данных
        recent_saleability AS (
            SELECT
                rd.NmId,
                rd.GoodsName,
                rd.SupplierId,
                rd.UserId,
                rd.OrderUId,
                any(rd.CreatedDateTime) as CreatedDateTime,
                COUNT(DISTINCT rd.SrId) AS total_ordered
            FROM recent_data rd
            INNER JOIN users_with_tariff uwt ON toInt64(rd.UserId) = uwt.UserId
            GROUP BY
                rd.NmId,
                rd.GoodsName,
                rd.SupplierId,
                rd.UserId,
                rd.OrderUId,
                rd.CreatedDateTime
        )
        -- Принтим аномалии на основе трех отклонений
        SELECT
            rs.UserId,
            rs.OrderUId,
            rs.CreatedDateTime,
            rs.NmId,
            rs.GoodsName,
            rs.SupplierId,
            1 AS is_tariff,
            rs.total_ordered,
            ms.mean_saleability,
            ms.std_saleability,
            (ms.mean_saleability + 3 * ms.std_saleability) AS threshold
        FROM recent_saleability rs
        JOIN metrics_stats ms
            ON rs.NmId = ms.NmId
            AND rs.SupplierId = ms.SupplierId
        WHERE 1=1
            AND rs.total_ordered > (ms.mean_saleability + 3 * ms.std_saleability)
            AND ms.mean_saleability + 3 * ms.std_saleability > 4
        ORDER BY rs.NmId
        LIMIT 500
        SETTINGS max_memory_usage = '999G';
    """

    # Always release the connection, even when the query fails.
    try:
        result = client.execute(query)
    finally:
        client.disconnect()

    if not result:
        return None

    # Labels follow the SELECT order of the final query. Note that the
    # 'supplier_name' column actually carries SupplierId; the label is kept
    # because it appears in the alert text.
    columns = [
        'user_id', 'order_uid', 'created_date', 'nm_id',
        'goods_name', 'supplier_name', 'is_tariff',
        'total_ordered', 'mean_month',
        'std_month', 'threshold',
    ]
    return pd.DataFrame(result, columns=columns)
# Default first line of the anomaly alert
_ANOMALY_HEADER = "⚠️ Обнаружены аномалии в продажах заказов за последние 4 часа"


# Builds the alert text
def create_message(df, header=_ANOMALY_HEADER):
    """Render a DataFrame of anomalies as a plain-text alert.

    Args:
        df: DataFrame to render; it is formatted with
            ``DataFrame.to_string(index=False)``.
        header: First line of the message. Defaults to the standard
            "anomalies in the last 4 hours" header.

    Returns:
        The header, a blank line, and the table, joined into one string.
    """
    table = df.to_string(index=False)
    return f"{header}\n\n{table}"
# Mattermost settings shared by send_message and send_alert
_MATTERMOST_URL = 'https://band.wb.ru/api/v4'
_MATTERMOST_CHANNEL_ID = "1dpuxh841f898mpaoqq6bwgd1y"  # test channel
# Upper bound for one HTTP call, so a stalled server cannot hang the task.
_MATTERMOST_TIMEOUT_SECONDS = 30

_logger = logging.getLogger(__name__)


def _post_to_mattermost(text):
    """POST ``text`` as a new post to the Mattermost channel and return the response.

    The bot token is read from the Airflow Variable
    ``analytics_alerts_bot_tkn`` on every call. The response status is not
    inspected here; callers decide how to treat a non-201 answer.
    """
    bot_token = Variable.get("analytics_alerts_bot_tkn")
    headers = {
        'Authorization': f'Bearer {bot_token}',
        'Content-Type': 'application/json',
    }
    payload = {
        'channel_id': _MATTERMOST_CHANNEL_ID,
        'message': text,
    }
    return requests.post(
        f'{_MATTERMOST_URL}/posts',
        json=payload,
        headers=headers,
        timeout=_MATTERMOST_TIMEOUT_SECONDS,
    )


# Sends a message to Mattermost
def send_message(text):
    """Post ``text`` to the Mattermost channel.

    Args:
        text: Message body to post.

    Raises:
        RuntimeError: If the API answers with a status other than 201; the
            response body is included in the error text.
    """
    response = _post_to_mattermost(text)
    if response.status_code != 201:
        raise RuntimeError(f"Error to send message: {response.text}")


# Alert sent when a task fails
def send_alert(context) -> None:
    """Post the task's exception to Mattermost (used as ``on_failure_callback``).

    Args:
        context: Callback context mapping; only its ``'exception'`` entry is
            read.

    A non-201 response is logged as a warning instead of raising, so alert
    delivery stays best-effort. Errors raised by ``requests`` itself still
    propagate, as before.
    """
    error_message = f"Error in DAG sharp_orders_monitoring: {context.get('exception')}"
    response = _post_to_mattermost(error_message)
    if response.status_code != 201:
        _logger.warning(
            "Failure alert was not delivered to Mattermost (HTTP %s): %s",
            response.status_code,
            response.text,
        )
# Main DAG task
def main_task():
    """Query ClickHouse for anomalies and post them to Mattermost.

    When the query returns nothing, only a note is printed and no message
    is sent.
    """
    anomalies = query_clickhouse()
    if anomalies is None:
        print("No anomalies found.")
        return

    alert_text = create_message(anomalies)
    send_message(alert_text)
# Default arguments for the DAG's tasks
default_args = dict(
    owner='kemal.azamat',
    start_date=datetime(2024, 11, 18, 12, 0),
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Post the exception to Mattermost when a task fails.
    on_failure_callback=send_alert,
)

# The DAG itself: runs every 4 hours, without backfilling missed intervals
with DAG(
    dag_id='sharp_orders_monitoring',
    default_args=default_args,
    description='Sharp orders monitoring',
    schedule_interval=timedelta(hours=4),
    catchup=False,
) as dag:
    # The single task of the DAG; it is attached to `dag` by the enclosing
    # `with` block.
    run_sharp_monitoring = PythonOperator(
        task_id='main_task',
        python_callable=main_task,
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement