Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ### filter_schema.py
- import operator
- from typing import Union, List, Optional, ForwardRef, Callable, Any
- from pydantic import BaseModel, Field, validator
# Maps the logic keyword accepted from clients ("and" / "or") to the binary
# operator function used to combine two conditions.
logics = {name: getattr(operator, f'{name}_') for name in ('and', 'or')}
# Maps the comparison symbol accepted from clients to the lookup suffix that
# is appended to a field name as ``<field>__<suffix>`` when a query is built.
operators = {
    "=": "exact",
    "like": "icontains",
    # NOTE(review): "is" is not one of Django's built-in field lookups; it is
    # kept unchanged on the assumption that a custom lookup is registered
    # elsewhere -- confirm.
    "is": "is",
    ">": "gt",
    # Django spells the inclusive comparisons "gte" / "lte"; the previous
    # "ge" / "le" suffixes are not built-in lookups.
    ">=": "gte",
    "<": "lt",
    "<=": "lte",
    "is_null": "isnull",
}
class SearchMeta(BaseModel):
    """A single search condition: a field name, a comparison and a value.

    ``op`` is received as one of the keys of ``operators`` (for example
    ``"="`` or ``"like"``) and is stored as the mapped lookup suffix.
    """

    # Name of the field the condition applies to.
    field: str = Field(description='字段名称')
    # Comparison operator; stored as a lookup suffix after validation.
    op: Optional[str] = Field('exact', description='对比符')
    # Value to compare against.
    # NOTE(review): with pydantic v1 the Union members are tried left to
    # right, so strings such as "yes" / "no" may be coerced to bool -- confirm
    # and consider StrictBool if that is not intended.
    value: Union[bool, str, List] = Field(description='值')

    @validator('op')
    def validate_op(cls, v):
        """Translate a client-facing operator into its lookup suffix.

        Already-translated suffixes (for example the default ``'exact'``) are
        accepted unchanged so that a dumped model can be validated again.

        Raises:
            ValueError: if ``v`` is neither a known operator nor a known
                lookup suffix.
        """
        if v in operators:
            return operators[v]
        if v in operators.values():
            return v
        raise ValueError(
            f'op is not valid: got {v!r}, expected one of {sorted(operators)}'
        )
class Logic(Callable):
    """Custom field type that turns "and" / "or" into a combining function.

    An instance is itself callable and combines its two arguments with ``&``;
    it is what a field gets when ``Logic`` is used as a default factory.
    Validation of incoming strings returns the matching entry of ``logics``.
    """

    @classmethod
    def __get_validators__(cls):
        # Single validation step: resolve the keyword to an operator function.
        yield cls.validate

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Advertise the field as a string defaulting to "and" in the schema.
        field_schema.update(type="string", default="and")

    @classmethod
    def validate(cls, v):
        """Return the operator function registered in ``logics`` for ``v``.

        The keyword is matched case-insensitively.

        Raises:
            TypeError: if ``v`` is not a string.
            ValueError: if the lower-cased keyword is not a key of ``logics``.
        """
        if not isinstance(v, str):
            raise TypeError('string required')
        v = v.lower()
        if v in logics:
            return logics[v]
        raise ValueError(f'logic must be in ("and", "or"), got {v!r}')

    def __call__(self, a: Any, b: Any) -> Any:
        # Default behaviour of an instance: AND the two operands.
        return a & b
class SearchGroup(BaseModel):
    """A group of search conditions that may contain nested groups.

    ``searches`` holds the conditions of this group, ``groups`` holds nested
    groups of the same type, and ``logic`` is the function used to combine
    them. The lists default to empty and ``logic`` defaults to a ``Logic``
    instance.
    """

    searches: Optional[List[SearchMeta]] = Field(default_factory=list)
    # Self-reference, written as a string and resolved right after the class.
    groups: Optional[List['SearchGroup']] = Field(default_factory=list)
    logic: Optional[Logic] = Field(default_factory=Logic)


# Resolve the recursive 'SearchGroup' reference now that the class exists.
SearchGroup.update_forward_refs()
class OrderingField(BaseModel):
    """One ordering instruction: a field name plus a direction."""

    # Name of the field to order by.
    field: str = Field(description='字段名称')
    # Direction of the ordering; only "asc" and "desc" pass validation.
    order_type: str = Field(description='排序类型(asc, desc)')

    @validator('order_type')
    def validate_order_type(cls, v):
        """Return ``v`` unchanged when it is "asc" or "desc".

        Raises:
            ValueError: for any other value.
        """
        if v in ('asc', 'desc'):
            return v
        raise ValueError(f'order_type should be one of "asc", "desc", got {v!r}')
class Filter(SearchGroup):
    """A search group extended with ordering and pagination fields."""

    # Ordering instructions; defaults to an empty list.
    ordering: Optional[List[OrderingField]] = Field(default_factory=list)
    # Page size; must be greater than zero, defaults to 100.
    per_page: int = Field(default=100, title='每页数量', gt=0)
    # Page number; must be greater than zero, defaults to 1.
    page: int = Field(default=1, title='页数', gt=0)
- ### filters.py
- import operator
- from functools import reduce
- from django.db.models import Q
- from rest_framework import filters
- from utils.filter_schema import SearchGroup, SearchMeta
def parse_q(search: "SearchMeta"):
    """Build the keyword arguments for one ``Q`` object from a condition.

    Args:
        search: object exposing ``field``, ``op`` and ``value``.

    Returns:
        A one-item dict ``{"<field>__<op>": value}``.

    NOTE(review): ``field`` is used verbatim as the lookup path. If it comes
    from the request body, a client can name any field or relation path --
    consider restricting it to an allow-list.
    """
    # ``op`` is declared Optional on the schema, so it can be None here;
    # fall back to an exact match instead of producing "<field>__None".
    lookup = search.op or 'exact'
    return {f'{search.field}__{lookup}': search.value}
def parse_q_search_group(search_group: "SearchGroup"):
    """Recursively fold a search group into a single ``Q`` object.

    The group's own conditions are combined first, starting from an empty
    ``Q``; each nested group is then converted recursively and combined into
    the result with the same logic function.

    Args:
        search_group: object exposing ``searches``, ``groups`` and ``logic``.

    Returns:
        The combined ``Q``; an empty ``Q`` when the group has no conditions
        and no nested groups.
    """
    # All three attributes are declared Optional on the schema, so each may
    # be None; treat None as "nothing to add" and default to AND.
    combine = search_group.logic or operator.and_
    searches = search_group.searches or ()
    groups = search_group.groups or ()

    q = reduce(combine, (Q(**parse_q(search)) for search in searches), Q())
    return reduce(combine, (parse_q_search_group(sub_group) for sub_group in groups), q)
class BFFilterBackEnd(filters.BaseFilterBackend):
    """Filter backend driven by a search group sent in the request body."""

    def filter_queryset(self, request, queryset, view):
        """Filter ``queryset`` by the request body and the query parameters.

        When the request body is empty the queryset is returned untouched and
        the query parameters are not applied either. Otherwise the body is
        validated as a ``SearchGroup`` and converted to a ``Q``. Each query
        parameter becomes a condition: values of one key are OR-ed, different
        keys are AND-ed. The queryset is filtered by both and de-duplicated
        with ``distinct()``.

        NOTE(review): every query-parameter name is used verbatim as a lookup
        key -- confirm that no unrelated parameters reach this backend.
        """
        payload = request.data
        if not payload:
            return queryset

        search_group = SearchGroup.validate(payload)

        # TODO query_params use another filter backend
        query_params = request.query_params
        params_q = Q()
        if query_params:
            # One Q per key, matching any of the values given for that key.
            per_key = [
                reduce(
                    operator.or_,
                    [Q(**{key: value}) for value in query_params.getlist(key)],
                )
                for key in query_params
            ]
            # Every key has to match.
            params_q = reduce(operator.and_, per_key)

        body_q = parse_q_search_group(search_group)
        return queryset.filter(params_q & body_q).distinct()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement