Advertisement
Pandaaaa906

Untitled

Mar 10th, 2022
909
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.52 KB | None | 0 0
  1. ### filter_schema.py
  2.  
  3. import operator
  4. from typing import Union, List, Optional, ForwardRef, Callable, Any
  5.  
  6. from pydantic import BaseModel, Field, validator
  7.  
  8.  
  9. logics = {
  10.     'and': operator.and_,
  11.     'or': operator.or_,
  12. }
  13.  
  14.  
  15. operators = {
  16.     "=": "exact",
  17.     "like": "icontains",
  18.     "is": "is",
  19.     ">": "gt",
  20.     ">=": "ge",
  21.     "<": "lt",
  22.     "<=": "le",
  23.     "is_null": "isnull",
  24. }
  25.  
  26.  
  27. class SearchMeta(BaseModel):
  28.     field: str = Field(description='字段名称')
  29.     op: Optional[str] = Field('exact', description='对比符')
  30.     value: Union[bool, str, List] = Field(description='值')
  31.  
  32.     @validator('op')
  33.     def validate_op(cls, v):
  34.         if v not in operators:
  35.             raise ValueError(f'op is not valid')
  36.         return operators[v]
  37.  
  38.  
  39. class Logic(Callable):
  40.     def __call__(self, a: Any, b: Any) -> Any:
  41.         return operator.and_(a, b)
  42.  
  43.     @classmethod
  44.     def __get_validators__(cls):
  45.         yield cls.validate
  46.  
  47.     @classmethod
  48.     def validate(cls, v):
  49.         if not isinstance(v, str):
  50.             raise TypeError('string required')
  51.         if (v := v.lower()) not in logics:
  52.             raise ValueError(f'logic must be in ("and", "or"), got {v!r}')
  53.         return logics[v]
  54.  
  55.     @classmethod
  56.     def __modify_schema__(cls, field_schema):
  57.         field_schema.update(
  58.             type="string",
  59.             default="and"
  60.         )
  61.  
  62.  
  63. SearchGroup = ForwardRef('SearchGroup')
  64.  
  65.  
  66. class SearchGroup(BaseModel):
  67.     searches: Optional[List[SearchMeta]] = Field(default_factory=list)
  68.     groups: Optional[List[SearchGroup]] = Field(default_factory=list)
  69.     logic: Optional[Logic] = Field(default_factory=Logic)
  70.  
  71.  
  72. SearchGroup.update_forward_refs()
  73.  
  74.  
  75. class OrderingField(BaseModel):
  76.     field: str = Field(description='字段名称')
  77.     order_type: str = Field(description='排序类型(asc, desc)')
  78.  
  79.     @validator('order_type')
  80.     def validate_order_type(cls, v):
  81.         if v not in {'asc', 'desc'}:
  82.             raise ValueError(f'order_type should be one of "asc", "desc", got {v!r}')
  83.         return v
  84.  
  85.  
  86. class Filter(SearchGroup):
  87.     ordering: Optional[List[OrderingField]] = Field(default_factory=list)
  88.     per_page: int = Field(100, title='每页数量', gt=0)
  89.     page: int = Field(1, title='页数', gt=0)
  90.  
  91.  
  92.  
  93. ### filters.py
  94.  
  95.  
  96. import operator
  97. from functools import reduce
  98.  
  99. from django.db.models import Q
  100. from rest_framework import filters
  101.  
  102. from utils.filter_schema import SearchGroup, SearchMeta
  103.  
  104.  
  105. def parse_q(search: SearchMeta):
  106.     key = f'{search.field}__{search.op}'
  107.     return {key: search.value}
  108.  
  109.  
  110. def parse_q_search_group(search_group: SearchGroup):
  111.     q = reduce(search_group.logic, (Q(**parse_q(search)) for search in search_group.searches), Q())
  112.     q = reduce(search_group.logic, (parse_q_search_group(sub_group) for sub_group in search_group.groups), q)
  113.     return q
  114.  
  115.  
  116. class BFFilterBackEnd(filters.BaseFilterBackend):
  117.     def filter_queryset(self, request, queryset, view):
  118.         data = request.data
  119.         if not data:
  120.             return queryset
  121.         search_group = SearchGroup.validate(data)
  122.         # TODO query_params use another filter backend
  123.         query_params = request.query_params
  124.         params_q = reduce(
  125.             operator.and_,
  126.             (reduce(
  127.                 operator.or_,
  128.                 (Q(**{k: v}) for v in query_params.getlist(k))) for k in query_params)
  129.         ) if query_params else Q()
  130.         q = parse_q_search_group(search_group)
  131.         return queryset.filter(params_q & q).distinct()
  132.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement