Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import json
import logging
import os
import re
import sys
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import quote

import aiohttp
import structlog
from cachetools import TTLCache
from ratelimit import limits, sleep_and_retry
# --- Runtime configuration ---------------------------------------------------
# File names can be overridden through environment variables; the literals
# below are the fallbacks.
# NOTE(review): LOG_FILENAME is not referenced anywhere else in the visible
# file and stdlib logging is never given a handler here, so log records
# presumably do not end up in this file -- confirm the intent before wiring it.
LOG_FILENAME = os.getenv('LOG_FILENAME', 'sec_cik_retriever.log')
CACHE_FILENAME = os.getenv('CACHE_FILENAME', 'cik_cache.json')

# EDGAR endpoints; the `{ticker}` / `{cik}` fields are filled with str.format().
CIK_LOOKUP_URL = 'https://www.sec.gov/cgi-bin/browse-edgar?CIK={ticker}&find=Search&owner=exclude&action=getcompany'
SEC_URL_TEMPLATE = 'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'

# int() raises ValueError at import time if the variable is set to a
# non-integer value.
CACHE_VALIDITY_DAYS = int(os.getenv('CACHE_VALIDITY_DAYS', '30'))
ONE_MINUTE = 60  # seconds; used as the rate-limit period for get_cik

# Render each log event as JSON and hand it to a stdlib-logging-backed logger.
structlog.configure(
    processors=[structlog.processors.JSONRenderer()],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
logger = structlog.get_logger()

# In-memory ticker -> CIK map; the TTL is CACHE_VALIDITY_DAYS expressed in
# seconds and at most 1000 entries are kept.
cache = TTLCache(maxsize=1000, ttl=CACHE_VALIDITY_DAYS * 24 * 3600)
class SessionManager:
    """Async context manager that hands out an ``aiohttp.ClientSession``.

    A session is created on entry when none is open and closed on exit, so
    one manager object can serve several consecutive ``async with`` blocks.

    NOTE(review): a single session is shared by everything that enters the
    manager, and any exit closes it -- overlapping ``async with`` blocks on
    the same manager would therefore interfere with each other.
    """

    def __init__(self):
        # No session exists until the first ``async with``.
        self.session = None

    async def __aenter__(self):
        """Return an open session, creating one if needed."""
        # Also recreate when a previous owner closed the session; handing
        # out a closed session makes every later request fail.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def __aexit__(self, exc_type, exc, tb):
        """Close the current session and forget it."""
        # Drop the reference *before* closing so the next __aenter__ always
        # builds a fresh session, even if close() raises.
        session, self.session = self.session, None
        if session is not None:
            await session.close()


# Module-wide manager shared by the lookup code.
session_manager = SessionManager()
@sleep_and_retry
@limits(calls=15, period=ONE_MINUTE)
async def get_cik(ticker: str) -> Optional[str]:
    """Return the CIK for *ticker*, or ``None`` when it cannot be determined.

    A value already held in the module-level ``cache`` is returned without
    any network traffic. Otherwise the EDGAR lookup page is fetched, parsed
    with ``parse_cik_from_response`` and, when a CIK is found, stored in the
    cache.

    Args:
        ticker: Ticker symbol to look up; it is URL-escaped before being
            placed in the query string.

    Returns:
        The CIK string, or ``None`` after a client error, a timeout, any
        other exception during the lookup, or a page without a CIK. Those
        failures are logged rather than raised.
    """
    # NOTE(review): `limits` / `sleep_and_retry` come from the synchronous
    # `ratelimit` package. Applied to a coroutine function they presumably
    # throttle the *call* that creates the coroutine, and any back-off sleep
    # would block the event loop -- confirm this is acceptable. Cache hits
    # below still pass through those decorators first.
    cached = cache.get(ticker)
    if cached:
        return cached

    async with session_manager as session:
        try:
            # Escape the symbol so characters such as '&' or '#' cannot
            # alter the query string.
            url = CIK_LOOKUP_URL.format(ticker=quote(ticker, safe=''))
            # ClientTimeout is aiohttp's documented way to express a
            # total-request timeout (here: 10 seconds).
            request_timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=request_timeout) as response:
                response.raise_for_status()
                text = await response.text()
            cik_number = parse_cik_from_response(text)
        except aiohttp.ClientError as e:
            logger.error('Client error', ticker=ticker, error=str(e))
        except asyncio.TimeoutError as e:
            logger.error('Timeout error', ticker=ticker, error=str(e))
        except Exception as e:
            # Best-effort boundary: a failed lookup must not abort the run.
            logger.error('Unexpected error', ticker=ticker, error=str(e))
        else:
            if cik_number:
                cache[ticker] = cik_number
                return cik_number
            # Do not cache or return an empty result as if it were a CIK.
            logger.warning('CIK not found in response', ticker=ticker)
    return None
# The marker text followed by the run of (at most ten) ASCII digits that
# makes up the CIK.
_CIK_PATTERN = re.compile(r'Central Index Key:\s*([0-9]{1,10})')


def parse_cik_from_response(text: str) -> str:
    """Extract the CIK that follows ``Central Index Key:`` in *text*.

    Args:
        text: Body of the lookup response.

    Returns:
        The digits found directly after the marker (up to ten), or an empty
        string when the marker is missing or is not followed by digits. The
        empty string is falsy, so callers can test the result directly.
    """
    # A plain find()+slice would return unrelated text when the marker is
    # absent (find() yields -1) or when the CIK is shorter than ten
    # characters; matching the digits explicitly avoids both problems.
    match = _CIK_PATTERN.search(text)
    return match.group(1) if match else ''
def generate_sec_url(cik: str) -> str:
    """Return the EDGAR browse URL for *cik*.

    Args:
        cik: CIK value substituted into ``SEC_URL_TEMPLATE``.

    Returns:
        ``SEC_URL_TEMPLATE`` with its ``{cik}`` field filled in.
    """
    return SEC_URL_TEMPLATE.format(cik=cik)
async def save_cache_to_disk():
    """Write the current cache contents to ``CACHE_FILENAME`` as JSON.

    The cache is copied to a plain dict on the event-loop thread, then the
    file is written in the default executor so the blocking I/O does not
    stall other coroutines.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError: If a cached key or value is not JSON-serialisable.

    NOTE(review): nothing in the visible file reads this file back in.
    """
    # Take the snapshot here, not in the worker thread, so the cache is not
    # touched concurrently from two threads.
    snapshot = dict(cache)

    def _write_snapshot():
        # Write to a sibling temp file and swap it in: an interrupted write
        # then never leaves a truncated cache file behind.
        temp_path = f'{CACHE_FILENAME}.tmp'
        with open(temp_path, 'w', encoding='utf-8') as file:
            json.dump(snapshot, file)
        os.replace(temp_path, CACHE_FILENAME)

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _write_snapshot)
async def schedule_cache_save(interval: int):
    """Save the cache every *interval* seconds until the task is cancelled.

    Args:
        interval: Seconds to wait before each save.

    The loop never returns on its own; cancel the task to stop it. A failed
    save is logged and the loop carries on, so one bad write does not end
    periodic saving for the rest of the run.
    """
    while True:
        await asyncio.sleep(interval)
        try:
            await save_cache_to_disk()
        except (OSError, TypeError, ValueError) as e:
            # File-system and JSON-serialisation failures only; cancellation
            # is deliberately left to propagate.
            logger.error('Cache save failed', error=str(e))
async def main():
    """Resolve each requested ticker to its SEC EDGAR URL and print it.

    Tickers come from the command-line arguments, or from an interactive
    prompt when none are given. A background task saves the cache hourly;
    it is cancelled on the way out and the cache is saved once more, so
    short runs persist their results too.
    """
    # Keep a reference to the task: the event loop itself only holds a weak
    # reference, so an unreferenced task may be garbage-collected mid-run.
    saver = asyncio.create_task(schedule_cache_save(60 * 60))  # Save cache every hour
    try:
        requested = sys.argv[1:] or [input('Enter ticker symbol: ')]
        # Drop blank entries (e.g. Enter pressed at the prompt) instead of
        # sending an empty lookup.
        tickers = [symbol.strip() for symbol in requested if symbol.strip()]
        if not tickers:
            print('No ticker symbol provided')
        for ticker in tickers:
            cik_number = await get_cik(ticker)
            if cik_number:
                sec_url = generate_sec_url(cik_number)
                print(f'SEC EDGAR URL for {ticker}: {sec_url}')
            else:
                print(f'Failed to generate SEC EDGAR URL for {ticker}')
    finally:
        saver.cancel()
        # return_exceptions=True absorbs the task's CancelledError (or any
        # error it already died with) so shutdown continues.
        await asyncio.gather(saver, return_exceptions=True)
        try:
            await save_cache_to_disk()
        except (OSError, TypeError, ValueError) as e:
            logger.error('Cache save failed', error=str(e))
# Script entry point: start an event loop and run main() only when the file
# is executed directly, not when it is imported.
if __name__ == '__main__':
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement