Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def _vertica_conn_info(connection):
    """Return keyword arguments for ``vertica_python.connect``.

    ``connection`` is one entry of the mapping returned by
    ``extract_connections_from_json``; it must expose ``host``, ``port``,
    ``schema`` (used as the Vertica database name), ``login`` and ``password``.
    """
    return {
        'host': str(connection.host),
        'port': str(connection.port),
        'database': str(connection.schema),
        'user': str(connection.login),
        'password': str(connection.password),
        'read_timeout': 600,
        'unicode_error': 'strict',
        'ssl': False,
    }


def _moderation_events_sql(table_name, entity_types, entity_type_case,
                           request_key, lead_partition, sku_table, sku_join,
                           period):
    """Return CTAS SQL that classifies ``status_history`` events of some entity types.

    The created temp table has the 15 columns of ``CAN_team.UGC_moderation`` in
    INSERT order. All arguments are trusted SQL fragments built in this module,
    never external input:

    table_name: name of the local temporary table to create.
    entity_types: comma-separated ``external_entity_type`` codes to keep.
    entity_type_case: ``when ... then ...`` branches mapping codes to labels.
    request_key: first argument of the ``hash(...)`` that builds ``request_id``.
    lead_partition: PARTITION BY list of the ``lead`` window that finds the
        next event on the same entity.
    sku_table: temp table mapping entities to SKUs, joined under alias ``es``.
    sku_join: join condition between ``tb`` (events) and ``es``.
    period: ``between '<start>' and '<end>'`` suffix for the date filter.

    An event followed by another event on the same partition less than
    10 minutes later is labelled 'double'; otherwise user_id 0 is automatic
    moderation and a non-operator user is manual unpaid moderation.
    """
    return f"""
        create local temporary table {table_name} on commit preserve rows as
        (select timestampdiff('second', tb.created_at, next_created_at) as diff,
                case
                    when tb.user_id = 0 and (diff >= 10 * 60 or diff is null)
                        then 'автомодерация'
                    when us.role <> 'operator' and tb.user_id <> 0 and (diff >= 10 * 60 or diff is null)
                        then 'ручная неоплачиваемая модерация'
                    when diff < 10 * 60
                        then 'double'
                end as moderated,
                case
                    when ss.allow_to_publish = false then 'declined'
                    when ss.allow_to_publish = true then 'approved'
                end as status,
                case
                    when tb.entity_type in (1, 2, 3, 4, 5, 6) then 0
                    when tb.entity_type in (7, 8, 9, 10) then 1
                end as is_travel,
                es.sku as sku,
                tb.created_at as moderated_at,
                tb.status_id,
                tb.request_id,
                tb.user_id,
                case {entity_type_case} end as entity_type,
                tb.entity_id,
                tb.entity_uuid,
                tb.entity_root_parent_id,
                tb.entity_root_parent_uuid,
                case
                    when sc.is_crossborder = true then 1
                    when sc.is_crossborder = false then 0
                end as is_crossborder
         from (select distinct
                      sh.status_id,
                      sh.user_id,
                      sh.created_at,
                      sh.external_entity_id as entity_id,
                      sh.external_entity_type as entity_type,
                      sh.external_entity_uuid as entity_uuid,
                      sh.external_entity_root_parent_id as entity_root_parent_id,
                      sh.external_entity_root_parent_uuid as entity_root_parent_uuid,
                      hash({request_key}, sh.created_at, sh.user_id, sh.status_id) as request_id,
                      lead(sh.created_at, 1) over (partition by {lead_partition}
                                                   order by sh.created_at) as next_created_at
               from bx_rp_cm.status_history as sh
               where sh.external_entity_type in ({entity_types})
                 and date(sh.created_at) {period}
              ) as tb
         left join bx_rp_cm.user as us
           on tb.user_id = us.id
         left join bx_rp_cm.statuses as ss
           on tb.status_id = ss.id and tb.entity_type = ss.external_entity_type
         left join {sku_table} as es
           on {sku_join}
         left join sku_crossborder as sc
           on es.sku = sc.Sku
         where us.role <> 'operator' or tb.user_id = 0
         order by tb.created_at)
    """


def _ugc_moderation_statements(start_date, end_date):
    """Return the ordered ``(label, sql)`` steps that rebuild UGC_moderation.

    Every date filter is the inclusive calendar-day range
    ``[start_date, end_date]`` applied to the date part of the relevant
    timestamp. Every inserted row's ``moderated_at`` is filtered this way, so
    the DELETE step removes exactly the window that the INSERT step refills.
    """
    period = f"between '{start_date}' and '{end_date}'"
    return [
        # Review uuid -> SKU for reviews touched (directly or via a child
        # entity) during the window.
        # Last profiled: cpu 0.36 h, network 24.9 GB, no spill, p95/p50 1.15.
        ('entity_uuid_sku', f"""
            create local temporary table entity_uuid_sku on commit preserve rows as (
                (select distinct review_uuid as entity_uuid,
                                 item_id as sku
                 from bx_rp_product.review as r
                 join bx_rp_cm.status_history as sh
                   on r.review_uuid = sh.external_entity_uuid
                  and date(sh.created_at) {period})
                union
                (select distinct review_uuid as entity_uuid,
                                 item_id as sku
                 from bx_rp_product.review as r
                 join bx_rp_cm.status_history as sh
                   on r.review_uuid = sh.external_entity_root_parent_uuid
                  and date(sh.created_at) {period})
            ) order by sku segmented by hash(sku) all nodes;
        """),
        # Question / answer id -> SKU for Q&A entities moderated in the window.
        # Last profiled: cpu 0.05 h, network 0.36 GB, no spill, p95/p50 1.24.
        ('entity_id_sku', f"""
            create local temporary table entity_id_sku on commit preserve rows as (
                (select distinct iq.SourceKey as entity_id,
                                 i.SourceKey as sku
                 from dwh_data.Anc_ItemQuestion as iq
                 join dwh_data.Tie_ItemQuestion_Item as iq_i
                   on iq.ItemQuestionId = iq_i.ItemQuestionId
                 join dwh_data.Anc_Item as i
                   on iq_i.ItemId = i.ItemId
                 join bx_rp_cm.status_history as sh
                   on iq.SourceKey = sh.external_entity_id
                  and date(sh.created_at) {period})
                union
                (select distinct ia.SourceKey as entity_id,
                                 i.SourceKey as sku
                 from dwh_data.Anc_ItemAnswer as ia
                 join dwh_data.Tie_ItemAnswer_ItemQuestion as ia_iq
                   on ia.ItemAnswerId = ia_iq.ItemAnswerId
                 join dwh_data.Anc_ItemQuestion as iq
                   on ia_iq.ItemQuestionId = iq.ItemQuestionId
                 join dwh_data.Tie_ItemQuestion_Item as iq_i
                   on iq.ItemQuestionId = iq_i.ItemQuestionId
                 join dwh_data.Anc_Item as i
                   on iq_i.ItemId = i.ItemId
                 join bx_rp_cm.status_history as sh
                   on ia.SourceKey = sh.external_entity_id
                  and date(sh.created_at) {period})
            ) order by sku segmented by hash(sku) all nodes;
        """),
        # Cross-border flag per SKU; seller SourceKey 0 denotes 1P (first-party)
        # items, which are never cross-border.
        # Last profiled: cpu 0.34 h, network 3.5 GB, no spill, p95/p50 1.16.
        ('sku_crossborder', """
            create local temporary table sku_crossborder on commit preserve rows as
            select distinct i.SourceKey as Sku,
                   case
                       when s.SourceKey = 0 then false
                       else si.IsCrossborder
                   end as is_crossborder
            from dwh_data.Anc_Seller as s
            left join dwh_data.Atr_Seller_IsCrossborder as si
              on si.SellerId = s.SellerId
            join dwh_data.Tie_Item_Seller as i_s
              on s.SellerId = i_s.SellerId
            join dwh_data.anc_item as i
              on i_s.ItemId = i.ItemId
            where i.SourceKey in (select distinct sku from entity_id_sku
                                  union
                                  select distinct sku from entity_uuid_sku)
            order by Sku segmented by hash(Sku) all nodes;
        """),
        # Last moderation time of each review version plus the previous version
        # number. Uses date(created_at) so the whole last day is included,
        # consistent with every other window filter.
        ('tmp_versions', f"""
            create local temporary table tmp_versions on commit preserve rows as
            select external_entity_uuid,
                   external_entity_version,
                   max(created_at) as version_last_moderation,
                   lag(external_entity_version) over (partition by external_entity_uuid
                                                      order by external_entity_version) as prev_version
            from bx_rp_cm.status_history as sh
            where external_entity_type = 1
              and date(created_at) {period}
            group by external_entity_uuid,
                     external_entity_version
            segmented by hash(external_entity_uuid) all nodes;
        """),
        # Self-join on tmp_versions: the "previous moderations" aggregate is the
        # same (uuid, version, max(created_at)) grouping, so no second scan.
        ('review_versions', """
            create local temporary table review_versions on commit preserve rows as
            select v.external_entity_uuid as review_uuid,
                   v.external_entity_version as version,
                   v.version_last_moderation,
                   prev_v.version_last_moderation as prev_version_last_moderation
            from tmp_versions as v
            left join tmp_versions as prev_v
              on v.external_entity_uuid = prev_v.external_entity_uuid
             and v.prev_version = prev_v.external_entity_version
            order by version desc
            segmented by hash(review_uuid) all nodes;
        """),
        # Media events attributed to the review version whose moderation
        # interval (prev version's last moderation, this version's last
        # moderation] contains them. The OR is parenthesised so the media-type
        # and date filters apply to both the "has previous version" and
        # "first version" cases. The WHERE already rejected rows without a
        # matching review version, so an inner join is equivalent.
        ('media_versions', f"""
            create local temporary table media_versions on commit preserve rows as
            select r.review_uuid,
                   sh.external_entity_uuid as media_uuid,
                   r.prev_version_last_moderation,
                   r.version_last_moderation,
                   sh.created_at,
                   r.version as review_version,
                   sh.external_entity_type as entity_type,
                   sh.user_id
            from bx_rp_cm.status_history as sh
            join review_versions as r
              on sh.external_entity_root_parent_uuid = r.review_uuid
            where sh.external_entity_type in (2, 6, 9, 10)
              and (r.prev_version_last_moderation is null
                   or r.prev_version_last_moderation < sh.created_at)
              and sh.created_at <= r.version_last_moderation
              and date(r.version_last_moderation) {period}
            order by r.version desc
            segmented by hash(review_uuid) all nodes;
        """),
        # Paid moderation: reviews straight from moderator_payout, plus the
        # media of each paid review version. Both branches are filtered on the
        # payout date so every resulting moderated_at lies inside the window
        # that the DELETE step clears.
        ('final_payout', f"""
            create local temporary table final_payout on commit preserve rows as
            select entity_uuid,
                   entity_version,
                   entity_type_id,
                   null as entity_parent_root_uuid,
                   null as entity_parent_root_version,
                   user_id,
                   status_id,
                   date as moderated_at
            from bx_rp_cm.moderator_payout
            where entity_uuid is not null
              and entity_type_id = 1 -- reviews only
              and date(date) {period}
            union all
            select mv.media_uuid,
                   mv.review_version,
                   mv.entity_type,
                   mv.review_uuid as entity_parent_root_uuid,
                   mv.review_version as entity_parent_root_version,
                   mp.user_id,
                   mp.status_id,
                   mp.date as moderated_at
            from bx_rp_cm.moderator_payout as mp
            join media_versions as mv
              on mp.entity_uuid = mv.review_uuid
             and mp.entity_version = mv.review_version
            where date(mp.date) {period}
            order by entity_uuid, entity_version;
        """),
        # Filter on the timestamp column moderated_at, not on the varchar
        # label column "moderated".
        ('delete window', f"""
            delete from CAN_team.UGC_moderation
            where date(moderated_at) {period}
        """),
        # Reviews and comments.
        # NOTE(review): entity_uuid_sku is keyed by review uuid, so comments
        # (types 3, 8) presumably never get a SKU here; confirm whether their
        # root parent uuid should be used instead.
        # Last profiled: cpu 0.11 h, network 44.7 GB, spill 0.89 GB, p95/p50 1.3.
        ('table_1', _moderation_events_sql(
            table_name='table_1',
            entity_types='1, 3, 7, 8',
            entity_type_case=("when tb.entity_type in (1, 7) then 'review' "
                              "when tb.entity_type in (3, 8) then 'comment'"),
            request_key='sh.external_entity_uuid',
            lead_partition='sh.external_entity_uuid, sh.external_entity_version',
            sku_table='entity_uuid_sku',
            sku_join='tb.entity_uuid = es.entity_uuid',
            period=period,
        )),
        # Photos and videos; SKU comes from the parent review.
        ('table_2', _moderation_events_sql(
            table_name='table_2',
            entity_types='2, 6, 9, 10',
            entity_type_case=("when tb.entity_type in (2, 9) then 'photo' "
                              "when tb.entity_type in (6, 10) then 'video'"),
            request_key='sh.external_entity_id',
            lead_partition='sh.external_entity_uuid',
            sku_table='entity_uuid_sku',
            sku_join='tb.entity_root_parent_uuid = es.entity_uuid',
            period=period,
        )),
        # Answers and questions; these are keyed by integer entity id.
        ('table_3', _moderation_events_sql(
            table_name='table_3',
            entity_types='4, 5',
            entity_type_case=("when tb.entity_type = 4 then 'answer' "
                              "when tb.entity_type = 5 then 'question'"),
            request_key='sh.external_entity_id',
            lead_partition='sh.external_entity_id',
            sku_table='entity_id_sku',
            sku_join='tb.entity_id = es.entity_id',
            period=period,
        )),
        # Paid manual moderation. Reviews have a NULL entity_parent_root_uuid
        # in final_payout, so the SKU lookup falls back to the review's own uuid.
        ('table_4', """
            create local temporary table table_4 on commit preserve rows as
            select null::int as diff,
                   'ручная оплачиваемая модерация' as moderated,
                   case
                       when ss.allow_to_publish = false then 'declined'
                       when ss.allow_to_publish = true then 'approved'
                   end as status,
                   case
                       when fp.entity_type_id in (1, 2, 3, 4, 5, 6) then 0
                       when fp.entity_type_id in (7, 8, 9, 10) then 1
                   end as is_travel,
                   euk.sku as sku,
                   fp.moderated_at,
                   fp.status_id,
                   hash(fp.entity_uuid, fp.moderated_at, fp.user_id, fp.status_id) as request_id,
                   fp.user_id,
                   case
                       when fp.entity_type_id = 1 then 'review'
                       when fp.entity_type_id in (2, 9) then 'photo'
                       when fp.entity_type_id in (6, 10) then 'video'
                   end as entity_type,
                   null::int as entity_id,
                   fp.entity_uuid as entity_uuid,
                   null::int as entity_root_parent_id,
                   fp.entity_parent_root_uuid,
                   case
                       when sc.is_crossborder = true then 1
                       when sc.is_crossborder = false then 0
                   end as is_crossborder
            from final_payout as fp
            left join bx_rp_cm.statuses as ss
              on fp.status_id = ss.id and fp.entity_type_id = ss.external_entity_type
            left join entity_uuid_sku as euk
              on coalesce(fp.entity_parent_root_uuid, fp.entity_uuid) = euk.entity_uuid
            left join sku_crossborder as sc
              on euk.sku = sc.Sku
            where fp.entity_type_id in (1, 2, 6, 7, 9, 10)
            order by fp.moderated_at
        """),
        # UNION (not UNION ALL) deduplicates identical rows across the four
        # sources. Target table sketch, column names taken from this INSERT
        # list (an older note named two of them IsTravel / IsCrossborder):
        #   diff int, moderated varchar(80), status varchar(12),
        #   is_travel boolean, sku int, moderated_at timestamp, status_id int,
        #   request_id int, user_id int, entity_type varchar(12), entity_id int,
        #   entity_uuid uuid, entity_root_parent_id int,
        #   entity_root_parent_uuid uuid, is_crossborder boolean
        #   order by request_id segmented by request_id all nodes
        ('insert', """
            insert into CAN_team.UGC_moderation
            (
                diff,
                moderated,
                status,
                is_travel,
                sku,
                moderated_at,
                status_id,
                request_id,
                user_id,
                entity_type,
                entity_id,
                entity_uuid,
                entity_root_parent_id,
                entity_root_parent_uuid,
                is_crossborder
            )
            select * from table_1
            union
            select * from table_2
            union
            select * from table_3
            union
            select * from table_4
        """),
    ]


def UGC_moderation(*args, **kwargs):
    """Rebuild ``CAN_team.UGC_moderation`` for 2023-01-01 through yesterday.

    Builds helper temp tables in one Vertica session, deletes the target rows
    whose ``moderated_at`` date is in the window, and re-inserts them from
    ``bx_rp_cm.status_history`` (automatic / unpaid / double moderation) and
    ``bx_rp_cm.moderator_payout`` (paid moderation). The changes are committed
    only after every step has succeeded.

    Args:
        *args: Ignored. Presumably accepted so the function can be used as an
            Airflow python callable — confirm against the DAG.
        **kwargs: Must contain ``connections``, a payload accepted by
            ``MPA_Tools.airflow_tools.extract_connections_from_json`` that
            includes a ``content_analytics_vertica`` entry.

    Raises:
        KeyError: if ``connections`` or the Vertica entry is missing.
    """
    import logging
    from contextlib import closing
    from datetime import date, timedelta

    import vertica_python
    from MPA_Tools.airflow_tools import extract_connections_from_json

    log = logging.getLogger(__name__)

    connections = extract_connections_from_json(kwargs['connections'])
    conn_info = _vertica_conn_info(connections['content_analytics_vertica'])

    start_date = date(2023, 1, 1)
    end_date = date.today() - timedelta(days=1)
    statements = _ugc_moderation_statements(start_date, end_date)

    # A single session: the local temp tables live only inside it. Closing the
    # connection (including on error) discards uncommitted work.
    with vertica_python.connect(**conn_info) as connection:
        with closing(connection.cursor('dict')) as cur:
            for step, (label, sql) in enumerate(statements, start=1):
                log.info('UGC_moderation step %d/%d: %s', step, len(statements), label)
                cur.execute(sql)
            # conn_info does not enable autocommit, so the DELETE and INSERT
            # become visible together here.
            connection.commit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement