Advertisement
bagsari

Untitled

Sep 2nd, 2024
10
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.92 KB | None | 0 0
  1. import functools
  2. from typing import TYPE_CHECKING, Optional
  3.  
  4. from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
  5. from sqlalchemy.sql.expression import func
  6. from sqlalchemy.dialects.postgresql import JSON
  7.  
  8. from app.db.base_class import Base
  9. from Crypto.Cipher import AES
  10. import binascii
  11. import uuid
  12. from sqlalchemy.ext.hybrid import hybrid_property
  13. import hvac
  14.  
  15. if TYPE_CHECKING:
  16. from .store import Store # noqa: F401
  17.  
  18.  
  19. @functools.cache
  20. def read_static_file(file_path: str, mode: str = 'rb') -> bytes:
  21. with open(file_path, mode) as file:
  22. return file.read()
  23.  
  24. @functools.cache
  25. def vault_auth() -> hvac.Client:
  26. client = hvac.Client(
  27. url='http://127.0.0.1:8200',
  28. token='hvs.bpc4mgAwVZ0O95mkalNL05RP',
  29. )
  30. return client
  31.  
  32. @functools.cache
  33. def read_vault(key) -> dict:
  34. client = vault_auth()
  35. read_response = client.secrets.kv.read_secret_version(
  36. path=key)
  37. return read_response
  38.  
  39.  
  40. def aes_encrypt(data: str) -> str:
  41. response = read_vault('CRED_ENCRYPT_KEY')
  42. key = response["data"]["data"]["key"] # Binary data from Vault
  43. nonce = response["data"]["data"]["nounce"] # Binary data from Vault
  44.  
  45. cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
  46. data_padded = data + (" " * (16 - (len(data) % 16))) # Padding to block size
  47. return cipher.encrypt(data_padded.encode("utf-8")).hex()
  48.  
  49.  
  50. def aes_decrypt(data: str) -> str:
  51. response = read_vault()
  52. key = response["data"]["data"]["key"] # Binary data from Vault
  53. nonce = response["data"]["data"]["nonce"] # Binary data from Vault
  54.  
  55. cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
  56. return cipher.decrypt(binascii.unhexlify(data)).decode("utf-8").rstrip()
  57.  
  58.  
  59. class ServiceCred(Base):
  60. id = Column(Integer, primary_key=True, index=True)
  61. service = Column(String(20), index=True)
  62. provider = Column(String(20), index=True, nullable=True)
  63. encrypted_username = Column(String, nullable=True)
  64. encrypted_passcode = Column(String, nullable=True)
  65. store_id = Column(Integer, ForeignKey("store.id"), nullable=True)
  66. logo = Column(String(500), nullable=True)
  67. meta = Column(JSON, nullable=True)
  68. added_on = Column(DateTime(timezone=True), server_default=func.now())
  69. updated_on = Column(DateTime(timezone=True), onupdate=func.now())
  70.  
  71. @hybrid_property
  72. def username(self) -> Optional[str]:
  73. if self.encrypted_username:
  74. return aes_decrypt(self.encrypted_username)
  75. return None
  76.  
  77. @username.setter
  78. def username(self, value: str) -> None:
  79. self.encrypted_username = aes_encrypt(value)
  80.  
  81. @hybrid_property
  82. def passcode(self) -> Optional[str]:
  83. if self.encrypted_passcode:
  84. return aes_decrypt(self.encrypted_passcode)
  85. return None
  86.  
  87. @passcode.setter
  88. def passcode(self, value: str) -> None:
  89. self.encrypted_passcode = aes_encrypt(value)
  90.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement