"""Fetch SVK primary regulation (FCR) price data, clean it, plot it, and store it in SQLite."""
import requests
from bs4 import BeautifulSoup
import pandas as pd
import matplotlib.pyplot as plt
import sqlite3
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
import argparse
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # import from urllib3 directly; requests.packages is a deprecated alias
import json
import os

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
URL = "https://mimer.svk.se/PrimaryRegulation/PrimaryRegulationIndex"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
HEADER_MAPPING_FILE = "header_mappings.json"
HEADER_HISTORY_FILE = "header_history.json"

# Default expected headers
DEFAULT_EXPECTED_HEADERS = {
    "date and time": "Date and Time",
    "fcr-n price (eur/mw)": "FCR-N Price (EUR/MW)",
    "fcr-d up price (eur/mw)": "FCR-D Up Price (EUR/MW)",
    "fcr-d down price (eur/mw)": "FCR-D Down Price (EUR/MW)"
}

def load_json_file(file_path, default_data):
    if os.path.exists(file_path):
        with open(file_path, "r") as file:
            return json.load(file)
    return default_data


def save_json_file(file_path, data):
    with open(file_path, "w") as file:
        json.dump(data, file, indent=4)

def requests_retry_session(
    retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504), session=None
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def fetch_webpage(url, headers):
    try:
        logging.info(f"Fetching webpage: {url}")
        response = requests_retry_session().get(url, headers=headers)
        response.raise_for_status()
        logging.info(f"Successfully fetched webpage: {url}")
        return response
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching the webpage: {e}")
        raise

def parse_html(content):
    logging.info("Parsing HTML content.")
    soup = BeautifulSoup(content, "html.parser")
    table = soup.find("table")
    if not table:
        logging.error("Table not found on the webpage.")
        raise ValueError("Table not found on the webpage.")
    logging.info("Table found and parsed successfully.")
    return table

def get_column_indices(table, header_mappings, header_history):
    logging.info("Identifying column indices based on headers.")
    headers = table.find_all("th")
    header_texts = [header.text.strip().lower() for header in headers]
    logging.info(f"Actual headers found: {header_texts}")
    indices = {}
    for header in header_texts:
        if header in header_mappings:
            indices[header] = header_texts.index(header)
        else:
            logging.info(f"New header '{header}' found. Adding to header mappings and history.")
            display_name = header.replace("_", " ").title()
            header_mappings[header] = display_name
            indices[header] = header_texts.index(header)
        # Record when each header was first and most recently seen.
        now = datetime.datetime.now().isoformat()
        if header not in header_history:
            header_history[header] = {"first_detected": now, "last_detected": now}
        else:
            header_history[header]["last_detected"] = now
    save_json_file(HEADER_MAPPING_FILE, header_mappings)
    save_json_file(HEADER_HISTORY_FILE, header_history)
    return indices

def extract_data_for_date(table, date, indices):
    logging.info(f"Extracting data for date: {date}")
    data = []
    date_str = date.strftime("%Y-%m-%d")
    for row in table.find_all("tr")[1:]:
        cells = row.find_all("td")
        if len(cells) <= max(indices.values()):
            logging.warning(f"Unexpected table structure on date {date_str}, skipping row.")
            continue
        date_time = cells[indices["date and time"]].text.strip()
        if date_time.startswith(date_str):
            try:
                fcr_n_price = float(cells[indices["fcr-n price (eur/mw)"]].text.strip().replace(",", ".")) if "fcr-n price (eur/mw)" in indices else None
                fcr_d_up_price = float(cells[indices["fcr-d up price (eur/mw)"]].text.strip().replace(",", ".")) if "fcr-d up price (eur/mw)" in indices else None
                fcr_d_down_price = float(cells[indices["fcr-d down price (eur/mw)"]].text.strip().replace(",", ".")) if "fcr-d down price (eur/mw)" in indices else None
                row_data = [date_time, fcr_n_price, fcr_d_up_price, fcr_d_down_price]
                # Append any additional (non-default) columns after the four expected ones.
                for header, index in indices.items():
                    if header not in DEFAULT_EXPECTED_HEADERS:
                        column_value = cells[index].text.strip().replace(",", ".")
                        row_data.append(float(column_value) if column_value else None)
                data.append(row_data)
            except (IndexError, ValueError) as e:
                logging.warning(f"Error parsing row data for date {date_str}: {e}")
                continue
    logging.info(f"Data extraction for date {date} completed with {len(data)} records.")
    return data

def extract_data(start_date, end_date):
    try:
        response = fetch_webpage(URL, HEADERS)
        table = parse_html(response.content)
        header_mappings = load_json_file(HEADER_MAPPING_FILE, DEFAULT_EXPECTED_HEADERS)
        header_history = load_json_file(HEADER_HISTORY_FILE, {})
        indices = get_column_indices(table, header_mappings, header_history)
    except Exception as e:
        logging.error(f"Error during data extraction: {e}")
        return pd.DataFrame()
    data = []
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(extract_data_for_date, table, current_date, indices)
            for current_date in pd.date_range(start_date, end_date)
        ]
        for future in futures:
            try:
                result = future.result()
                data.extend(result)
                logging.info(f"Extracted {len(result)} records for one date.")
            except Exception as e:
                logging.warning(f"Error processing data for a date: {e}")
    # Column labels come from header_mappings: the defaults plus any newly detected headers.
    all_headers = list(header_mappings.values())
    df = pd.DataFrame(data, columns=all_headers)
    logging.info(f"Total records extracted: {len(df)}")
    return df

def validate_and_clean_data(df):
    if df.empty:
        logging.warning("No data to validate and clean.")
        return df
    logging.info("Validating and cleaning data.")
    initial_length = len(df)
    df.dropna(inplace=True)
    # Drop rows whose values fall outside 1.5 * IQR for any price column.
    for column in df.columns[1:]:
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        df = df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
    logging.info(f"Data cleaned: removed {initial_length - len(df)} outliers and missing values.")
    return df

def analyze_and_visualize_data(df):
    if df.empty:
        logging.warning("No data available for analysis and visualization.")
        return
    logging.info("Analyzing and visualizing data.")
    summary = df.describe()
    logging.info(f"Data summary:\n{summary}")
    # Plot a timestamp-indexed copy so the original frame keeps its "Date and Time" column
    # for save_to_database, which writes with index=False.
    plot_df = df.set_index("Date and Time")
    plot_df.plot(subplots=True, figsize=(10, 8))
    plt.tight_layout()
    plt.show()

def save_to_database(df, db_filename):
    if df.empty:
        logging.warning("No data available to save to database.")
        return
    logging.info(f"Saving data to database: {db_filename}")
    conn = sqlite3.connect(db_filename)
    df.to_sql("primary_regulation", conn, if_exists="replace", index=False)
    conn.close()
    logging.info("Data saved to database successfully.")

def main(start_date, end_date, db_filename):
    try:
        df = extract_data(start_date, end_date)
        df = validate_and_clean_data(df)
        analyze_and_visualize_data(df)
        save_to_database(df, db_filename)
    except Exception as e:
        logging.error(f"An error occurred: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch and save primary regulation data.')
    parser.add_argument('--start-date', type=str, required=True, help='Start date in YYYY-MM-DD format')
    parser.add_argument('--end-date', type=str, required=True, help='End date in YYYY-MM-DD format')
    parser.add_argument('--db-filename', type=str, default="primary_regulation_data.db", help='Output SQLite database filename')
    args = parser.parse_args()
    start_date = datetime.datetime.strptime(args.start_date, "%Y-%m-%d").date()
    end_date = datetime.datetime.strptime(args.end_date, "%Y-%m-%d").date()
    main(start_date, end_date, args.db_filename)

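# Example invocation (illustrative only; the script filename and dates below are assumed, not part of the paste):
#   python svk_primary_regulation.py --start-date 2023-01-01 --end-date 2023-01-07
#   python svk_primary_regulation.py --start-date 2023-01-01 --end-date 2023-01-31 --db-filename fcr_prices.db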