Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import json
from dataclasses import asdict
from queue import Empty, Queue
from threading import Thread
from typing import Type

from quart import jsonify, request, views

from betty.chat.api import ChatAPI
from betty.types import Item
class BaseModelView(views.MethodView):
    """Method view bound to a concrete :class:`Item` subclass.

    Stores the item type so subclasses can look up the request/response
    models associated with it.
    """

    def __init__(self, item_type: Type[Item]):
        # The item type this view serves.
        self.item_type = item_type

    @classmethod
    def as_view(cls, name, *class_args, **class_kwargs):
        """Create the view function and attach the view instance to it.

        The instance is exposed as ``view.model`` so callers can reach the
        bound ``item_type`` from the registered view function.
        """
        view_fn = super().as_view(name, *class_args, **class_kwargs)
        view_fn.model = cls(*class_args, **class_kwargs)
        return view_fn
class StreamItemsView(BaseModelView):
    """Streams newline-delimited JSON items generated by the chat model."""

    async def handle_stream_request(self):
        """Validate the request and return an async generator of JSON lines.

        The OpenAI API key is taken from the ``Authorization: Bearer <key>``
        header; the JSON body is validated against the item type's
        completion-request model.
        """
        # NOTE(review): when the header is absent, the fallback string makes
        # this resolve to the literal "Authorization" instead of failing
        # fast — confirm whether a 401 response would be more appropriate.
        openai_api_key = request.headers.pop(
            "Authorization", "Invalid Authorization"
        ).split(" ")[1]
        story_generator = ChatAPI(ai_prefix="Betty", openai_api_key=openai_api_key)

        # Implemented similarly to
        # https://gist.github.com/mortymike/70711b028311681e5f3c6511031d5d43#solution
        # `story_generator.stream` puts items into `queue` as they are
        # streamed from the model; a worker thread runs that coroutine and
        # this generator drains the queue.
        async def stream(*args, **kwargs):
            queue = Queue()
            job_done = object()  # sentinel enqueued once the worker finishes

            def worker():
                try:
                    asyncio.run(
                        story_generator.stream(*args, queue=queue, **kwargs)
                    )
                finally:
                    # BUGFIX: the sentinel was previously enqueued *before*
                    # the thread started, so a fast-finishing worker could
                    # make the consumer break before draining any streamed
                    # items. Enqueue it only after streaming completes (or
                    # fails) so no output is dropped and the loop cannot hang.
                    queue.put(job_done)

            t = Thread(target=worker)
            t.start()
            while True:
                try:
                    next_obj = queue.get(True, timeout=1)
                except Empty:
                    # Defensive fallback: if the worker died without the
                    # sentinel, stop instead of spinning forever.
                    if not t.is_alive():
                        break
                    continue
                if next_obj is job_done:
                    break
                yield f"{json.dumps(asdict(next_obj))}\n"

        data = await request.get_json()
        item_request = (
            self.item_type.get_completion_request_model().parse_obj(data).dict()
        )
        return stream(self.item_type, **item_request)

    async def post(self):
        # BUGFIX: must be awaited — returning the bare coroutine handed Quart
        # an un-awaited coroutine object instead of the async generator that
        # produces the response body.
        return await self.handle_stream_request()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement