Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #modules >> silence_timer
- import asyncio
- import random
- from typing import Callable, Dict
- from loguru import logger
- from voicex.models.data_models.silence_timer import (
- SilenceTypes,
- TimerState,
- SilenceTimerConfig,
- config as default_config
- )
- from copy import deepcopy
- from voicex.models.data_models.silence_timer import Silence_Types
- from voicex.models.data_models.silence_timer import LLMMessage
- from voicex.models.data_models.silence_timer import Config
- config = Config()
- class SilenceTimer:
- def __init__(
- self,
- agent,
- get_ctx: Callable,
- participant,
- metadata,
- config: SilenceTimerConfig = default_config,
- ):
- self.timers: Dict[SilenceTypes, TimerState] = {
- SilenceTypes.CHECK_CUSTOMER: TimerState(
- timeout=config.check_customer_timeout
- ),
- SilenceTypes.HANG_UP: TimerState(
- timeout=config.hangup_timeout
- ),
- }
- self.agent = agent
- self.get_ctx = get_ctx
- self.participant = participant
- self.metadata = metadata
- self.config = config
- self.hangup_in_progress = False
- async def _timer(self, silence_type: SilenceTypes):
- try:
- remaining_time = self.timers[silence_type]["timeout"]
- while remaining_time > 0 and self.timers[silence_type]["is_running"]:
- if not self.timers[silence_type]["is_paused"]:
- await asyncio.sleep(0.1) # Check pause state every 0.1 seconds
- remaining_time -= 0.1
- else:
- await asyncio.sleep(0.1) # Minimal sleep while paused
- if (
- self.timers[silence_type]["is_running"]
- and not self.timers[silence_type]["is_paused"]
- ):
- await self.handle_silence(silence_type)
- except asyncio.CancelledError:
- pass
- def sample_message(self, silence_type: SilenceTypes) -> str:
- """Randomly sample a message for the given silence type."""
- messages = self.config.default_messages[silence_type]
- return random.choice(messages) if isinstance(messages, list) else messages
- async def handle_silence(self, silence_type: SilenceTypes):
- """Handle silence based on timer type"""
- if self.hangup_in_progress:
- return
- if silence_type == SilenceTypes.HANG_UP:
- self.hangup_in_progress = True
- self.stop()
- logger.info(
- f"Stopping check customer timer for lead {self.metadata['lead_id']}"
- )
- try:
- self.stop(SilenceTypes.CHECK_CUSTOMER)
- try:
- chat_ctx = self.agent.chat_ctx.copy()
- llm_message = self.config.llm_messages[silence_type]
- chat_ctx.append(role=llm_message.role, text=llm_message.message)
- logger.debug(f"LLM message: {llm_message}")
- message = self.agent.llm.chat(chat_ctx=chat_ctx)
- except Exception as e:
- message = self.sample_message(silence_type)
- logger.debug(
- f"Error generating silence message: {str(e)}\nUsing fallback message: {message}"
- )
- # await self.agent.say(message)
- speech_handle = await self.agent.say(message)
- logger.info(f"Starting {silence_type} speech...")
- await speech_handle.wait_for_initialization()
- logger.info(f"Speech handle completed for lead {self.metadata['lead_id']}")
- if silence_type == SilenceTypes.HANG_UP:
- logger.info("Entering hangup block")
- if speech_handle.interrupted:
- logger.info("Hangup interrupted...")
- self.hangup_in_progress = False
- self.start()
- return
- else:
- logger.info("Hangup completed...")
- await self.cleanup()
- # await self.agent_hang_up_callback(ctx, self.participant, self.metadata)
- # TODO: Add hangup logic here. Currently, hangup is triggered when the agent says `goodbye`
- except Exception as e:
- logger.error(
- f"Error during {silence_type} speech: {str(e)} for lead {self.metadata['lead_id']}"
- )
- finally:
- self.start(SilenceTypes.CHECK_CUSTOMER)
- def start(self, silence_type: SilenceTypes = None):
- """Start specific or all timers"""
- if silence_type:
- self._start_timer(silence_type)
- else:
- # Start both timers
- self._start_timer(SilenceTypes.CHECK_CUSTOMER)
- self._start_timer(SilenceTypes.HANG_UP)
- def _start_timer(self, silence_type: SilenceTypes):
- if self.timers[silence_type]["task"]:
- self.timers[silence_type]["task"].cancel()
- self.timers[silence_type]["is_running"] = True
- self.timers[silence_type]["is_paused"] = False
- self.timers[silence_type]["pause_time"] = None
- self.timers[silence_type]["remaining_time"] = None
- self.timers[silence_type]["task"] = asyncio.create_task(
- self._timer(silence_type)
- )
- def stop(self, silence_type: SilenceTypes = None):
- """Stop specific or all timers"""
- if silence_type:
- self._stop_timer(silence_type)
- else:
- # Stop both timers
- self._stop_timer(SilenceTypes.CHECK_CUSTOMER)
- self._stop_timer(SilenceTypes.HANG_UP)
- def _stop_timer(self, silence_type: SilenceTypes):
- self.timers[silence_type]["is_running"] = False
- self.timers[silence_type]["is_paused"] = False
- self.timers[silence_type]["pause_time"] = None
- self.timers[silence_type]["remaining_time"] = None
- if self.timers[silence_type]["task"]:
- self.timers[silence_type]["task"].cancel()
- self.timers[silence_type]["task"] = None
- def is_running(self, silence_type: SilenceTypes) -> bool:
- """Check if a specific timer is running"""
- return self.timers[silence_type]["is_running"]
- def pause(self, silence_type: SilenceTypes = None):
- """Pause specific or all timers without canceling them"""
- if silence_type:
- self._pause_timer(silence_type)
- else:
- # Pause both timers
- self._pause_timer(SilenceTypes.CHECK_CUSTOMER)
- self._pause_timer(SilenceTypes.HANG_UP)
- def _pause_timer(self, silence_type: SilenceTypes):
- timer_info = self.timers[silence_type]
- if timer_info["is_running"] and not timer_info["is_paused"]:
- timer_info["is_paused"] = True
- timer_info["pause_time"] = asyncio.get_event_loop().time()
- logger.debug(f"Paused {silence_type} timer")
- def resume(self, silence_type: SilenceTypes = None):
- """Resume specific or all paused timers"""
- if silence_type:
- self._resume_timer(silence_type)
- else:
- # Resume both timers
- self._resume_timer(SilenceTypes.CHECK_CUSTOMER)
- self._resume_timer(SilenceTypes.HANG_UP)
- def _resume_timer(self, silence_type: SilenceTypes):
- timer_info = self.timers[silence_type]
- if timer_info["task"] and timer_info["is_paused"]:
- timer_info["is_paused"] = False
- logger.debug(f"Resumed {silence_type} timer")
- def is_paused(self, silence_type: SilenceTypes) -> bool:
- """Check if a specific timer is paused"""
- return self.timers[silence_type]["is_paused"]
- def setup_event_handlers(self, agent):
- @agent.on("agent_started_speaking")
- def on_agent_started_speaking():
- logger.debug(
- "Agent started speaking. Pausing Hangup Timer & stopping Check Customer Timer"
- )
- self.stop(SilenceTypes.CHECK_CUSTOMER)
- self.pause(SilenceTypes.HANG_UP)
- @agent.on("agent_stopped_speaking")
- def on_agent_stopped_speaking():
- # Start both timers when agent stops speaking
- logger.debug("Agent stopped speaking. Starting Check Customer Timer")
- self.start(SilenceTypes.CHECK_CUSTOMER)
- # For hang_up timer, only start it if it's not already running
- if self.is_running(SilenceTypes.HANG_UP):
- logger.debug("Hangup timer already running. Resuming...")
- self.resume(SilenceTypes.HANG_UP)
- else:
- logger.debug("Starting hangup timer...")
- self.stop(SilenceTypes.HANG_UP)
- self.start(SilenceTypes.HANG_UP)
- @agent.on("user_started_speaking")
- def on_user_started_speaking():
- logger.debug("User started speaking. Stopping Both Timers")
- # Stop both timers
- self.stop()
- @agent.on("user_stopped_speaking")
- def on_user_stopped_speaking():
- logger.debug("User stopped speaking. Starting Both Timers from scratch")
- # Only start hang up timer
- self.start()
- @agent.on("agent_speech_interrupted")
- def on_agent_speech_interrupted():
- logger.debug("Agent speech interrupted. Stopping Both Timers")
- # Stop both timers
- self.stop()
- async def cleanup(self):
- """Gracefully cleanup and delete the SilenceTimer instance"""
- # Stop all running timers
- self.stop()
- # Cancel any remaining tasks
- for silence_type in self.timers:
- if self.timers[silence_type]["task"]:
- self.timers[silence_type]["task"].cancel()
- try:
- await self.timers[silence_type]["task"]
- except asyncio.CancelledError:
- pass
- # Clear all references
- self.agent = None
- self.get_ctx = None
- self.participant = None
- self.metadata = None
- self.agent_hang_up_callback = None
- self.hangup_in_progress = False
- # Reinitialize timers with infinite timeout
- self.timers = {
- SilenceTypes.CHECK_CUSTOMER: TimerState(
- timeout=float("inf")
- ),
- SilenceTypes.HANG_UP: TimerState(
- timeout=float("inf")
- ),
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement