Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- private_main.py:
- import argparse
- import logging
- from datetime import date
- from concurrent.futures import ThreadPoolExecutor
- # Если хотите оставить yandex_metrika — можно импортировать
- from metrika import get_metrika
- from creds import DB_ANALYTICS
- from private_lenta_worker import PrivateLentaWorker
- logger = logging.getLogger('private_main')
- logger.setLevel(logging.DEBUG)
- parser = argparse.ArgumentParser(description="Скрипт для работы с приватным Lenta API")
- parser.add_argument('-s', '--start_date', dest='start_date', type=date.fromisoformat)
- parser.add_argument('-e', '--end_date', dest='end_date', type=date.fromisoformat)
- parser.add_argument('-t', '--traffic_source_id', dest='traffic_source_id', type=int, default=None)
- # Можно оставить выбор api, если нужно
- parser.add_argument('-a', '--api', dest='api', type=str, choices=['yandex_metrika', 'private_lenta'], default='private_lenta')
- if __name__ == '__main__':
- args = parser.parse_args()
- worker = PrivateLentaWorker()
- if args.api == 'yandex_metrika':
- # Если вам нужно продолжать скачивать метрику в этом новом коде
- logger.info(f"Запуск Yandex Metrika с {args.start_date} по {args.end_date}")
- # Тут можно в теории обращаться к старой логике ORM, если нужно
- # Или добавить методы в PrivateLentaDb
- # ...
- logger.info("Фрагмент кода для Яндекс Метрики, если требуется. Или пропустить.")
- else:
- # По умолчанию работаем с private_lenta
- logger.info(f"Запуск приватного Lenta API с {args.start_date} по {args.end_date}")
- # Здесь у вас должна быть логика, как вы выбираете pub_ids
- # Например, из БД, если есть функция f_get_new_pub_id() и т.д.
- # Для простоты — условно:
- pub_ids = ['politics/2025/03/01/article_123/', 'economics/2025/03/02/article_456/']
- logger.info(f"Найдено {len(pub_ids)} pub_id для скачивания")
- # Многопоточная обработка
- with ThreadPoolExecutor(max_workers=15) as pool:
- pool.map(worker.save_private_lenta_api, pub_ids)
- logger.info("Завершено.")
- private_orm.py:
- import logging
- from sqlalchemy import MetaData, Table, create_engine
- from sqlalchemy.dialects.postgresql import insert
- from sqlalchemy.sql import select
- logger = logging.getLogger('PrivateOrm')
- logger.setLevel(logging.DEBUG)
- class PrivateLentaDb:
- SCHEMA = 'lenta_content'
- def __init__(self, credentials: dict):
- """
- Конструктор, инициализируем подключение и таблицы.
- """
- self.conn = self._get_conn(credentials)
- metadata = MetaData()
- default_args = dict(autoload=True, autoload_with=self.conn.engine)
- # Основная таблица, куда будем писать сырые данные из приватного API
- self.test_lenta_new_api = Table('test_lenta_new_api', metadata, schema=self.SCHEMA, **default_args)
- # Если нужно отслеживать битые pub_id
- self.broken_pub_id = Table('broken_pud_id', metadata, schema=self.SCHEMA, **default_args)
- @staticmethod
- def _get_conn(credentials: dict):
- """
- Создаём SQLAlchemy engine + connect
- """
- engine = create_engine(
- 'postgresql://{dbuser}:{dbpass}@{dbhost}:{dbport}/{dbname}'.format(**credentials),
- isolation_level="READ UNCOMMITTED"
- )
- return engine.connect()
- def _execute_query(self, query):
- return self.conn.execute(query)
- def add_data_lenta_api_new(self, values: list):
- """
- Запись в test_lenta_new_api.
- """
- def add_broken_pub_id(self, value: dict):
- """
- Запись в таблицу broken_pud_id (или broken_pub_id),
- чтобы отслеживать неудачные запросы.
- """
- if not value:
- return
- ins = insert(self.broken_pub_id).values(value).on_conflict_do_nothing()
- self._execute_query(ins)
- private_lenta_worker.py
- import logging
- from time import sleep
- from datetime import datetime
- from private_lenta_api import PrivateLentaApi
- from private_orm import PrivateLentaDb # см. ниже
- from creds import DB_ANALYTICS
- logger = logging.getLogger('PrivateLentaWorker')
- logger.setLevel(logging.DEBUG)
- class PrivateLentaWorker:
- """
- Класс, отвечающий за логику получения и сохранения
- данных из нового (private) Lenta API.
- """
- def __init__(self):
- self.api = PrivateLentaApi()
- self.db = PrivateLentaDb(DB_ANALYTICS) # свой orm, см. private_orm.py
- def _fetch_new_api(self, pub_id: str):
- """
- Вспомогательный метод: запрос + парсинг + обработка ошибок
- """
- response = self.api.make_request(pub_id)
- if not response:
- logger.warning(f"Пустой ответ приватного API для {pub_id}")
- self.db.add_broken_pub_id({"pub_id": pub_id})
- return None
- parsed = self.api.parse_response(response)
- if not parsed:
- logger.warning(f"Не удалось распарсить приватный API для {pub_id}")
- self.db.add_broken_pub_id({"pub_id": pub_id})
- return None
- # parsed['pub_date'] — datetime-объект; в нашей БД (test_lenta_new_api) формат timestamptz или timestamp
- # Для удобства разобьём на date/time, если это нужно (или отправим datetime целиком):
- pub_date_dt = parsed['pub_date']
- if pub_date_dt:
- parsed['pub_date'] = pub_date_dt.date()
- parsed['pub_time'] = pub_date_dt.time()
- else:
- parsed['pub_date'] = None
- parsed['pub_time'] = None
- return parsed
- def save_private_lenta_api(self, pub_id):
- """
- Метод, вызываемый извне: получаем и сохраняем данные
- по одному или нескольким pub_id в test_lenta_new_api.
- """
- if isinstance(pub_id, list):
- results = []
- for pid in pub_id:
- data = self._fetch_new_api(pid.lower())
- if data:
- results.append(data)
- else:
- data = self._fetch_new_api(pub_id.lower())
- results = [data] if data else []
- # Если что-то собрали — пишем в test_lenta_new_api
- if results:
- self.db.add_data_lenta_api_new(results)
- private_lenta_api.py
- import requests
- import logging
- from datetime import datetime
- from requests.exceptions import HTTPError
- logger = logging.getLogger('private_new_lenta_api')
- logger.setLevel(logging.DEBUG)
- class PrivateLentaApi:
- """
- Класс для работы с приватным Lenta API.
- """
- def __init__(self):
- self.base_url = "http://analytics01.stage.lenta.rambler.tech:8080/topics/by_rel_urls"
- def make_request(self, article_url: str):
- """
- Отправляем запрос к приватному API, подставляя rel_urls[]=...
- """
- try:
- response = requests.get(f"{self.base_url}?rel_urls[]={article_url}")
- response.raise_for_status()
- return response.json()
- except HTTPError as e:
- logger.error(f"Ошибка HTTP в приватном API: {e}")
- return None
- except Exception as e:
- logger.error(f"Неизвестная ошибка в приватном API: {e}")
- return None
- def parse_response(self, response):
- """
- Парсим структуру ответа (response — это список объектов).
- Поле `title` пока не приходит от API, поэтому вставим заглушку `''`.
- """
- if not response or not isinstance(response, list) or len(response) == 0:
- return None
- try:
- data = response[0]
- pub_date = datetime.strptime(data['pub_date'], '%Y-%m-%dT%H:%M:%S.%fZ')
- return {
- 'pub_date': pub_date, # datetime
- 'title': '', # <-- заглушка
- 'editor': data.get('editor'),
- 'editor_groups': data.get('editor_groups', []),
- 'author': data.get('author'),
- 'pub_type': data.get('pub_type'),
- 'pub_section': data.get('pub_section'),
- 'pub_subsection': data['pub_subsection'][0] if data.get('pub_subsection') else None,
- 'title_yandex': data.get('title_yandex'),
- 'pub_id': data.get('pub_id'),
- 'is_super_news': data.get('is_super_news'),
- 'tags_rco': data.get('tags_rco', []),
- 'tags_editors_true': data.get('tags_editors', {}).get('tags_editors_true', []),
- 'tags_editors_false': data.get('tags_editors', {}).get('tags_editors_false', []),
- 'bs_editors': data.get('bs_editors'),
- 'bs_rco': data.get('bs_rco')
- }
- except (KeyError, IndexError, ValueError) as e:
- logger.error(f"Ошибка парсинга приватного API: {e}")
- return None
- import requests
- import logging
- from datetime import datetime
- from requests.exceptions import HTTPError
- logger = logging.getLogger('private_new_lenta_api')
- logger.setLevel(logging.DEBUG)
- class PrivateLentaApi:
- """
- Класс для работы с приватным Lenta API.
- """
- def __init__(self):
- # URL можно менять, если поменяется хост/порт
- self.base_url = "http://analytics01.stage.lenta.rambler.tech:8080/topics/by_rel_urls"
- def make_request(self, article_url: str):
- """
- Отправляем запрос к приватному API, подставляя rel_urls[]=...
- """
- try:
- response = requests.get(f"{self.base_url}?rel_urls[]={article_url}")
- response.raise_for_status()
- return response.json()
- except HTTPError as e:
- logger.error(f"Ошибка HTTP в приватном API: {e}")
- return None
- except Exception as e:
- logger.error(f"Неизвестная ошибка в приватном API: {e}")
- return None
- def parse_response(self, response):
- """
- Парсим структуру ответа (response — это список объектов).
- Поля могут отличаться в реальном API, убедитесь, что ключи совпадают.
- """
- if not response or not isinstance(response, list) or len(response) == 0:
- return None
- try:
- data = response[0]
- # pub_date приходит в формате ISO8601, пример: '2025-03-01T12:34:56.789Z'
- pub_date = datetime.strptime(data['pub_date'], '%Y-%m-%dT%H:%M:%S.%fZ')
- return {
- 'pub_date': pub_date, # datetime-объект
- 'editor': data.get('editor'),
- 'editor_groups': data.get('editor_groups', []),
- 'author': data.get('author'),
- 'pub_type': data.get('pub_type'),
- 'pub_section': data.get('pub_section'),
- 'pub_subsection': data['pub_subsection'][0] if data.get('pub_subsection') else None,
- # Поля title пока нет — добавите при появлении
- 'title_yandex': data.get('title_yandex'),
- 'pub_id': data.get('pub_id'),
- 'is_super_news': data.get('is_super_news'),
- 'tags_rco': data.get('tags_rco', []),
- # tags_editors — словарь, содержащий tags_editors_true / tags_editors_false
- 'tags_editors_true': data.get('tags_editors', {}).get('tags_editors_true', []),
- 'tags_editors_false': data.get('tags_editors', {}).get('tags_editors_false', []),
- 'bs_editors': data.get('bs_editors'),
- 'bs_rco': data.get('bs_rco')
- }
- except (KeyError, IndexError, ValueError) as e:
- logger.error(f"Ошибка парсинга приватного API: {e}")
- return None
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement