Advertisement
vishneva_olga

Untitled

Apr 6th, 2025
293
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.54 KB | None | 0 0
  1. #plugins/steps/churn.py
  2.  
  3. #from tkinter.messagebox import NO
  4. from airflow.providers.postgres.hooks.postgres import PostgresHook
  5. import pandas as pd
  6. import numpy as np
  7.  
  8. def create_table() -> None:
  9.         import sqlalchemy
  10.         from sqlalchemy import MetaData, Table, Column, String, Integer, Float, DateTime, UniqueConstraint, inspect
  11.         metadata = MetaData()
  12.         alt_users_churn = Table(
  13.             'alt_users_churn',
  14.             metadata,
  15.             Column('id', Integer, primary_key=True, autoincrement=True),
  16.             Column('customer_id', String),
  17.             Column('begin_date', DateTime),
  18.             Column('end_date', DateTime),
  19.             Column('type', String),
  20.             Column('paperless_billing', String),
  21.             Column('payment_method', String),
  22.             Column('monthly_charges', Float),
  23.             Column('total_charges', Float),
  24.             Column('internet_service', String),
  25.             Column('online_security', String),
  26.             Column('online_backup', String),
  27.             Column('device_protection', String),
  28.             Column('tech_support', String),
  29.             Column('streaming_tv', String),
  30.             Column('streaming_movies', String),
  31.             Column('gender', String),
  32.             Column('senior_citizen', Integer),
  33.             Column('partner', String),
  34.             Column('dependents', String),
  35.             Column('multiple_lines', String),
  36.             Column('target', Integer),
  37.             UniqueConstraint('customer_id', name='uq_customer_id')
  38.         )
  39.         hook = PostgresHook('destination_db')
  40.         engine = hook.get_sqlalchemy_engine()
  41.         conn = engine.connect()
  42.         try:
  43.             if not inspect(conn).has_table(alt_users_churn.name):
  44.                 metadata.create_all(conn)
  45.         finally:
  46.             conn.close()
  47.  
  48. def extract(**kwargs):
  49.     ti = kwargs['ti']
  50.  
  51.     hook = PostgresHook('source_db')
  52.     conn = hook.get_conn()
  53.     sql = f"""
  54.        SELECT
  55.            c.customer_id, c.begin_date, c.end_date, c.type, c.paperless_billing, c.payment_method, c.monthly_charges, c.total_charges,
  56.            i.internet_service, i.online_security, i.online_backup, i.device_protection, i.tech_support, i.streaming_tv, i.streaming_movies,
  57.            p.gender, p.senior_citizen, p.partner, p.dependents,
  58.            ph.multiple_lines
  59.        FROM contracts AS c
  60.        LEFT JOIN internet AS i ON i.customer_id = c.customer_id
  61.        LEFT JOIN personal AS p ON p.customer_id = c.customer_id
  62.        LEFT JOIN phone AS ph ON ph.customer_id = c.customer_id
  63.        """
  64.     data = pd.read_sql(sql, conn)
  65.     conn.close()
  66.     ti.xcom_push('extracted_data', data)
  67.  
  68. def transform(**kwargs):
  69.     ti = kwargs['ti'] # получение объекта task_instance
  70.     data = ti.xcom_pull(task_ids='extract', key='extracted_data') # выгрузка данных из task_instance
  71.     data['target'] = (data['end_date'] != 'No').astype(int) # логика функции
  72.     data['end_date'].replace({'No': None}, inplace=True)
  73.     ti.xcom_push('transformed_data', data) # вместо return отправляем данные передатчику task_instance
  74.  
  75. def load(**kwargs):
  76.     # ваш код здесь #
  77.     ti = kwargs['ti']
  78.     data = ti.xcom_pull(task_ids='transform', key='transformed_data')
  79.     hook = PostgresHook('destination_db')
  80.     hook.insert_rows(
  81.         table="alt_users_churn",
  82.         replace=True,
  83.         target_fields=data.columns.tolist(),
  84.         replace_index=['customer_id'],
  85.         rows=data.values.tolist()
  86.     )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement