Advertisement
trsp

Ошибка в sql-запросе шага extract

Feb 13th, 2025
145
0
6 days
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.06 KB | Source Code | 0 0
  1. # Код:
  2.  
  3. import pendulum
  4. from airflow.decorators import dag, task
  5.  
  6. @dag(
  7.     schedule='@once',
  8.     start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
  9.     tags=["ETL"]
  10. )
  11. def clean_churn_dataset():
  12.     import pandas as pd
  13.     import numpy as np
  14.     from airflow.providers.postgres.hooks.postgres import PostgresHook
  @task()
  def create_table():
      """Create the ``clean_users_churn`` table unless it already exists.

      The schema is declared with SQLAlchemy against the engine obtained from
      the ``destination_db`` Postgres hook. DDL is emitted only when the
      inspector reports that no table with that name is present.
      """
      from sqlalchemy import (
          Table, Column, DateTime, Float, Integer, Index, MetaData, String,
          UniqueConstraint, inspect,
      )

      engine = PostgresHook("destination_db").get_sqlalchemy_engine()
      schema = MetaData()

      clean_table = Table(
          "clean_users_churn",
          schema,
          # Surrogate key generated by the database.
          Column("id", Integer, primary_key=True, autoincrement=True),
          Column("customer_id", String),
          Column("begin_date", DateTime),
          Column("end_date", DateTime),
          Column("type", String),
          Column("paperless_billing", String),
          Column("payment_method", String),
          Column("monthly_charges", Float),
          Column("total_charges", Float),
          Column("internet_service", String),
          Column("online_security", String),
          Column("online_backup", String),
          Column("device_protection", String),
          Column("tech_support", String),
          Column("streaming_tv", String),
          Column("streaming_movies", String),
          Column("gender", String),
          Column("senior_citizen", Integer),
          Column("partner", String),
          Column("dependents", String),
          Column("multiple_lines", String),
          Column("target", Integer),
          # At most one row per customer.
          UniqueConstraint("customer_id", name="unique_clean_customer_constraint"),
      )

      # Nothing to do when the table is already there.
      if inspect(engine).has_table(clean_table.name):
          return
      schema.create_all(engine)
  @task()
  def extract():
      """Run the churn extraction query on ``destination_db`` and return the rows.

      Returns:
          pandas.DataFrame: the result set of the query below, with an ``id``
          column removed if the result happens to contain one.

      Raises:
          Whatever ``pd.read_sql`` raises for a failing query; the connection
          is closed before the exception propagates.
      """
      hook = PostgresHook('destination_db')
      conn = hook.get_conn()
      # NOTE(review): the recorded run fails on this query with
      # `psycopg2.errors.UndefinedTable: relation "internet" does not exist`,
      # i.e. the database behind `destination_db` has no `internet` table.
      # Presumably the joins must either run against the connection that holds
      # the raw tables, or be removed if `users_churn` already carries these
      # columns -- confirm against the actual schemas before changing the SQL.
      sql = """
       select
           c.customer_id, c.begin_date, c.end_date, c.type, c.paperless_billing, c.payment_method, c.monthly_charges, c.total_charges,
           i.internet_service, i.online_security, i.online_backup, i.device_protection, i.tech_support, i.streaming_tv, i.streaming_movies,
           p.gender, p.senior_citizen, p.partner, p.dependents,
           ph.multiple_lines
       from users_churn as c
       left join internet as i on i.customer_id = c.customer_id
       left join personal as p on p.customer_id = c.customer_id
       left join phone as ph on ph.customer_id = c.customer_id
       """
      try:
          data = pd.read_sql(sql, conn)
      finally:
          # Release the connection even when the query raises; previously a
          # failing read_sql skipped close() and leaked the connection.
          conn.close()
      # The query selects no `id` column, so an unconditional drop raised
      # KeyError. errors="ignore" keeps the drop for result sets that do
      # contain `id` without failing on ones that do not.
      return data.drop(columns=["id"], errors="ignore")
  68.  
  69. -----
  70.  
  71. # Логи:
  72.  
  73. Traceback (most recent call last):
  74.   File "/home/airflow/.local/lib/python3.10/site-packages/pandas/io/sql.py", line 2262, in execute
  75.     cur.execute(sql, *args)
  76. psycopg2.errors.UndefinedTable: relation "internet" does not exist
  77. LINE 8:         left join internet as i on i.customer_id = c.custome...
  78.                           ^
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement