Advertisement
AliaksandrLet

Спринт 3 - Тема 5 - Урок 5 - Задание 2

Jun 25th, 2023 (edited)
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.34 KB | None | 0 0
  1. from airflow import DAG
  2. from airflow.providers.http.operators.http import SimpleHttpOperator
  3. from airflow.hooks.base import BaseHook
  4. from airflow.operators.python import PythonOperator
  5.  
  6. import datetime
  7. import requests
  8. import pandas as pd
  9. import numpy as np
  10. import os
  11. import psycopg2, psycopg2.extras
  12.  
  13. dag = DAG(
  14.     dag_id='552_postgresql_export_fuction',
  15.     schedule_interval='0 0 * * *',
  16.     start_date=datetime.datetime(2021, 1, 1),
  17.     catchup=False,
  18.     dagrun_timeout=datetime.timedelta(minutes=60),
  19.     tags=['example', 'example2'],
  20.     params={"example_key": "example_value"},
  21. )
  22. business_dt = {'dt':'2022-05-06'}
  23.  
  24. def load_file_to_pg(filename, pg_table, conn_args):
  25.     df = pd.read_csv(f"/lessons/5. Реализация ETL в Airflow/4. Extract как подключиться к хранилищу, чтобы получить файл/Задание 2/{filename}", index_col=0)
  26.     cols = ','.join(df.columns)
  27.  
  28.     conn = psycopg2.connect(conn_args)
  29.     cur = conn.cursor()
  30.  
  31.     step = int(df.shape[0] / 100)
  32.     i = 0
  33.     while i <= df.shape[0]:
  34.         cr_val = str([tuple(x) for x in df.iloc[i:i + step].to_numpy()])[1:-1]
  35.         insert_cr = f"INSERT into stage.{pg_table} ({cols}) VALUES {cr_val};"
  36.         cur.execute(insert_cr)
  37.         conn.commit()
  38.         i += step + 1
  39.  
  40.     cur.close()
  41.     conn.close()
  42.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement