Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import inspect
- import re
- import html
- from functools import wraps
- from typing import Callable, Any, Dict, Tuple, List, Optional, Union
- from pydantic import BaseModel
def escape_string_fields(escape_method: Union[str, Callable[[str], str]] = "re"):
    """Create a decorator that escapes string fields of pydantic-model arguments.

    Every call to the decorated function first routes its positional and
    keyword arguments through ``process_args_kwargs`` together with the
    selected escape function, then invokes the original function with the
    processed values. Coroutine functions get an ``async`` wrapper; all other
    callables get a regular one.

    Args:
        escape_method: Either a callable taking and returning ``str`` (used
            as-is), or one of the names ``"re"`` (``re.escape``) or
            ``"html"`` (``html.escape``).

    Returns:
        A decorator to apply to the target function.

    Raises:
        ValueError: If ``escape_method`` is neither callable nor a known name.
    """
    escape_func = None
    if callable(escape_method):
        # A user-supplied escaper always takes precedence over the names.
        escape_func = escape_method
    else:
        for label, candidate in (("re", re.escape), ("html", html.escape)):
            if escape_method == label:
                escape_func = candidate
                break
        else:
            raise ValueError(f"Unknown escape method: {escape_method}")

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                new_args, new_kwargs = process_args_kwargs(args, kwargs, escape_func)
                return await func(*new_args, **new_kwargs)

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            new_args, new_kwargs = process_args_kwargs(args, kwargs, escape_func)
            return func(*new_args, **new_kwargs)

        return sync_wrapper

    return decorator
def process_args_kwargs(args: Tuple, kwargs: Dict[str, Any], escape_func: Callable[[str], str]) -> Tuple[List, Dict]:
    """Escape every pydantic model found directly among the call arguments.

    Each positional argument and each keyword value that is a ``BaseModel``
    instance is replaced by the result of ``escape_model_strings``; every
    other value is passed through unchanged. Containers are not inspected,
    so models nested inside lists or dicts are left as they are.

    Args:
        args: Positional arguments of the intercepted call.
        kwargs: Keyword arguments of the intercepted call.
        escape_func: Function applied to the models' string fields.

    Returns:
        A ``(list, dict)`` pair holding the processed positional and keyword
        arguments, in their original order.
    """

    def _convert(item: Any) -> Any:
        # Only direct BaseModel instances are rewritten.
        if isinstance(item, BaseModel):
            return escape_model_strings(item, escape_func)
        return item

    new_args = [_convert(item) for item in args]
    new_kwargs = {name: _convert(item) for name, item in kwargs.items()}
    return new_args, new_kwargs
def escape_model_strings(model: BaseModel, escape_func: Callable[[str], str]) -> BaseModel:
    """Return a copy of *model* whose top-level string fields are escaped.

    The model's raw field values are read by iterating over it, and the
    escaped strings are applied through pydantic's copy-with-update API
    instead of dumping the model and re-instantiating its class. This avoids
    three problems of the dump/rebuild approach:

    * models with aliased fields cannot always be rebuilt from field-name
      keywords;
    * rebuilding re-runs validation on the *escaped* text, which can reject
      constrained string fields;
    * catching ``AttributeError`` around ``model_dump()`` could mask an
      ``AttributeError`` raised by a custom serializer.

    Only values that are direct ``str`` fields of the model are escaped;
    nested models and containers are carried over untouched (the copy is
    shallow, so they are shared with the input model). Members of str-based
    enums are skipped, because the update is not validated and escaping
    would silently turn them into plain strings.

    Args:
        model: The pydantic model instance to process. It is not modified.
        escape_func: Function applied to each string field value.

    Returns:
        A new instance of the same model class with escaped string fields.
    """
    from enum import Enum  # local import: only this function needs it

    escaped = {
        name: escape_func(value)
        for name, value in model
        if isinstance(value, str) and not isinstance(value, Enum)
    }

    # pydantic v2 exposes model_copy(); on v1 only copy() exists. Feature
    # detection keeps unrelated AttributeErrors from being swallowed.
    copy_with_update = getattr(model, "model_copy", None)
    if copy_with_update is None:
        copy_with_update = model.copy
    return copy_with_update(update=escaped)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement