Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import functools
- import os
- import binascii
- import hvac
- from app.db.base_class import Base
- from fastapi.encoders import jsonable_encoder
- from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, DateTime, func
- from sqlalchemy.ext.hybrid import hybrid_property, Comparator
- from sqlalchemy.orm import Session
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from cryptography.hazmat.backends import default_backend
- from sqlalchemy.dialects.postgresql import JSON
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from .store import Store # noqa: F401
def read_static_file(file_path: str, mode: str = 'rb') -> bytes:
    """Return the full contents of the file at ``file_path``.

    Args:
        file_path: Path of the file to read.
        mode: Mode passed to ``open``; binary read (``'rb'``) by default.
            With a text mode the result is a ``str`` despite the
            ``bytes`` annotation.

    Returns:
        Everything ``read()`` yields for the opened file.
    """
    with open(file_path, mode) as stream:
        contents = stream.read()
    return contents
@functools.cache
def vault_auth() -> hvac.Client:
    """Build the Vault client shared by this process.

    Connection settings are read from the environment:

    * ``VAULT_URL`` - Vault address; defaults to ``http://127.0.0.1:8200``.
    * ``VAULT_TOKEN`` - token sent with requests to Vault.

    The result is memoised by ``functools.cache``, so the environment is
    read on the first call only and every caller gets the same client.
    No request is made to Vault here; the client is only constructed.

    Returns:
        The configured ``hvac.Client``.
    """
    # SECURITY: a literal Vault token used to be the fallback value here.
    # Secrets must not live in source, so the token now comes from the
    # environment only. The token that was previously committed should be
    # treated as compromised and revoked.
    # NOTE(review): with ``token=None`` hvac is expected to apply its own
    # token lookup (env var / token file) - confirm against the hvac docs.
    return hvac.Client(
        url=os.getenv('VAULT_URL', 'http://127.0.0.1:8200'),
        token=os.getenv('VAULT_TOKEN'),
    )
def read_vault(key: str) -> dict:
    """Fetch the secret stored in Vault at path ``key``.

    Args:
        key: Path of the secret, passed to the KV ``read_secret_version``
            call.

    Returns:
        The ``['data']['data']`` mapping of the read response. When Vault
        raises ``InvalidRequest`` or ``Forbidden`` a message is printed and
        an empty dict is returned instead; other exceptions propagate.
    """
    vault = vault_auth()
    try:
        response = vault.secrets.kv.read_secret_version(path=key)
        payload = response['data']['data']
    except hvac.exceptions.InvalidRequest as exc:
        print(f"Error reading from Vault: {exc}")
        payload = {}
    except hvac.exceptions.Forbidden as exc:
        print(f"Permission denied: {exc}")
        payload = {}
    return payload
def aes_encrypt(data: str) -> str:
    """Encrypt ``data`` with AES in GCM mode and return it hex-encoded.

    The key and nonce are hex strings read from the ``CRED_ENCRYPT_KEY``
    Vault secret (fields ``key`` and ``nonce``).

    Args:
        data: Plain text to encrypt; encoded with ``str.encode()`` first.

    Returns:
        Hex string of the ciphertext followed by the GCM authentication
        tag.

    Raises:
        KeyError: If the Vault secret lacks ``key`` or ``nonce`` (for
            example when ``read_vault`` returned an empty dict).
    """
    secret = read_vault('CRED_ENCRYPT_KEY')
    aes_key = binascii.unhexlify(secret['key'])
    gcm_nonce = binascii.unhexlify(secret['nonce'])

    # NOTE(review): the nonce comes from the same Vault secret on every
    # call, so every message is encrypted under one key/nonce pair. That
    # makes the output deterministic, which EncryptComparator uses for
    # equality filters, but GCM is designed around a nonce never being
    # reused with a key. Review before relying on this for confidentiality.
    encryptor = Cipher(
        algorithms.AES(aes_key),
        modes.GCM(gcm_nonce),
        backend=default_backend(),
    ).encryptor()
    ciphertext = encryptor.update(data.encode()) + encryptor.finalize()

    # The tag is appended to the ciphertext so aes_decrypt can split it off
    # the end again.
    sealed = ciphertext + encryptor.tag
    return binascii.hexlify(sealed).decode()
def aes_decrypt(encrypted_data: str) -> str:
    """Decrypt a hex string produced by ``aes_encrypt``.

    The key and nonce are hex strings read from the ``CRED_ENCRYPT_KEY``
    Vault secret (fields ``key`` and ``nonce``).

    Args:
        encrypted_data: Hex encoding of ciphertext followed by a 16-byte
            GCM authentication tag.

    Returns:
        The decrypted bytes decoded as UTF-8.

    Raises:
        KeyError: If the Vault secret lacks ``key`` or ``nonce``.
    """
    secret = read_vault('CRED_ENCRYPT_KEY')
    aes_key = binascii.unhexlify(secret['key'])
    gcm_nonce = binascii.unhexlify(secret['nonce'])

    raw = binascii.unhexlify(encrypted_data)
    # The last 16 bytes are the authentication tag; the rest is ciphertext.
    ciphertext = raw[:-16]
    tag = raw[-16:]

    decryptor = Cipher(
        algorithms.AES(aes_key),
        modes.GCM(gcm_nonce, tag),
        backend=default_backend(),
    ).decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    return plaintext.decode("utf-8")
class EncryptComparator(Comparator):
    """Hybrid comparator that encrypts the right-hand operand.

    Lets a query compare a plain-text value against an encrypted column:
    the value is passed through ``aes_encrypt`` before the operator is
    applied to the wrapped column expression.
    """

    def operate(self, op, other, **kw):
        """Apply ``op`` to the wrapped column and ``aes_encrypt(other)``."""
        column = self.__clause_element__()
        encrypted_operand = aes_encrypt(other)
        return op(column, encrypted_operand, **kw)
class ServiceCred(Base):
    """Credentials for a service, with username and passcode stored encrypted.

    ``username`` and ``passcode`` are hybrid attributes layered over the
    ``encrypted_username`` / ``encrypted_passcode`` columns:

    * reading on an instance decrypts the stored value with ``aes_decrypt``;
    * assigning encrypts the value with ``aes_encrypt`` before storing it;
    * at class level (in queries) they resolve to an ``EncryptComparator``,
      so the compared value is encrypted before reaching the column.

    NOTE(review): no ``__tablename__`` is set here; presumably ``Base``
    supplies it - confirm.
    """

    id = Column(Integer, primary_key=True, index=True)
    service = Column(String(20), index=True)
    provider = Column(String(20), index=True, nullable=True)
    # Hex strings as produced by aes_encrypt.
    encrypted_username = Column(String, nullable=True)
    encrypted_passcode = Column(String, nullable=True)
    store_id = Column(Integer, ForeignKey("store.id"), nullable=True)
    logo = Column(String(500), nullable=True)
    meta = Column(JSON, nullable=True)
    added_on = Column(DateTime(timezone=True), server_default=func.now())
    updated_on = Column(DateTime(timezone=True), onupdate=func.now())

    # These must be hybrid_property rather than the builtin property: a
    # plain property object has no ``.comparator`` decorator, so the class
    # body would raise AttributeError at import time.
    @hybrid_property
    def username(self):
        """Decrypted username, or ``None`` when nothing is stored."""
        if self.encrypted_username:
            return aes_decrypt(self.encrypted_username)
        return None

    @username.setter
    def username(self, value: "str | None") -> None:
        """Encrypt and store ``value``; ``None`` clears the column."""
        # The column is nullable, so None is stored as-is instead of being
        # handed to aes_encrypt (which would fail on ``None.encode()``).
        self.encrypted_username = None if value is None else aes_encrypt(value)

    @username.comparator
    def username(cls):
        """Comparator used when ``ServiceCred.username`` appears in a query."""
        return EncryptComparator(cls.encrypted_username)

    @hybrid_property
    def passcode(self):
        """Decrypted passcode, or ``None`` when nothing is stored."""
        if self.encrypted_passcode:
            return aes_decrypt(self.encrypted_passcode)
        return None

    @passcode.setter
    def passcode(self, value: "str | None") -> None:
        """Encrypt and store ``value``; ``None`` clears the column."""
        self.encrypted_passcode = None if value is None else aes_encrypt(value)

    @passcode.comparator
    def passcode(cls):
        """Comparator used when ``ServiceCred.passcode`` appears in a query."""
        return EncryptComparator(cls.encrypted_passcode)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement