import logging
from datetime import datetime, date, timedelta
from typing import Dict, List, Tuple

import numpy as np

from champ_api_working import ChampApiV4
from common import AnalyticsDb
from ym_api_working import YmApi

logger = logging.getLogger('pubs_main')
logger.setLevel(logging.INFO)


class PubsWorking:
    def __init__(self, champ: ChampApiV4, ym: YmApi, db: AnalyticsDb):
        self.champ = champ
        self.ym = ym
        self.db = db

    # ... (the remaining methods are unchanged; only the exclusive_pubs
    # handling is refactored here) ...
    def work_with_exclusive_pubs(self, pub_d1: datetime, pub_d2: datetime, sc_d1: date, sc_d2: date) -> None:
        """
        Downloads and processes exclusive publications data for a given period.

        Args:
            pub_d1 (datetime): Start date for the publication period.
            pub_d2 (datetime): End date for the publication period.
            sc_d1 (date): Start date for the scan period.
            sc_d2 (date): End date for the scan period.
        """
        logger.info(
            f"Starting exclusive pubs processing. Publication period: {pub_d1} - {pub_d2}, "
            f"Scan period: {sc_d1} - {sc_d2}"
        )
        # 1. Fetch the exclusive publications for the period.
        exclusive_pubs_ids = self.get_exclusive_pubs(pub_d1, pub_d2)
        if not exclusive_pubs_ids['news'] and not exclusive_pubs_ids['article']:
            logger.warning("No exclusive publications found for the given period.")
            return
        # 2. Delete stale data for the scan period.
        self._delete_old_exclusive_data(sc_d1, sc_d2)
        # 3. Collect fresh traffic statistics.
        exclusive_data = self._collect_exclusive_stats(
            sc_d1, sc_d2, exclusive_pubs_ids['news'], exclusive_pubs_ids['article']
        )
        # 4. Save the new data.
        self._save_exclusive_data(exclusive_data, sc_d1, sc_d2)
        logger.info("Finished processing exclusive publications.")
    def _delete_old_exclusive_data(self, scan_start: date, scan_end: date) -> None:
        """Deletes old exclusive publications data for the specified scan period."""
        logger.info(f"Deleting old exclusive publications data from {scan_start} to {scan_end}")
        self.db.del_exclusive_pubs_data(scan_start, scan_end)
    def _collect_exclusive_stats(
        self, scan_start: date, scan_end: date, news_ids: Dict[int, int], article_ids: Dict[int, int]
    ) -> List[dict]:
        """
        Collects traffic statistics for exclusive publications.

        Args:
            scan_start (date): Start date for the scan period.
            scan_end (date): End date for the scan period.
            news_ids (Dict[int, int]): Mapping of news pub IDs to internal IDs.
            article_ids (Dict[int, int]): Mapping of article pub IDs to internal IDs.

        Returns:
            List[dict]: List of dictionaries with stats for exclusive publications.
        """
        exclusive_rows = []
        news_keys = list(news_ids.keys())
        article_keys = list(article_ids.keys())
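        # Expected shapes (assumptions, inferred from the unpacking in the
        # loop below and from _process_day_stats, which reads stats['pv']):
        #   self.db.traffics   -> iterable of (traffic_id, traffic_name, traffic_filter) tuples
        #   self.get_pub_stats -> {day: {'news': {pub_id: {'pv': pageviews, ...}},
        #                                'article': {pub_id: {'pv': pageviews, ...}}}}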
        for traffic_id, traffic_name, traffic_filter in self.db.traffics:
            logger.info(f"Fetching Yandex Metrika stats for traffic: {traffic_name}")
            stats = self.get_pub_stats(scan_start, scan_end, news_keys, article_keys, traffic_name, traffic_filter)
            for day, date_stats in stats.items():
                logger.info(
                    f"{day}: Retrieved {len(date_stats['news'])} news and {len(date_stats['article'])} articles"
                )
                exclusive_rows.extend(
                    self._process_day_stats(day, date_stats, news_ids, article_ids, traffic_id)
                )
        return exclusive_rows
    def _process_day_stats(
        self, day: date, date_stats: Dict[str, dict], news_ids: Dict[int, int],
        article_ids: Dict[int, int], traffic_id: int
    ) -> List[dict]:
        """
        Processes daily stats for news and articles into a list of rows.

        Args:
            day (date): Date of the stats.
            date_stats (Dict[str, dict]): Stats for news and articles.
            news_ids (Dict[int, int]): Mapping of news pub IDs to internal IDs.
            article_ids (Dict[int, int]): Mapping of article pub IDs to internal IDs.
            traffic_id (int): Traffic ID for the stats.

        Returns:
            List[dict]: Processed rows for the day.
        """
        rows = []
        for pub_type, pub_stats in date_stats.items():
            id_map = news_ids if pub_type == 'news' else article_ids
            for pub_id, stats in pub_stats.items():
                if pub_id not in id_map:
                    logger.debug(f"Skipping unknown pub ID {pub_id} from Yandex Metrika stats.")
                    continue
                internal_id = id_map[pub_id]
                rows.append({
                    'internal_id': internal_id,
                    'date': day,
                    'traffic_id': traffic_id,
                    'pageviews': stats['pv'],
                    'article_number': str(pub_id)
                })
        return rows
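    # A produced row looks like this (illustrative values, real keys):
    #   {'internal_id': 42, 'date': date(2024, 1, 15), 'traffic_id': 3,
    #    'pageviews': 1280, 'article_number': '123456'}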
    def _save_exclusive_data(self, data: List[dict], scan_start: date, scan_end: date) -> None:
        """Saves exclusive publications data to the database."""
        logger.info(f"Saving {len(data)} exclusive publication records for period {scan_start} to {scan_end}")
        if data:
            self.db.new_exclusive_pubs_data_batch(data)
        else:
            logger.warning("No exclusive publication data to save.")
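

# --- Usage sketch (not part of the original class; illustrative only) ---
# The no-argument constructors below are assumptions: the real ChampApiV4,
# YmApi and AnalyticsDb initializers live in champ_api_working, ym_api_working
# and common, and likely require credentials or connection settings.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    champ = ChampApiV4()  # assumed signature; may need API credentials
    ym = YmApi()          # assumed signature; may need a Metrika token
    db = AnalyticsDb()    # assumed signature; may need DB connection params
    pubs = PubsWorking(champ, ym, db)
    # Placeholder dates: publication window and scan window set to the same month.
    pubs.work_with_exclusive_pubs(
        pub_d1=datetime(2024, 1, 1),
        pub_d2=datetime(2024, 1, 31),
        sc_d1=date(2024, 1, 1),
        sc_d2=date(2024, 1, 31),
    )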