Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import ydb
- import ydb.iam
- import logging
- from time import sleep
# Configure the root logger once so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-wide logger, named after this module, shared by the functions below.
logger = logging.getLogger(__name__)
def get_credentials():
    """Build YDB service-account credentials from a key file.

    The key-file location is read from the ``YDB_CREDENTIALS_PATH``
    environment variable so that no secret is hardcoded in the script.

    Returns:
        The object produced by
        ``ydb.iam.ServiceAccountCredentials.from_file`` for that path.

    Raises:
        ValueError: If ``YDB_CREDENTIALS_PATH`` is unset or empty.
        Exception: Whatever ``from_file`` raises is logged together with
            its traceback and re-raised unchanged.
    """
    credentials_path = os.getenv("YDB_CREDENTIALS_PATH")
    if not credentials_path:
        # Validate up front: a configuration mistake is reported once and is
        # not routed through the handler meant for key-file loading failures.
        message = "YDB_CREDENTIALS_PATH environment variable not set."
        logger.error("Error retrieving credentials: %s", message)
        raise ValueError(message)

    try:
        return ydb.iam.ServiceAccountCredentials.from_file(credentials_path)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error retrieving credentials: %s", e)
        raise
def execute_query(session):
    """Run a trivial ``SELECT 1`` to confirm the database answers.

    The statement is executed on a transaction obtained from *session* and
    committed in the same call, using request settings with a timeout of 3
    and an operation timeout of 2 (presumably seconds -- confirm against the
    YDB SDK).

    Returns whatever ``execute`` returns. Any ``ydb.Error`` is logged and
    re-raised.
    """
    probe_sql = "SELECT 1 AS cnt;"
    try:
        tx = session.transaction()
        request_settings = ydb.BaseRequestSettings()
        request_settings = request_settings.with_timeout(3)
        request_settings = request_settings.with_operation_timeout(2)
        return tx.execute(probe_sql, commit_tx=True, settings=request_settings)
    except ydb.Error as e:
        logger.error(f"YDB error executing query: {e}")
        raise
def retry_operation(func, session, retries=3, delay=2, *, backoff=1.0):
    """Call ``func(session)``, retrying when it raises.

    Args:
        func: Callable invoked as ``func(session)``.
        session: Passed through to *func* unchanged on every attempt.
        retries: Maximum number of attempts; must be at least 1.
        delay: Seconds to sleep after the first failed attempt.
        backoff: Factor applied to the wait after each further failure.
            The default of ``1.0`` keeps the wait fixed at *delay*; pass
            e.g. ``2.0`` for exponential backoff.

    Returns:
        The value returned by the first successful call.

    Raises:
        ValueError: If *retries* is less than 1.
        Exception: The exception from the final attempt, re-raised as is.
    """
    if retries < 1:
        # Without this guard the loop body never runs and the function would
        # silently return None without ever calling func.
        raise ValueError(f"retries must be at least 1, got {retries}")

    wait = delay
    for attempt in range(1, retries + 1):
        try:
            return func(session)
        except Exception as e:
            logger.warning("Attempt %d failed: %s", attempt, e)
            if attempt == retries:
                logger.error("Max retries reached. Operation failed.")
                raise
            sleep(wait)
            wait *= backoff
def initialize_driver():
    """Create a ``ydb.Driver`` configured from the environment.

    The endpoint and database are read from ``YDB_ENDPOINT`` and
    ``YDB_DATABASE``, each falling back to the literal default given here
    when the variable is unset; credentials come from ``get_credentials()``.
    Any failure while building the driver is logged and re-raised.
    """
    try:
        driver_kwargs = {
            "endpoint": os.getenv(
                'YDB_ENDPOINT', 'grpcs://ydb.serverless.yandexcloud.net:2135'
            ),
            "database": os.getenv(
                'YDB_DATABASE',
                '/ru-central1/b1g0uddo62cr7ku3vcrs/etn9qo0l4gl6admidgt1',
            ),
            "credentials": get_credentials(),
        }
        return ydb.Driver(**driver_kwargs)
    except Exception as e:
        logger.error(f"Error initializing YDB driver: {e}")
        raise
def main():
    """Connect to YDB, run the connectivity probe, and log the outcome.

    Steps: build the driver, wait for it to become ready
    (``fail_fast=True, timeout=5``), acquire a session from a session pool,
    run ``execute_query`` through ``retry_operation``, and verify that the
    first row of the first result set has ``cnt == 1``.

    Any exception raised along the way is logged with its traceback and not
    propagated, so the function always returns ``None``.
    """
    try:
        driver = initialize_driver()
        with driver:
            driver.wait(fail_fast=True, timeout=5)
            with ydb.SessionPool(driver) as pool:
                session = pool.acquire()
                if not session:
                    logger.error("Failed to acquire a session.")
                    return
                try:
                    result = retry_operation(execute_query, session)
                finally:
                    # Hand the session back even when the query fails, so it
                    # is not leaked from the pool.
                    pool.release(session)

                # Explicit check instead of `assert`, which is stripped when
                # Python runs with -O and carries no message when it fails.
                cnt = result[0].rows[0].cnt
                if cnt != 1:
                    raise RuntimeError(
                        f"Unexpected connectivity check result: cnt={cnt!r}"
                    )
                logger.info("Query executed successfully.")
    except Exception as e:
        # Top-level boundary: record the traceback and swallow the error.
        logger.exception("Error during execution: %s", e)
# Run the connectivity check only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement