Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import functools
- from typing import TYPE_CHECKING, Optional
- from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
- from sqlalchemy.sql.expression import func
- from sqlalchemy.dialects.postgresql import JSON
- from app.db.base_class import Base
- from Crypto.Cipher import AES
- import binascii
- import uuid
- from sqlalchemy.ext.hybrid import hybrid_property
- import hvac
- if TYPE_CHECKING:
- from .store import Store # noqa: F401
- @functools.cache
- def read_static_file(file_path: str, mode: str = 'rb') -> bytes:
- with open(file_path, mode) as file:
- return file.read()
- @functools.cache
- def vault_auth() -> hvac.Client:
- client = hvac.Client(
- url='http://127.0.0.1:8200',
- token='hvs.bpc4mgAwVZ0O95mkalNL05RP',
- )
- return client
- @functools.cache
- def read_vault(key) -> dict:
- client = vault_auth()
- read_response = client.secrets.kv.read_secret_version(
- path=key)
- return read_response
- def aes_encrypt(data: str) -> str:
- response = read_vault('CRED_ENCRYPT_KEY')
- key = response["data"]["data"]["key"] # Binary data from Vault
- nonce = response["data"]["data"]["nounce"] # Binary data from Vault
- cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
- data_padded = data + (" " * (16 - (len(data) % 16))) # Padding to block size
- return cipher.encrypt(data_padded.encode("utf-8")).hex()
- def aes_decrypt(data: str) -> str:
- response = read_vault()
- key = response["data"]["data"]["key"] # Binary data from Vault
- nonce = response["data"]["data"]["nonce"] # Binary data from Vault
- cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
- return cipher.decrypt(binascii.unhexlify(data)).decode("utf-8").rstrip()
- class ServiceCred(Base):
- id = Column(Integer, primary_key=True, index=True)
- service = Column(String(20), index=True)
- provider = Column(String(20), index=True, nullable=True)
- encrypted_username = Column(String, nullable=True)
- encrypted_passcode = Column(String, nullable=True)
- store_id = Column(Integer, ForeignKey("store.id"), nullable=True)
- logo = Column(String(500), nullable=True)
- meta = Column(JSON, nullable=True)
- added_on = Column(DateTime(timezone=True), server_default=func.now())
- updated_on = Column(DateTime(timezone=True), onupdate=func.now())
- @hybrid_property
- def username(self) -> Optional[str]:
- if self.encrypted_username:
- return aes_decrypt(self.encrypted_username)
- return None
- @username.setter
- def username(self, value: str) -> None:
- self.encrypted_username = aes_encrypt(value)
- @hybrid_property
- def passcode(self) -> Optional[str]:
- if self.encrypted_passcode:
- return aes_decrypt(self.encrypted_passcode)
- return None
- @passcode.setter
- def passcode(self, value: str) -> None:
- self.encrypted_passcode = aes_encrypt(value)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement