Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from airflow import DAG
- from airflow.providers.http.operators.http import SimpleHttpOperator
- from airflow.hooks.base import BaseHook
- from airflow.operators.python import PythonOperator
- import datetime
- import requests
- import pandas as pd
- import numpy as np
- import os
- import psycopg2, psycopg2.extras
# Daily DAG (cron "0 0 * * *" = every day at 00:00); catchup=False means Airflow
# does not create runs for past intervals missed before the DAG was enabled.
# NOTE(review): "fuction" in the dag_id looks like a typo, but the dag_id is the
# DAG's identity in Airflow; renaming it would register a brand-new DAG, so it
# is intentionally left unchanged.
dag = DAG(
    dag_id='552_postgresql_export_fuction',
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval='0 0 * * *',
    catchup=False,
    # A single run is failed if it takes longer than one hour.
    dagrun_timeout=datetime.timedelta(hours=1),
    params={"example_key": "example_value"},
    tags=['example', 'example2'],
)

# Hard-coded business date. It is not referenced anywhere in the visible code;
# presumably tasks defined elsewhere read it. TODO confirm.
business_dt = dict(dt='2022-05-06')
def load_file_to_pg(
    filename,
    pg_table,
    conn_args,
    base_dir="/lessons/5. Реализация ETL в Airflow/4. Extract как подключиться к хранилищу, чтобы получить файл/Задание 2",
):
    """Bulk-insert the rows of a CSV file into ``stage.<pg_table>`` in PostgreSQL.

    The CSV's first column becomes the DataFrame index and is *not* inserted.
    Every other column header is used verbatim (unquoted) as a column name of
    the target table. Rows are sent in batches of ``len(rows) // 100`` rows
    (minimum one row), and each batch is committed on its own. A failure
    part-way through therefore leaves the already committed batches in the
    table.

    Args:
        filename: CSV file name, resolved relative to ``base_dir``.
        pg_table: Table name inside the ``stage`` schema. It is interpolated
            into the SQL unquoted, so it must be a trusted identifier.
        conn_args: Connection string passed to ``psycopg2.connect``.
        base_dir: Directory containing the CSV. Defaults to the directory the
            original version of this function hard-coded.
    """
    df = pd.read_csv(os.path.join(base_dir, filename), index_col=0)

    # Values are sent as bound parameters instead of being pasted into the SQL
    # text. That copes with quotes inside strings and turns NaN into NULL.
    # astype(object) also yields plain Python scalars; psycopg2 has no default
    # adapter for numpy scalars such as numpy.int64.
    rows = list(
        df.astype(object)
        .where(df.notna(), None)
        .itertuples(index=False, name=None)
    )
    if not rows:
        # Nothing to insert (the old loop built "VALUES ;" here and failed).
        return

    # Roughly 1% of the rows per batch, but never 0. A zero step used to
    # produce empty INSERTs for files with fewer than 100 rows.
    batch = max(1, len(rows) // 100)

    # Identifiers cannot be bound parameters. They stay unquoted, as before,
    # so PostgreSQL keeps folding them to lower case.
    insert_sql = f"INSERT INTO stage.{pg_table} ({','.join(df.columns)}) VALUES %s"

    conn = psycopg2.connect(conn_args)
    try:
        with conn.cursor() as cur:
            # Contiguous, non-overlapping slices: no row is skipped (the old
            # `i += step + 1` dropped one row after every chunk).
            for start in range(0, len(rows), batch):
                chunk = rows[start:start + batch]
                psycopg2.extras.execute_values(
                    cur, insert_sql, chunk, page_size=len(chunk)
                )
                conn.commit()  # per-batch commit, as in the original
    finally:
        # Closing without commit discards any pending (failed) batch.
        conn.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement