Advertisement
ksen145

escape decorator

Mar 16th, 2025
190
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.33 KB | None | 0 0
  1. import inspect
  2. import re
  3. import html
  4. from functools import wraps
  5. from typing import Callable, Any, Dict, Tuple, List, Optional, Union
  6.  
  7. from pydantic import BaseModel
  8.  
  9.  
  10. def escape_string_fields(escape_method: Union[str, Callable[[str], str]] = "re"):
  11.     def get_escape_func() -> Callable[[str], str]:
  12.         if callable(escape_method):
  13.             return escape_method
  14.         elif escape_method == "re":
  15.             return re.escape
  16.         elif escape_method == "html":
  17.             return html.escape
  18.         else:
  19.             raise ValueError(f"Unknown escape method: {escape_method}")
  20.    
  21.     escape_func = get_escape_func()
  22.    
  23.     def decorator(func):
  24.         is_async = inspect.iscoroutinefunction(func)
  25.        
  26.         if is_async:
  27.             @wraps(func)
  28.             async def wrapper(*args, **kwargs):
  29.                 processed_args, processed_kwargs = process_args_kwargs(args, kwargs, escape_func)
  30.                 return await func(*processed_args, **processed_kwargs)
  31.         else:
  32.             @wraps(func)
  33.             def wrapper(*args, **kwargs):
  34.                 processed_args, processed_kwargs = process_args_kwargs(args, kwargs, escape_func)
  35.                 return func(*processed_args, **processed_kwargs)
  36.        
  37.         return wrapper
  38.    
  39.     return decorator
  40.  
  41.  
  42. def process_args_kwargs(args: Tuple, kwargs: Dict[str, Any], escape_func: Callable[[str], str]) -> Tuple[List, Dict]:
  43.     processed_args = []
  44.     for arg in args:
  45.         if isinstance(arg, BaseModel):
  46.             processed_args.append(escape_model_strings(arg, escape_func))
  47.         else:
  48.             processed_args.append(arg)
  49.    
  50.     processed_kwargs = {}
  51.     for key, value in kwargs.items():
  52.         if isinstance(value, BaseModel):
  53.             processed_kwargs[key] = escape_model_strings(value, escape_func)
  54.         else:
  55.             processed_kwargs[key] = value
  56.    
  57.     return processed_args, processed_kwargs
  58.  
  59.  
  60. def escape_model_strings(model: BaseModel, escape_func: Callable[[str], str]) -> BaseModel:
  61.     try:
  62.         model_dict = model.model_dump()
  63.     except AttributeError:
  64.         model_dict = model.dict()
  65.    
  66.     for field_name, field_value in model_dict.items():
  67.         if isinstance(field_value, str):
  68.             model_dict[field_name] = escape_func(field_value)
  69.    
  70.     return type(model)(**model_dict)
  71.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement