Advertisement
gubichas

Untitled

Mar 10th, 2025
284
0
20 hours
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.04 KB | None | 0 0
  1. private_main.py:
  2. import argparse
  3. import logging
  4. from datetime import date
  5. from concurrent.futures import ThreadPoolExecutor
  6.  
  7. # Если хотите оставить yandex_metrika — можно импортировать
  8. from metrika import get_metrika
  9. from creds import DB_ANALYTICS
  10.  
  11. from private_lenta_worker import PrivateLentaWorker
  12.  
  13. logger = logging.getLogger('private_main')
  14. logger.setLevel(logging.DEBUG)
  15.  
  16. parser = argparse.ArgumentParser(description="Скрипт для работы с приватным Lenta API")
  17.  
  18. parser.add_argument('-s', '--start_date', dest='start_date', type=date.fromisoformat)
  19. parser.add_argument('-e', '--end_date', dest='end_date', type=date.fromisoformat)
  20. parser.add_argument('-t', '--traffic_source_id', dest='traffic_source_id', type=int, default=None)
  21. # Можно оставить выбор api, если нужно
  22. parser.add_argument('-a', '--api', dest='api', type=str, choices=['yandex_metrika', 'private_lenta'], default='private_lenta')
  23.  
  24. if __name__ == '__main__':
  25.     args = parser.parse_args()
  26.     worker = PrivateLentaWorker()
  27.  
  28.     if args.api == 'yandex_metrika':
  29.         # Если вам нужно продолжать скачивать метрику в этом новом коде
  30.         logger.info(f"Запуск Yandex Metrika с {args.start_date} по {args.end_date}")
  31.         # Тут можно в теории обращаться к старой логике ORM, если нужно
  32.         # Или добавить методы в PrivateLentaDb
  33.         # ...
  34.         logger.info("Фрагмент кода для Яндекс Метрики, если требуется. Или пропустить.")
  35.     else:
  36.         # По умолчанию работаем с private_lenta
  37.         logger.info(f"Запуск приватного Lenta API с {args.start_date} по {args.end_date}")
  38.         # Здесь у вас должна быть логика, как вы выбираете pub_ids
  39.         # Например, из БД, если есть функция f_get_new_pub_id() и т.д.
  40.         # Для простоты — условно:
  41.         pub_ids = ['politics/2025/03/01/article_123/', 'economics/2025/03/02/article_456/']
  42.         logger.info(f"Найдено {len(pub_ids)} pub_id для скачивания")
  43.  
  44.         # Многопоточная обработка
  45.         with ThreadPoolExecutor(max_workers=15) as pool:
  46.             pool.map(worker.save_private_lenta_api, pub_ids)
  47.  
  48.     logger.info("Завершено.")
  49.  
  50. private_orm.py:
  51. import logging
  52. from sqlalchemy import MetaData, Table, create_engine
  53. from sqlalchemy.dialects.postgresql import insert
  54. from sqlalchemy.sql import select
  55.  
  56. logger = logging.getLogger('PrivateOrm')
  57. logger.setLevel(logging.DEBUG)
  58.  
  59. class PrivateLentaDb:
  60.     SCHEMA = 'lenta_content'
  61.  
  62.     def __init__(self, credentials: dict):
  63.         """
  64.        Конструктор, инициализируем подключение и таблицы.
  65.        """
  66.         self.conn = self._get_conn(credentials)
  67.         metadata = MetaData()
  68.         default_args = dict(autoload=True, autoload_with=self.conn.engine)
  69.  
  70.         # Основная таблица, куда будем писать сырые данные из приватного API
  71.         self.test_lenta_new_api = Table('test_lenta_new_api', metadata, schema=self.SCHEMA, **default_args)
  72.        
  73.         # Если нужно отслеживать битые pub_id
  74.         self.broken_pub_id = Table('broken_pud_id', metadata, schema=self.SCHEMA, **default_args)
  75.  
  76.     @staticmethod
  77.     def _get_conn(credentials: dict):
  78.         """
  79.        Создаём SQLAlchemy engine + connect
  80.        """
  81.         engine = create_engine(
  82.             'postgresql://{dbuser}:{dbpass}@{dbhost}:{dbport}/{dbname}'.format(**credentials),
  83.             isolation_level="READ UNCOMMITTED"
  84.         )
  85.         return engine.connect()
  86.  
  87.     def _execute_query(self, query):
  88.         return self.conn.execute(query)
  89.  
  90.     def add_data_lenta_api_new(self, values: list):
  91.     """
  92.    Запись в test_lenta_new_api.
  93.    """
  94.        
  95.  
  96.     def add_broken_pub_id(self, value: dict):
  97.         """
  98.        Запись в таблицу broken_pud_id (или broken_pub_id),
  99.        чтобы отслеживать неудачные запросы.
  100.        """
  101.         if not value:
  102.             return
  103.         ins = insert(self.broken_pub_id).values(value).on_conflict_do_nothing()
  104.         self._execute_query(ins)
  105.  
  106. private_lenta_worker.py
  107. import logging
  108. from time import sleep
  109. from datetime import datetime
  110. from private_lenta_api import PrivateLentaApi
  111. from private_orm import PrivateLentaDb  # см. ниже
  112. from creds import DB_ANALYTICS
  113.  
  114. logger = logging.getLogger('PrivateLentaWorker')
  115. logger.setLevel(logging.DEBUG)
  116.  
  117. class PrivateLentaWorker:
  118.     """
  119.    Класс, отвечающий за логику получения и сохранения
  120.    данных из нового (private) Lenta API.
  121.    """
  122.  
  123.     def __init__(self):
  124.         self.api = PrivateLentaApi()
  125.         self.db = PrivateLentaDb(DB_ANALYTICS)  # свой orm, см. private_orm.py
  126.  
  127.     def _fetch_new_api(self, pub_id: str):
  128.         """
  129.        Вспомогательный метод: запрос + парсинг + обработка ошибок
  130.        """
  131.         response = self.api.make_request(pub_id)
  132.         if not response:
  133.             logger.warning(f"Пустой ответ приватного API для {pub_id}")
  134.             self.db.add_broken_pub_id({"pub_id": pub_id})
  135.             return None
  136.  
  137.         parsed = self.api.parse_response(response)
  138.         if not parsed:
  139.             logger.warning(f"Не удалось распарсить приватный API для {pub_id}")
  140.             self.db.add_broken_pub_id({"pub_id": pub_id})
  141.             return None
  142.  
  143.         # parsed['pub_date'] — datetime-объект; в нашей БД (test_lenta_new_api) формат timestamptz или timestamp
  144.         # Для удобства разобьём на date/time, если это нужно (или отправим datetime целиком):
  145.         pub_date_dt = parsed['pub_date']
  146.         if pub_date_dt:
  147.             parsed['pub_date'] = pub_date_dt.date()
  148.             parsed['pub_time'] = pub_date_dt.time()
  149.         else:
  150.             parsed['pub_date'] = None
  151.             parsed['pub_time'] = None
  152.  
  153.         return parsed
  154.  
  155.     def save_private_lenta_api(self, pub_id):
  156.         """
  157.        Метод, вызываемый извне: получаем и сохраняем данные
  158.        по одному или нескольким pub_id в test_lenta_new_api.
  159.        """
  160.         if isinstance(pub_id, list):
  161.             results = []
  162.             for pid in pub_id:
  163.                 data = self._fetch_new_api(pid.lower())
  164.                 if data:
  165.                     results.append(data)
  166.         else:
  167.             data = self._fetch_new_api(pub_id.lower())
  168.             results = [data] if data else []
  169.  
  170.         # Если что-то собрали — пишем в test_lenta_new_api
  171.         if results:
  172.             self.db.add_data_lenta_api_new(results)
  173.  
  174.  
  175. private_lenta_api.py
  176. import requests
  177. import logging
  178. from datetime import datetime
  179. from requests.exceptions import HTTPError
  180.  
  181. logger = logging.getLogger('private_new_lenta_api')
  182. logger.setLevel(logging.DEBUG)
  183.  
  184. class PrivateLentaApi:
  185.     """
  186.    Класс для работы с приватным Lenta API.
  187.    """
  188.  
  189.     def __init__(self):
  190.         self.base_url = "http://analytics01.stage.lenta.rambler.tech:8080/topics/by_rel_urls"
  191.  
  192.     def make_request(self, article_url: str):
  193.         """
  194.        Отправляем запрос к приватному API, подставляя rel_urls[]=...
  195.        """
  196.         try:
  197.             response = requests.get(f"{self.base_url}?rel_urls[]={article_url}")
  198.             response.raise_for_status()
  199.             return response.json()
  200.         except HTTPError as e:
  201.             logger.error(f"Ошибка HTTP в приватном API: {e}")
  202.             return None
  203.         except Exception as e:
  204.             logger.error(f"Неизвестная ошибка в приватном API: {e}")
  205.             return None
  206.  
  207.     def parse_response(self, response):
  208.         """
  209.        Парсим структуру ответа (response — это список объектов).
  210.        Поле `title` пока не приходит от API, поэтому вставим заглушку `''`.
  211.        """
  212.         if not response or not isinstance(response, list) or len(response) == 0:
  213.             return None
  214.         try:
  215.             data = response[0]
  216.             pub_date = datetime.strptime(data['pub_date'], '%Y-%m-%dT%H:%M:%S.%fZ')
  217.  
  218.             return {
  219.                 'pub_date': pub_date,  # datetime
  220.                 'title': '',  # <-- заглушка
  221.                 'editor': data.get('editor'),
  222.                 'editor_groups': data.get('editor_groups', []),
  223.                 'author': data.get('author'),
  224.                 'pub_type': data.get('pub_type'),
  225.                 'pub_section': data.get('pub_section'),
  226.                 'pub_subsection': data['pub_subsection'][0] if data.get('pub_subsection') else None,
  227.                 'title_yandex': data.get('title_yandex'),
  228.                 'pub_id': data.get('pub_id'),
  229.                 'is_super_news': data.get('is_super_news'),
  230.                 'tags_rco': data.get('tags_rco', []),
  231.                 'tags_editors_true': data.get('tags_editors', {}).get('tags_editors_true', []),
  232.                 'tags_editors_false': data.get('tags_editors', {}).get('tags_editors_false', []),
  233.                 'bs_editors': data.get('bs_editors'),
  234.                 'bs_rco': data.get('bs_rco')
  235.             }
  236.         except (KeyError, IndexError, ValueError) as e:
  237.             logger.error(f"Ошибка парсинга приватного API: {e}")
  238.             return None
  239.  
  240. import requests
  241. import logging
  242. from datetime import datetime
  243. from requests.exceptions import HTTPError
  244.  
  245. logger = logging.getLogger('private_new_lenta_api')
  246. logger.setLevel(logging.DEBUG)
  247.  
  248. class PrivateLentaApi:
  249.     """
  250.    Класс для работы с приватным Lenta API.
  251.    """
  252.  
  253.     def __init__(self):
  254.         # URL можно менять, если поменяется хост/порт
  255.         self.base_url = "http://analytics01.stage.lenta.rambler.tech:8080/topics/by_rel_urls"
  256.  
  257.     def make_request(self, article_url: str):
  258.         """
  259.        Отправляем запрос к приватному API, подставляя rel_urls[]=...
  260.        """
  261.         try:
  262.             response = requests.get(f"{self.base_url}?rel_urls[]={article_url}")
  263.             response.raise_for_status()
  264.             return response.json()
  265.         except HTTPError as e:
  266.             logger.error(f"Ошибка HTTP в приватном API: {e}")
  267.             return None
  268.         except Exception as e:
  269.             logger.error(f"Неизвестная ошибка в приватном API: {e}")
  270.             return None
  271.  
  272.     def parse_response(self, response):
  273.         """
  274.        Парсим структуру ответа (response — это список объектов).
  275.        Поля могут отличаться в реальном API, убедитесь, что ключи совпадают.
  276.        """
  277.         if not response or not isinstance(response, list) or len(response) == 0:
  278.             return None
  279.         try:
  280.             data = response[0]
  281.             # pub_date приходит в формате ISO8601, пример: '2025-03-01T12:34:56.789Z'
  282.             pub_date = datetime.strptime(data['pub_date'], '%Y-%m-%dT%H:%M:%S.%fZ')
  283.             return {
  284.                 'pub_date': pub_date,  # datetime-объект
  285.                 'editor': data.get('editor'),
  286.                 'editor_groups': data.get('editor_groups', []),
  287.                 'author': data.get('author'),
  288.                 'pub_type': data.get('pub_type'),
  289.                 'pub_section': data.get('pub_section'),
  290.                 'pub_subsection': data['pub_subsection'][0] if data.get('pub_subsection') else None,
  291.                 # Поля title пока нет — добавите при появлении
  292.                 'title_yandex': data.get('title_yandex'),
  293.                 'pub_id': data.get('pub_id'),
  294.                 'is_super_news': data.get('is_super_news'),
  295.                 'tags_rco': data.get('tags_rco', []),
  296.                 # tags_editors — словарь, содержащий tags_editors_true / tags_editors_false
  297.                 'tags_editors_true': data.get('tags_editors', {}).get('tags_editors_true', []),
  298.                 'tags_editors_false': data.get('tags_editors', {}).get('tags_editors_false', []),
  299.                 'bs_editors': data.get('bs_editors'),
  300.                 'bs_rco': data.get('bs_rco')
  301.             }
  302.         except (KeyError, IndexError, ValueError) as e:
  303.             logger.error(f"Ошибка парсинга приватного API: {e}")
  304.             return None
  305.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement