Advertisement
bagsari

Untitled

Sep 10th, 2024
12
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.27 KB | None | 0 0
  1. import functools
  2. import os
  3. import binascii
  4. import hvac
  5. from app.db.base_class import Base
  6. from fastapi.encoders import jsonable_encoder
  7. from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, DateTime, func
  8. from sqlalchemy.ext.hybrid import hybrid_property, Comparator
  9. from sqlalchemy.orm import Session
  10. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  11. from cryptography.hazmat.backends import default_backend
  12. from sqlalchemy.dialects.postgresql import JSON
  13. from typing import TYPE_CHECKING
  14.  
  15. if TYPE_CHECKING:
  16. from .store import Store # noqa: F401
  17.  
  18.  
  19. def read_static_file(file_path: str, mode: str = 'rb') -> bytes:
  20. with open(file_path, mode) as file:
  21. return file.read()
  22.  
  23.  
  24. @functools.cache
  25. def vault_auth() -> hvac.Client:
  26. """Authenticate with Vault and return a client instance."""
  27. client = hvac.Client(
  28. url=os.getenv('VAULT_URL', 'http://127.0.0.1:8200'),
  29. token=os.getenv('VAULT_TOKEN', 'hvs.H9PkGdUhbKVHBgkYjRzObSRf'),
  30. )
  31. return client
  32.  
  33. def read_vault(key: str) -> dict:
  34. """Read a secret from Vault."""
  35. client = vault_auth()
  36. try:
  37. read_response = client.secrets.kv.read_secret_version(path=key)
  38. return read_response['data']['data']
  39. except hvac.exceptions.InvalidRequest as e:
  40. print(f"Error reading from Vault: {e}")
  41. return {}
  42. except hvac.exceptions.Forbidden as e:
  43. print(f"Permission denied: {e}")
  44. return {}
  45.  
  46. def aes_encrypt(data: str) -> str:
  47. """Encrypt data using AES encryption."""
  48. vault_data = read_vault('CRED_ENCRYPT_KEY')
  49. key = binascii.unhexlify(vault_data['key'])
  50. nonce = binascii.unhexlify(vault_data['nonce'])
  51.  
  52. cipher = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend())
  53. encryptor = cipher.encryptor()
  54.  
  55. encrypted_data = encryptor.update(data.encode()) + encryptor.finalize()
  56. tag = encryptor.tag # Authentication tag
  57.  
  58. # Combine the encrypted data with the tag, so they can be separated later
  59. return binascii.hexlify(encrypted_data + tag).decode()
  60.  
  61. def aes_decrypt(encrypted_data: str) -> str:
  62. """Decrypt data using AES encryption."""
  63. vault_data = read_vault('CRED_ENCRYPT_KEY')
  64. key = binascii.unhexlify(vault_data['key'])
  65. nonce = binascii.unhexlify(vault_data['nonce'])
  66.  
  67. encrypted_data_bytes = binascii.unhexlify(encrypted_data)
  68.  
  69. # Split the encrypted data and the authentication tag
  70. encrypted_data, tag = encrypted_data_bytes[:-16], encrypted_data_bytes[-16:]
  71.  
  72. cipher = Cipher(algorithms.AES(key), modes.GCM(nonce, tag), backend=default_backend())
  73. decryptor = cipher.decryptor()
  74.  
  75. decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
  76. return decrypted_data.decode("utf-8")
  77.  
  78.  
  79. class EncryptComparator(Comparator):
  80. def operate(self, op, other, **kw):
  81. return op(self.__clause_element__(), aes_encrypt(other), **kw)
  82.  
  83.  
  84. class ServiceCred(Base):
  85. id = Column(Integer, primary_key=True, index=True)
  86. service = Column(String(20), index=True)
  87. provider = Column(String(20), index=True, nullable=True)
  88. encrypted_username = Column(String, nullable=True)
  89. encrypted_passcode = Column(String, nullable=True)
  90. store_id = Column(Integer, ForeignKey("store.id"), nullable=True)
  91. logo = Column(String(500), nullable=True)
  92. meta = Column(JSON, nullable=True)
  93. added_on = Column(DateTime(timezone=True), server_default=func.now())
  94. updated_on = Column(DateTime(timezone=True), onupdate=func.now())
  95.  
  96. @property
  97. def username(self):
  98. if self.encrypted_username:
  99. return aes_decrypt(self.encrypted_username)
  100.  
  101. @username.setter
  102. def username(self, value: str) -> None:
  103. self.encrypted_username = aes_encrypt(value)
  104.  
  105. @username.comparator
  106. def username(cls):
  107. return EncryptComparator(cls.encrypted_username)
  108.  
  109. @property
  110. def passcode(self):
  111. if self.encrypted_passcode:
  112. return aes_decrypt(self.encrypted_passcode)
  113.  
  114. @passcode.setter
  115. def passcode(self, value: str) -> None:
  116. self.encrypted_passcode = aes_encrypt(value)
  117.  
  118. @passcode.comparator
  119. def passcode(cls):
  120. return EncryptComparator(cls.encrypted_passcode)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement