Files
miku-discord/bot/utils/voice_manager.py

917 lines
39 KiB
Python

# voice_manager.py
"""
Voice session manager for Miku Discord bot.
Handles Discord voice channel connections, resource locking, and feature blocking during voice sessions.
During a voice session:
- GPU switches to AMD for text inference only
- Vision model is blocked (keeps GTX 1660 for TTS)
- Image generation is blocked
- Bipolar mode interactions are disabled
- Profile picture switching is locked
- Autonomous engine is paused
- Scheduled events are paused
- Text channels are paused (messages queued)
"""
import asyncio
import json
import os
from typing import Optional
import discord
from discord.ext import voice_recv
import globals
from utils.logger import get_logger
# Module-level logger shared by VoiceSessionManager and VoiceSession below.
logger = get_logger('voice_manager')
class VoiceSessionManager:
    """
    Singleton manager for voice chat sessions.

    Ensures only one voice session is active at a time and manages all
    resource locks (GPU state file, vision/image blocking flags, text-channel
    pause, bipolar mode, profile picture, autonomous engine, scheduler) that
    are taken when a session starts and released when it ends.
    """

    _instance = None

    def __new__(cls):
        # Classic singleton: every construction returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every VoiceSessionManager() call; only set up state once.
        if self._initialized:
            return
        self.active_session: Optional['VoiceSession'] = None
        # Serializes start_session / end_session so they never interleave.
        self.session_lock = asyncio.Lock()
        self._initialized = True
        logger.info("VoiceSessionManager initialized")

    async def start_session(self, guild_id: int, voice_channel: discord.VoiceChannel, text_channel: discord.TextChannel):
        """
        Start a voice session with full resource locking.

        Args:
            guild_id: Discord guild ID
            voice_channel: Voice channel to join
            text_channel: Text channel for voice prompts

        Raises:
            Exception: If session already active or resources can't be locked
        """
        async with self.session_lock:
            if self.active_session:
                raise Exception("Voice session already active")
            logger.info(f"Starting voice session in {voice_channel.name} (guild {guild_id})")
            try:
                # 1. Switch to AMD GPU for text inference
                await self._switch_to_amd_gpu()
                # 2. Block vision model loading
                await self._block_vision_model()
                # 3. Disable image generation (ComfyUI)
                await self._disable_image_generation()
                # 4. Pause text channel inference (queue messages)
                await self._pause_text_channels()
                # 5. Disable bipolar mode interactions (Miku/Evil Miku arguments)
                await self._disable_bipolar_mode()
                # 6. Disable profile picture switching
                await self._disable_profile_picture_switching()
                # 7. Pause autonomous engine
                await self._pause_autonomous_engine()
                # 8. Pause scheduled events
                await self._pause_scheduled_events()
                # 9. Pause figurine notifier
                await self._pause_figurine_notifier()
                # 10. Create voice session
                self.active_session = VoiceSession(guild_id, voice_channel, text_channel)
                # 11. Connect to Discord voice channel with VoiceRecvClient
                try:
                    voice_client = await voice_channel.connect(cls=voice_recv.VoiceRecvClient)
                    self.active_session.voice_client = voice_client
                    self.active_session.active = True
                    logger.info(f"✓ Connected to voice channel: {voice_channel.name} (with audio receiving)")
                except Exception as e:
                    logger.error(f"Failed to connect to voice channel: {e}", exc_info=True)
                    raise
                # 12. Start audio streaming (Phase 2)
                try:
                    await self.active_session.start_audio_streaming()
                    logger.info("✓ Audio streaming started")
                except Exception as e:
                    logger.error(f"Failed to start audio streaming: {e}", exc_info=True)
                    # Continue anyway - audio streaming is optional for Phase 2 testing
                logger.info("✓ Voice session started successfully")
            except Exception as e:
                logger.error(f"Failed to start voice session: {e}", exc_info=True)
                # Cleanup on failure
                await self._cleanup_failed_start()
                raise

    async def end_session(self):
        """
        End voice session and release all resources.

        Every release step is attempted even if an earlier one fails, so a
        single failing step cannot leave the remaining locks held. The session
        is always cleared.

        Raises:
            Exception: The first error raised by a release step, re-raised
                after all steps have been attempted.
        """
        async with self.session_lock:
            session = self.active_session
            if not session:
                logger.warning("No active voice session to end")
                return
            logger.info("Ending voice session...")
            first_error = None
            try:
                # 1. Stop audio streaming
                try:
                    await session.stop_audio_streaming()
                except Exception as e:
                    logger.error(f"Error stopping audio streaming: {e}")
                # 2. Disconnect from voice channel
                if session.voice_client:
                    try:
                        await session.voice_client.disconnect()
                        logger.info("✓ Disconnected from voice channel")
                    except Exception as e:
                        logger.error(f"Error disconnecting from voice: {e}")
                # 3-10. Release every lock taken by start_session
                first_error = await self._release_resources()
            finally:
                # 11. Clear active session, even on error
                session.active = False
                self.active_session = None
            if first_error is not None:
                logger.error(f"Error during session cleanup: {first_error}")
                raise first_error
            logger.info("✓ Voice session ended successfully, all resources released")

    async def _release_resources(self) -> Optional[Exception]:
        """
        Run every release step, isolating failures from one another.

        Returns:
            The first exception raised by a step, or None if all succeeded.
        """
        release_steps = (
            self._resume_text_channels,
            self._unblock_vision_model,
            self._enable_image_generation,
            self._enable_bipolar_mode,
            self._enable_profile_picture_switching,
            self._resume_autonomous_engine,
            self._resume_scheduled_events,
            self._resume_figurine_notifier,
        )
        first_error: Optional[Exception] = None
        for step in release_steps:
            try:
                await step()
            except Exception as e:
                logger.error(f"Release step {step.__name__} failed: {e}", exc_info=True)
                if first_error is None:
                    first_error = e
        return first_error

    # ==================== Resource Locking Methods ====================

    async def _switch_to_amd_gpu(self):
        """Switch text inference to AMD GPU (RX 6800) by writing memory/gpu_state.json."""
        try:
            gpu_state_file = os.path.join("memory", "gpu_state.json")
            os.makedirs("memory", exist_ok=True)
            with open(gpu_state_file, "w") as f:
                json.dump({"current_gpu": "amd", "reason": "voice_session"}, f)
            logger.info("✓ Switched to AMD GPU for text inference")
        except Exception as e:
            logger.error(f"Failed to switch GPU: {e}")
            raise

    async def _block_vision_model(self):
        """Prevent vision model from loading during voice session"""
        globals.VISION_MODEL_BLOCKED = True
        logger.info("✓ Vision model blocked")

    async def _unblock_vision_model(self):
        """Allow vision model to load after voice session"""
        globals.VISION_MODEL_BLOCKED = False
        logger.info("✓ Vision model unblocked")

    async def _disable_image_generation(self):
        """Block ComfyUI image generation during voice session"""
        globals.IMAGE_GENERATION_BLOCKED = True
        globals.IMAGE_GENERATION_BLOCK_MESSAGE = (
            "🎤 I can't draw right now, I'm talking in voice chat! "
            "Ask me again after I leave the voice channel."
        )
        logger.info("✓ Image generation disabled")

    async def _enable_image_generation(self):
        """Re-enable image generation after voice session"""
        globals.IMAGE_GENERATION_BLOCKED = False
        globals.IMAGE_GENERATION_BLOCK_MESSAGE = None
        logger.info("✓ Image generation re-enabled")

    async def _pause_text_channels(self):
        """Queue text messages instead of processing during voice session"""
        globals.VOICE_SESSION_ACTIVE = True
        globals.TEXT_MESSAGE_QUEUE = []
        logger.info("✓ Text channels paused (messages will be queued)")

    async def _resume_text_channels(self):
        """Clear the voice-session flag and drop any queued messages (processing is still a TODO)."""
        globals.VOICE_SESSION_ACTIVE = False
        # This can run from failed-start cleanup before _pause_text_channels
        # ever created the queue, so do not assume the attribute exists.
        queued_count = len(getattr(globals, "TEXT_MESSAGE_QUEUE", None) or [])
        if queued_count > 0:
            logger.info(f"Resuming text channels, {queued_count} messages queued")
            # TODO: Process queue in Phase 2 (need message handler integration)
            # For now, just clear the queue
            globals.TEXT_MESSAGE_QUEUE = []
            logger.warning(f"Discarded {queued_count} queued messages (queue processing not yet implemented)")
        else:
            logger.info("✓ Text channels resumed (no queued messages)")

    @staticmethod
    def _run_optional_hook(module_name: str, func_name: str, success_message: str) -> None:
        """
        Call ``module_name.func_name()`` if both exist; otherwise log and skip.

        The module is imported first and the function looked up separately so
        that a missing module and a missing function are reported distinctly
        (``from module import name`` raises ImportError for both).
        """
        import importlib

        short_name = module_name.rsplit(".", 1)[-1]
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            logger.warning(f"{short_name} module not found, skipping")
            return
        hook = getattr(module, func_name, None)
        if hook is None:
            logger.warning(f"{func_name} not implemented yet, skipping")
            return
        try:
            hook()
        except AttributeError as e:
            # Stay best-effort: an AttributeError from the hook was previously
            # swallowed as well, so keep skipping rather than aborting.
            logger.warning(f"{func_name} raised AttributeError, skipping: {e}")
            return
        logger.info(success_message)

    async def _disable_bipolar_mode(self):
        """Prevent Miku/Evil Miku arguments during voice session"""
        self._run_optional_hook(
            "utils.bipolar_mode",
            "pause_bipolar_interactions",
            "✓ Bipolar mode interactions disabled",
        )

    async def _enable_bipolar_mode(self):
        """Re-enable Miku/Evil Miku arguments after voice session"""
        self._run_optional_hook(
            "utils.bipolar_mode",
            "resume_bipolar_interactions",
            "✓ Bipolar mode interactions re-enabled",
        )

    async def _disable_profile_picture_switching(self):
        """Lock profile picture during voice session"""
        try:
            from utils.profile_picture_manager import profile_picture_manager
            if hasattr(profile_picture_manager, 'lock_switching'):
                profile_picture_manager.lock_switching()
                logger.info("✓ Profile picture switching disabled")
            else:
                logger.warning("profile_picture_manager.lock_switching not implemented yet, skipping")
        except ImportError:
            logger.warning("profile_picture_manager module not found, skipping")

    async def _enable_profile_picture_switching(self):
        """Unlock profile picture after voice session"""
        try:
            from utils.profile_picture_manager import profile_picture_manager
            if hasattr(profile_picture_manager, 'unlock_switching'):
                profile_picture_manager.unlock_switching()
                logger.info("✓ Profile picture switching re-enabled")
            else:
                logger.warning("profile_picture_manager.unlock_switching not implemented yet, skipping")
        except ImportError:
            logger.warning("profile_picture_manager module not found, skipping")

    async def _pause_autonomous_engine(self):
        """Pause autonomous message generation during voice session"""
        self._run_optional_hook(
            "utils.autonomous",
            "pause_autonomous_system",
            "✓ Autonomous engine paused",
        )

    async def _resume_autonomous_engine(self):
        """Resume autonomous message generation after voice session"""
        self._run_optional_hook(
            "utils.autonomous",
            "resume_autonomous_system",
            "✓ Autonomous engine resumed",
        )

    async def _pause_scheduled_events(self):
        """Pause all scheduled jobs during voice session (best-effort; errors are logged)."""
        try:
            globals.scheduler.pause()
            logger.info("✓ Scheduled events paused")
        except Exception as e:
            logger.error(f"Failed to pause scheduler: {e}")

    async def _resume_scheduled_events(self):
        """Resume scheduled jobs after voice session (best-effort; errors are logged)."""
        try:
            globals.scheduler.resume()
            logger.info("✓ Scheduled events resumed")
        except Exception as e:
            logger.error(f"Failed to resume scheduler: {e}")

    async def _pause_figurine_notifier(self):
        """Pause figurine notifications during voice session"""
        try:
            # Assuming figurine notifier is a scheduled job
            globals.scheduler.pause_job('figurine_notifier')
            logger.info("✓ Figurine notifier paused")
        except Exception as e:
            # Job might not exist, that's okay
            logger.debug(f"Could not pause figurine notifier (may not exist): {e}")

    async def _resume_figurine_notifier(self):
        """Resume figurine notifications after voice session"""
        try:
            globals.scheduler.resume_job('figurine_notifier')
            logger.info("✓ Figurine notifier resumed")
        except Exception as e:
            # Job might not exist, that's okay
            logger.debug(f"Could not resume figurine notifier (may not exist): {e}")

    async def _cleanup_failed_start(self):
        """
        Cleanup resources if session start fails.

        Never raises for ordinary errors: each step is isolated and logged, and
        the session is always cleared so a later start_session is not rejected
        as "already active".
        """
        logger.warning("Cleaning up after failed session start...")
        session = self.active_session
        try:
            # Disconnect from voice if connected
            if session and session.voice_client:
                try:
                    await session.voice_client.disconnect()
                except Exception as e:
                    logger.debug(f"Ignoring error while disconnecting during cleanup: {e}")
            await self._release_resources()
        finally:
            # Clear the session
            self.active_session = None
class VoiceSession:
    """
    Represents an active voice chat session with audio streaming.

    Holds the Discord voice client, the TTS audio source, the STT receiver,
    the rolling conversation history, and the bookkeeping tasks used for
    automated calls (join timeout and auto-leave).
    """

    # Interrupt endpoint of the RVC/TTS server. Kept as a class attribute so a
    # deployment can override it without editing _cancel_tts.
    TTS_INTERRUPT_URL = "http://172.25.0.1:8765/interrupt"

    def __init__(self, guild_id: int, voice_channel: discord.VoiceChannel, text_channel: discord.TextChannel):
        self.guild_id = guild_id
        self.voice_channel = voice_channel
        self.text_channel = text_channel
        self.voice_client: Optional[discord.VoiceClient] = None
        self.audio_source: Optional['MikuVoiceSource'] = None  # Forward reference
        self.tts_streamer: Optional['TTSTokenStreamer'] = None  # Forward reference
        self.voice_receiver: Optional['VoiceReceiver'] = None  # STT receiver
        self.active = False
        self.miku_speaking = False  # Track if Miku is currently speaking
        self.llm_stream_task: Optional[asyncio.Task] = None  # Track LLM streaming task for cancellation
        self.last_interruption_time: float = 0  # Track when last interruption occurred
        self.interruption_silence_duration = 0.8  # Seconds of silence after interruption before next response
        # Voice chat conversation history (last 8 exchanges)
        self.conversation_history = []  # List of {"role": "user"/"assistant", "content": str}
        # Voice call management (for automated calls from web UI)
        self.call_user_id: Optional[int] = None  # User ID that was called
        self.call_timeout_task: Optional[asyncio.Task] = None  # 30min timeout task
        self.user_has_joined = False  # Track if user joined the call
        self.auto_leave_task: Optional[asyncio.Task] = None  # 45s auto-leave task
        self.user_leave_time: Optional[float] = None  # When user left the channel
        logger.info(f"VoiceSession created for {voice_channel.name} in guild {guild_id}")

    async def start_audio_streaming(self):
        """
        Start audio streaming from TTS WebSocket to Discord voice.

        This should be called after voice_client is connected.

        Raises:
            Exception: Re-raised after logging if the audio source cannot be
                created, connected, or played.
        """
        from utils.voice_audio import MikuVoiceSource
        try:
            # Create and connect audio source (handles both sending tokens and receiving audio)
            self.audio_source = MikuVoiceSource()
            await self.audio_source.connect()
            # The audio_source now serves as both the audio source AND the token sender
            # Set tts_streamer to point to audio_source for backwards compatibility
            self.tts_streamer = self.audio_source
            # Start playing audio to Discord
            if self.voice_client and not self.voice_client.is_playing():
                self.voice_client.play(self.audio_source)
                logger.info("✓ Started audio streaming to Discord")
        except Exception as e:
            logger.error(f"Failed to start audio streaming: {e}", exc_info=True)
            raise

    async def stop_audio_streaming(self):
        """Stop audio streaming and cleanup resources. Errors are logged, not raised."""
        try:
            # Stop Discord audio playback
            if self.voice_client and self.voice_client.is_playing():
                self.voice_client.stop()
            # Disconnect audio source (which also handles token streaming)
            if self.audio_source:
                await self.audio_source.disconnect()
            logger.info("✓ Stopped audio streaming")
        except Exception as e:
            logger.error(f"Error stopping audio streaming: {e}", exc_info=True)
        finally:
            # Drop the references even if disconnect() failed, so a stale
            # source is never reused. tts_streamer pointed to audio_source.
            self.audio_source = None
            self.tts_streamer = None

    async def start_listening(self, user: discord.User):
        """
        Start listening to a user's voice (STT).

        Args:
            user: Discord user to listen to

        Raises:
            Exception: Re-raised after logging if the receiver cannot be started.
        """
        from utils.voice_receiver import VoiceReceiverSink
        try:
            # Create receiver if not exists
            if not self.voice_receiver:
                self.voice_receiver = VoiceReceiverSink(self)
            # Start receiving audio from Discord using discord-ext-voice-recv.
            # Only attach the sink once: listen() is rejected while the client
            # is already receiving, which would break listening to a second user.
            if self.voice_client and not self.voice_client.is_listening():
                self.voice_client.listen(self.voice_receiver)
                logger.info("✓ Discord voice receive started (discord-ext-voice-recv)")
            # Start listening to specific user
            await self.voice_receiver.start_listening(user.id, user)
            logger.info(f"✓ Started listening to {user.name}")
        except Exception as e:
            logger.error(f"Failed to start listening to {user.name}: {e}", exc_info=True)
            raise

    async def stop_listening(self, user_id: int):
        """
        Stop listening to a user.

        Args:
            user_id: Discord user ID
        """
        if self.voice_receiver:
            await self.voice_receiver.stop_listening(user_id)
            logger.info(f"✓ Stopped listening to user {user_id}")

    async def stop_all_listening(self):
        """Stop listening to all users."""
        if self.voice_receiver:
            await self.voice_receiver.stop_all()
            self.voice_receiver = None
            logger.info("✓ Stopped all listening")

    async def on_user_join(self, user_id: int):
        """Called when a user joins the voice channel."""
        # If this is a voice call and the expected user joined
        if self.call_user_id and user_id == self.call_user_id:
            self.user_has_joined = True
            logger.info(f"✓ Call user {user_id} joined the channel")
            # Cancel timeout task since user joined
            if self.call_timeout_task:
                self.call_timeout_task.cancel()
                self.call_timeout_task = None
            # Cancel auto-leave task if it was running
            if self.auto_leave_task:
                self.auto_leave_task.cancel()
                self.auto_leave_task = None
            self.user_leave_time = None

    async def on_user_leave(self, user_id: int):
        """Called when a user leaves the voice channel."""
        # If this is the call user leaving
        if self.call_user_id and user_id == self.call_user_id and self.user_has_joined:
            import time
            self.user_leave_time = time.time()
            logger.info(f"📴 Call user {user_id} left - starting 45s auto-leave timer")
            # Never keep two timers alive: cancel a still-pending one first.
            if self.auto_leave_task and not self.auto_leave_task.done():
                self.auto_leave_task.cancel()
            # Start 45s auto-leave timer
            self.auto_leave_task = asyncio.create_task(self._auto_leave_after_user_disconnect())

    async def _auto_leave_after_user_disconnect(self):
        """Auto-leave 45s after user disconnects."""
        try:
            await asyncio.sleep(45)
            logger.info("⏰ 45s timeout reached - auto-leaving voice channel")
            # End the session (will trigger cleanup). Use the class from this
            # module directly: re-importing the module by name could yield a
            # second module object, and with it a second "singleton".
            session_manager = VoiceSessionManager()
            await session_manager.end_session()
            # Stop containers
            from utils.container_manager import ContainerManager
            await ContainerManager.stop_voice_containers()
            logger.info("✓ Auto-leave complete")
        except asyncio.CancelledError:
            # User rejoined, normal operation
            logger.info("Auto-leave cancelled - user rejoined")
        except Exception as e:
            # This runs as a fire-and-forget task; log here or the error is
            # only surfaced as an unretrieved task exception.
            logger.error(f"Auto-leave failed: {e}", exc_info=True)

    async def on_user_vad_event(self, user_id: int, event: dict):
        """Called when VAD detects speech state change."""
        event_type = event.get('event')
        logger.debug(f"User {user_id} VAD: {event_type}")

    async def on_partial_transcript(self, user_id: int, text: str):
        """Called when partial transcript is received."""
        logger.info(f"Partial from user {user_id}: {text}")
        # Could show "User is saying..." in chat

    async def on_final_transcript(self, user_id: int, text: str):
        """
        Called when final transcript is received.

        This triggers LLM response and TTS.
        Note: If user interrupted Miku, miku_speaking will already be False
        by the time this is called, so the response will proceed normally.
        """
        logger.info(f"📝 Final transcript from user {user_id}: {text}")
        user = self.voice_channel.guild.get_member(user_id)
        # Check if Miku is STILL speaking (not interrupted)
        # This prevents queueing if user speaks briefly but not long enough to interrupt
        if self.miku_speaking:
            logger.info("⏭️ Ignoring short input while Miku is speaking (user didn't interrupt long enough)")
            user_name = user.name if user else f"User {user_id}"
            # Only send message if debug mode is on
            if globals.VOICE_DEBUG_MODE:
                await self.text_channel.send(f"💬 *{user_name} said: \"{text}\" (interrupted but too brief - talk longer to interrupt)*")
            return
        logger.info(f"✓ Processing final transcript (miku_speaking={self.miku_speaking})")
        if not user:
            logger.warning(f"User {user_id} not found in guild")
            return
        # Check for stop commands (don't generate response if user wants silence)
        stop_phrases = ["stop talking", "be quiet", "shut up", "stop speaking", "silence"]
        lowered = text.lower()
        if any(phrase in lowered for phrase in stop_phrases):
            logger.info(f"🤫 Stop command detected: {text}")
            if globals.VOICE_DEBUG_MODE:
                await self.text_channel.send(f"🎤 {user.name}: *\"{text}\"*")
                await self.text_channel.send("🤫 *Miku goes quiet*")
            return
        # Show what user said (only in debug mode)
        if globals.VOICE_DEBUG_MODE:
            await self.text_channel.send(f"🎤 {user.name}: *\"{text}\"*")
        # Generate LLM response and speak it
        await self._generate_voice_response(user, text)

    async def on_user_interruption(self, user_id: int):
        """
        Called when user interrupts Miku's speech.

        This is triggered when user speaks over Miku for long enough (0.8s+ with 8+ chunks).
        Immediately cancels LLM streaming, TTS synthesis, and clears audio buffers.
        Does nothing if Miku is not currently speaking.

        Args:
            user_id: Discord user ID who interrupted
        """
        if not self.miku_speaking:
            return
        logger.info(f"🛑 User {user_id} interrupted Miku - canceling everything immediately")
        # Get user info
        user = self.voice_channel.guild.get_member(user_id)
        user_name = user.name if user else f"User {user_id}"
        # 1. Mark that Miku is no longer speaking (stops LLM streaming loop check)
        self.miku_speaking = False
        # 2. Cancel LLM streaming task if it's running
        if self.llm_stream_task and not self.llm_stream_task.done():
            self.llm_stream_task.cancel()
            try:
                await self.llm_stream_task
            except asyncio.CancelledError:
                logger.info("✓ LLM streaming task cancelled")
            except Exception as e:
                logger.error(f"Error cancelling LLM task: {e}")
        # 3. Cancel TTS/RVC synthesis and playback
        await self._cancel_tts()
        # 4. Add a brief pause to create audible separation
        # This gives a fade-out effect and makes the interruption less jarring
        import time
        self.last_interruption_time = time.time()
        logger.info(f"⏸️ Pausing for {self.interruption_silence_duration}s after interruption")
        await asyncio.sleep(self.interruption_silence_duration)
        # Add interruption marker to conversation history
        self.conversation_history.append({
            "role": "assistant",
            "content": "[INTERRUPTED - user started speaking]"
        })
        # Show interruption in chat (only in debug mode)
        if globals.VOICE_DEBUG_MODE:
            await self.text_channel.send(f"⚠️ *{user_name} interrupted Miku*")
        logger.info("✓ Interruption handled, ready for next input")

    async def on_user_interruption_old(self, user_id: int, probability: float):
        """
        Legacy interruption handler (kept for compatibility).

        Called when VAD-based interruption detection is used. Delegates to
        on_user_interruption, which already posts the debug-mode notice, so no
        second notice is sent here.

        Args:
            user_id: Discord user ID who interrupted
            probability: Unused; accepted for signature compatibility.
        """
        await self.on_user_interruption(user_id)

    def _build_voice_system_prompt(self, user_name: str) -> str:
        """Load the personality files (best-effort) and build the voice-chat system prompt."""
        miku_lore = ""
        miku_prompt = ""
        try:
            with open('/app/miku_lore.txt', 'r', encoding='utf-8') as f:
                miku_lore = f.read().strip()
            with open('/app/miku_prompt.txt', 'r', encoding='utf-8') as f:
                miku_prompt = f.read().strip()
        except Exception as e:
            logger.warning(f"Could not load personality files: {e}")
        # NOTE(review): prompt text is reproduced exactly as it appears in the
        # reviewed source; confirm spacing/blank lines against the original file.
        return f"""{miku_prompt}
{miku_lore}
VOICE CHAT CONTEXT:
- You are currently in a voice channel speaking with {user_name} and others
- Your responses will be spoken aloud via text-to-speech
- Keep responses natural and conversational - vary your length based on context:
* Quick reactions: 1 sentence ("Oh wow!" or "That's amazing!")
* Normal chat: 2-3 sentences (share a thought or feeling)
* Stories/explanations: 4-6 sentences when asked for details
- Match the user's energy and conversation style
- IMPORTANT: Only respond in ENGLISH! The TTS system cannot handle Japanese or other languages well.
- IMPORTANT: Do not include emojis in your response! The TTS system cannot handle them well.
- IMPORTANT: Do NOT prefix your response with your name (like "Miku:" or "Hatsune Miku:")! Just speak naturally - you're already known to be speaking.
- Be expressive and use casual language, but stay in character as Miku
- If user says "stop talking" or "be quiet", acknowledge briefly and stop
- NOTE: You will automatically disconnect 45 seconds after {user_name} leaves the voice channel, so you can mention this if asked about leaving
Remember: This is a live voice conversation - be natural, not formulaic!"""

    async def _stream_llm_to_tts(self, llama_url: str, payload: dict, headers: dict) -> str:
        """
        Stream LLM tokens to TTS and return the accumulated text. Can be cancelled.

        Stops early (without error) as soon as miku_speaking turns False.

        Raises:
            Exception: If the LLM endpoint answers with a non-200 status.
        """
        import aiohttp

        full_response = ""
        async with aiohttp.ClientSession() as http_session:
            async with http_session.post(
                f"{llama_url}/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=60)
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"LLM error {response.status}: {error_text}")
                # Stream tokens to TTS
                async for raw_line in response.content:
                    if not self.miku_speaking:
                        # Interrupted - exit gracefully
                        logger.info("🛑 LLM streaming stopped (miku_speaking=False)")
                        break
                    line = raw_line.decode('utf-8').strip()
                    if not line.startswith('data: '):
                        continue
                    data_str = line[6:]
                    if data_str == '[DONE]':
                        break
                    try:
                        data = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    if 'choices' in data and len(data['choices']) > 0:
                        delta = data['choices'][0].get('delta', {})
                        content = delta.get('content', '')
                        if content:
                            await self.audio_source.send_token(content)
                            full_response += content
        return full_response

    async def _generate_voice_response(self, user: discord.User, text: str):
        """
        Generate LLM response and speak it.

        Failures are logged and reported to the text channel; they are not
        raised. miku_speaking is always reset to False on exit.

        Args:
            user: User who spoke
            text: Transcribed text
        """
        try:
            # The session may exist without an audio source (a failed audio
            # start is tolerated when the session is created). Fail before
            # touching history or calling the LLM instead of on the first token.
            if self.audio_source is None:
                raise RuntimeError("Audio streaming is not running; cannot speak a response")
            # Check if we need to wait due to recent interruption
            import time
            if self.last_interruption_time > 0:
                time_since_interruption = time.time() - self.last_interruption_time
                remaining_pause = self.interruption_silence_duration - time_since_interruption
                if remaining_pause > 0:
                    logger.info(f"⏸️ Waiting {remaining_pause:.2f}s more before responding (interruption cooldown)")
                    await asyncio.sleep(remaining_pause)
            logger.info("🎙️ Starting voice response generation (setting miku_speaking=True)")
            self.miku_speaking = True
            logger.info(f" → miku_speaking is now: {self.miku_speaking}")
            # Show processing (only in debug mode)
            if globals.VOICE_DEBUG_MODE:
                await self.text_channel.send("💭 *Miku is thinking...*")
            # Import here to avoid circular imports
            from utils.llm import get_current_gpu_url
            # Build voice chat system prompt (loads personality and lore)
            system_prompt = self._build_voice_system_prompt(user.name)
            # Add user message to history
            self.conversation_history.append({
                "role": "user",
                "content": f"{user.name}: {text}"
            })
            # Keep only last 8 exchanges (16 messages = 8 user + 8 assistant)
            if len(self.conversation_history) > 16:
                self.conversation_history = self.conversation_history[-16:]
            # Build messages for LLM
            messages = [{"role": "system", "content": system_prompt}]
            messages.extend(self.conversation_history)
            payload = {
                "model": globals.TEXT_MODEL,
                "messages": messages,
                "stream": True,
                "temperature": 0.8,
                "max_tokens": 200
            }
            headers = {'Content-Type': 'application/json'}
            llama_url = get_current_gpu_url()
            # Run streaming as a task that can be cancelled on interruption
            self.llm_stream_task = asyncio.create_task(
                self._stream_llm_to_tts(llama_url, payload, headers)
            )
            try:
                full_response = await self.llm_stream_task
            except asyncio.CancelledError:
                logger.info("✓ LLM streaming cancelled by interruption")
                # Don't re-raise - just return early to avoid breaking STT client
                return
            # Flush TTS
            if self.miku_speaking:
                await self.audio_source.flush()
                # Filter out self-referential prefixes from response
                filtered_response = self._filter_name_prefixes(full_response.strip())
                # Add Miku's complete response to history (use filtered version)
                self.conversation_history.append({
                    "role": "assistant",
                    "content": filtered_response
                })
                # Show response (only in debug mode)
                if globals.VOICE_DEBUG_MODE:
                    await self.text_channel.send(f"🎤 Miku: *\"{filtered_response}\"*")
                logger.info(f"✓ Voice response complete: {filtered_response}")
            else:
                # Interrupted - don't add incomplete response to history
                # (interruption marker already added by on_user_interruption)
                logger.info(f"✓ Response interrupted after {len(full_response)} chars")
        except Exception as e:
            logger.error(f"Voice response failed: {e}", exc_info=True)
            await self.text_channel.send("❌ Sorry, I had trouble responding")
        finally:
            self.miku_speaking = False

    def _filter_name_prefixes(self, text: str) -> str:
        """
        Filter out self-referential name prefixes from Miku's responses.

        Removes patterns like:
        - "Miku: rest of text"
        - "Hatsune Miku: rest of text"
        - "miku: rest of text" (case insensitive)

        Args:
            text: Raw response text

        Returns:
            Filtered text without name prefixes
        """
        import re
        # Pattern matches "Miku:" or "Hatsune Miku:" at the start of the text (case insensitive)
        # and consumes any amount of whitespace after the colon
        pattern = r'^(?:Hatsune\s+)?Miku:\s*'
        filtered = re.sub(pattern, '', text, flags=re.IGNORECASE)
        # Log if we filtered something
        if filtered != text:
            logger.info(f"Filtered name prefix: '{text[:30]}...' -> '{filtered[:30]}...'")
        return filtered

    async def _cancel_tts(self):
        """
        Immediately cancel TTS synthesis and clear all audio buffers.

        This sends interrupt signals to:
        1. Local audio buffer (clears queued audio)
        2. RVC TTS server (stops synthesis pipeline), at TTS_INTERRUPT_URL

        Does NOT stop voice_client (that would disconnect voice receiver).
        Errors are logged, not raised.
        """
        logger.info("🛑 Canceling TTS synthesis immediately")
        # 1. FIRST: Clear local audio buffer to stop playing queued audio
        if self.audio_source:
            try:
                await self.audio_source.clear_buffer()
                logger.info("✓ Audio buffer cleared")
            except Exception as e:
                logger.error(f"Failed to clear audio buffer: {e}")
        # 2. SECOND: Send interrupt to RVC to stop synthesis pipeline
        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                # Try up to three times; stop at the first HTTP 200
                for attempt in range(3):
                    try:
                        async with session.post(
                            self.TTS_INTERRUPT_URL,
                            timeout=aiohttp.ClientTimeout(total=2.0)
                        ) as resp:
                            if resp.status == 200:
                                data = await resp.json()
                                logger.info(f"✓ TTS interrupted (flushed {data.get('zmq_chunks_flushed', 0)} chunks)")
                                break
                            # Make a rejected interrupt visible instead of retrying silently
                            logger.warning(f"Interrupt request returned HTTP {resp.status}")
                    except asyncio.TimeoutError:
                        if attempt < 2:  # Don't warn on last attempt
                            logger.warning("Interrupt request timed out, retrying...")
                        continue
        except Exception as e:
            logger.error(f"Failed to interrupt TTS: {e}")
        # Note: We do NOT call voice_client.stop() because that would
        # stop the entire voice system including the receiver!
        # The audio source will just play silence until new tokens arrive.
# Global singleton instance.
# VoiceSessionManager.__new__ always returns the same object, so any later
# VoiceSessionManager() call elsewhere yields this very instance.
voice_manager = VoiceSessionManager()