Advertisement
xosski

Yandex database

Dec 4th, 2024
17
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.21 KB | None | 0 0
  1. import os
  2. import ydb
  3. import ydb.iam
  4. import logging
  5. from time import sleep
  6.  
# Configure the root logger at INFO level and create this module's logger,
# which the functions in this file use for their diagnostics.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
  10.  
  11. def get_credentials():
  12. """
  13. Retrieve credentials from environment variables or secret management system.
  14. This avoids hardcoding sensitive information.
  15. """
  16. try:
  17. credentials_path = os.getenv('YDB_CREDENTIALS_PATH') # Store the path in an environment variable
  18. if not credentials_path:
  19. raise ValueError("YDB_CREDENTIALS_PATH environment variable not set.")
  20. return ydb.iam.ServiceAccountCredentials.from_file(credentials_path)
  21. except Exception as e:
  22. logger.error(f"Error retrieving credentials: {e}")
  23. raise
  24.  
  25. def execute_query(session):
  26. """
  27. Executes a simple query to check the connection and ensure the database is responsive.
  28. """
  29. try:
  30. result = session.transaction().execute(
  31. "SELECT 1 AS cnt;",
  32. commit_tx=True,
  33. settings=ydb.BaseRequestSettings().with_timeout(3).with_operation_timeout(2),
  34. )
  35. return result
  36. except ydb.Error as e:
  37. logger.error(f"YDB error executing query: {e}")
  38. raise
  39.  
  40. def retry_operation(func, session, retries=3, delay=2):
  41. """
  42. Retry logic for executing database operations in case of transient failures.
  43. """
  44. for attempt in range(retries):
  45. try:
  46. return func(session)
  47. except Exception as e:
  48. logger.warning(f"Attempt {attempt + 1} failed: {e}")
  49. if attempt < retries - 1:
  50. sleep(delay) # Exponential backoff can be added here
  51. else:
  52. logger.error("Max retries reached. Operation failed.")
  53. raise
  54.  
  55. def initialize_driver():
  56. """
  57. Initializes the YDB driver with secure credentials and configuration.
  58. """
  59. try:
  60. driver = ydb.Driver(
  61. endpoint=os.getenv('YDB_ENDPOINT', 'grpcs://ydb.serverless.yandexcloud.net:2135'),
  62. database=os.getenv('YDB_DATABASE', '/ru-central1/b1g0uddo62cr7ku3vcrs/etn9qo0l4gl6admidgt1'),
  63. credentials=get_credentials()
  64. )
  65. return driver
  66. except Exception as e:
  67. logger.error(f"Error initializing YDB driver: {e}")
  68. raise
  69.  
  70. def main():
  71. """
  72. Main function to initialize YDB driver, query the database, and handle operations.
  73. """
  74. try:
  75. # Initialize driver and session pool
  76. driver = initialize_driver()
  77.  
  78. with driver:
  79. driver.wait(fail_fast=True, timeout=5)
  80.  
  81. # Use session pool with retry operation
  82. with ydb.SessionPool(driver) as pool:
  83. session = pool.acquire()
  84. if session:
  85. result = retry_operation(execute_query, session)
  86. assert result[0].rows[0].cnt == 1
  87. logger.info("Query executed successfully.")
  88. else:
  89. logger.error("Failed to acquire a session.")
  90.  
  91. except Exception as e:
  92. logger.error(f"Error during execution: {e}")
  93.  
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement