Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class BaseModelView(views.MethodView):
    """Base class for all ModelView classes.

    Before delegating to the HTTP-method handler, ``dispatch_request`` reads
    the caller's API key from the ``Authorization`` header, builds a
    ``ChatAPI`` client with it and parses the JSON request body, storing all
    three on the instance for the handler to use.

    Attributes:
        item_type (Type[Item]): The type of the item that this view handles.
        api_key (str): Key taken from the current request's Authorization
            header; empty until ``dispatch_request`` has run.
        chat_api: ``ChatAPI`` client built for the current request, or None.
        data: Parsed JSON body of the current request, or None.
    """

    def __init__(self, item_type: Type[Item]):
        self.item_type = item_type
        self.api_key = ""
        self.chat_api = None
        self.data = None

    @classmethod
    def as_view(cls, name, *class_args, **class_kwargs):
        """Build the view function and attach a ``model`` instance to it.

        The extra instance is constructed with the same arguments that are
        forwarded to the parent ``as_view``.
        """
        view = super().as_view(name, *class_args, **class_kwargs)
        view.model = cls(*class_args, **class_kwargs)
        return view

    async def dispatch_request(self, **kwargs: Any) -> ResponseReturnValue:
        """Authenticate the request, prepare per-request state and dispatch.

        Args:
            **kwargs: URL arguments forwarded to the method handler.

        Returns:
            The handler's response, or a ``401`` response when the
            Authorization header is missing or has no credential part.
        """
        # pop() is kept on purpose: it also removes the credential from
        # request.headers (presumably so it is not visible downstream --
        # confirm with callers before switching to .get()).
        auth_header = request.headers.pop("Authorization", "")

        # Expected form is "<scheme> <key>". Reject anything shorter instead
        # of raising IndexError or using a placeholder word as the key.
        auth_parts = auth_header.split()
        if len(auth_parts) < 2:
            return "Missing or malformed Authorization header", 401
        openai_api_key = auth_parts[1]

        self.api_key = openai_api_key
        self.chat_api = ChatAPI(ai_prefix="Betty", openai_api_key=openai_api_key)
        self.data = await request.get_json()
        return await super().dispatch_request(**kwargs)
class StreamItemsView(BaseModelView):
    """View that streams chat results back as newline-delimited JSON."""

    async def handle_stream_request(self):
        """Stream the chat API's results for the current request body.

        The request data is validated through the item type's completion
        request model, then ``chat_api.stream`` is run on a worker thread
        that publishes result objects to a queue.

        Yields:
            bytes: One UTF-8 encoded JSON document per produced item,
            terminated by a newline.
        """
        item_request = (
            self.item_type.get_completion_request_model().parse_obj(self.data).dict()
        )

        queue = Queue()
        job_done = object()  # unique sentinel marking the end of the stream

        def run_chat():
            # The sentinel is posted only once the producer has finished
            # (or failed), so the consumer never has to poll for liveness
            # and never hangs if the producer raises.
            try:
                asyncio.run(
                    self.chat_api.stream(self.item_type, queue=queue, **item_request)
                )
            finally:
                queue.put(job_done)

        # The producer keeps its own thread and event loop because it is
        # handed a thread-safe queue.Queue (assumes chat_api.stream may block
        # on it -- confirm before moving it onto the serving loop).
        chat_thread = Thread(target=run_chat)
        chat_thread.start()

        loop = asyncio.get_running_loop()
        while True:
            # queue.get blocks, so wait for it off the event loop.
            next_obj = await loop.run_in_executor(None, queue.get)
            if next_obj is job_done:
                break
            json_obj = json.dumps(asdict(next_obj))
            # Yield the whole line as one bytes chunk; iterating a bytearray
            # would emit individual integers instead of body data.
            yield f"{json_obj}\n".encode("utf-8")

    async def post(self):
        """Handle POST by returning the streaming async generator as the body."""
        return self.handle_stream_request()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement