refactor: split api.py monolith into 19 route modules (Phase B)

Split 3,598-line api.py into thin orchestrator (128 lines) + 19 route
modules in bot/routes/:

  core.py (7 routes), mood.py (10), language.py (3), evil_mode.py (6),
  bipolar_mode.py (9), gpu.py (2), bot_actions.py (4), autonomous.py (13),
  profile_picture.py (26), manual_send.py (3), servers.py (6),
  figurines.py (5), dms.py (18), image_generation.py (4), chat.py (1),
  config.py (7), logging_config.py (9), voice.py (3), memory.py (10)

All 146 routes verified present via test_route_split.py (149 tests).
21/21 regression tests (test_config_state.py) pass.
Monolith backup: bot/api_monolith_backup.py (revert: cp it to api.py).
This commit is contained in:
2026-04-15 11:38:14 +03:00
parent 8b14160028
commit 979217e7cc
26 changed files with 7624 additions and 3541 deletions

292
bot/routes/bipolar_mode.py Normal file
View File

@@ -0,0 +1,292 @@
"""Bipolar mode routes."""
import asyncio
from fastapi import APIRouter
import globals
from routes.models import BipolarTriggerRequest
from utils.logger import get_logger
logger = get_logger('api')
router = APIRouter()
@router.get("/bipolar-mode")
def get_bipolar_mode_status():
"""Get current bipolar mode status"""
from utils.bipolar_mode import is_bipolar_mode, is_argument_in_progress
# Get any active arguments
active_arguments = {}
for channel_id, data in globals.BIPOLAR_ARGUMENT_IN_PROGRESS.items():
if data.get("active"):
active_arguments[channel_id] = data
return {
"bipolar_mode": is_bipolar_mode(),
"evil_mode": globals.EVIL_MODE,
"active_arguments": active_arguments,
"webhooks_configured": len(globals.BIPOLAR_WEBHOOKS)
}
@router.post("/bipolar-mode/enable")
def enable_bipolar_mode():
"""Enable bipolar mode"""
from utils.bipolar_mode import enable_bipolar_mode as _enable
if globals.BIPOLAR_MODE:
return {"status": "ok", "message": "Bipolar mode is already enabled", "bipolar_mode": True}
_enable()
# Persist to config manager
try:
from config_manager import config_manager
config_manager.set("runtime.bipolar_mode.enabled", True, persist=True)
except Exception as e:
logger.warning(f"Failed to persist bipolar mode enable to config: {e}")
return {"status": "ok", "message": "Bipolar mode enabled", "bipolar_mode": True}
@router.post("/bipolar-mode/disable")
def disable_bipolar_mode():
"""Disable bipolar mode"""
from utils.bipolar_mode import disable_bipolar_mode as _disable, cleanup_webhooks
if not globals.BIPOLAR_MODE:
return {"status": "ok", "message": "Bipolar mode is already disabled", "bipolar_mode": False}
_disable()
# Persist to config manager
try:
from config_manager import config_manager
config_manager.set("runtime.bipolar_mode.enabled", False, persist=True)
except Exception as e:
logger.warning(f"Failed to persist bipolar mode disable to config: {e}")
# Optionally cleanup webhooks in background
if globals.client and globals.client.loop and globals.client.loop.is_running():
globals.client.loop.create_task(cleanup_webhooks(globals.client))
return {"status": "ok", "message": "Bipolar mode disabled", "bipolar_mode": False}
@router.post("/bipolar-mode/toggle")
def toggle_bipolar_mode():
"""Toggle bipolar mode on/off"""
from utils.bipolar_mode import toggle_bipolar_mode as _toggle, cleanup_webhooks
new_state = _toggle()
# If disabled, cleanup webhooks
if not new_state:
if globals.client and globals.client.loop and globals.client.loop.is_running():
globals.client.loop.create_task(cleanup_webhooks(globals.client))
return {
"status": "ok",
"message": f"Bipolar mode {'enabled' if new_state else 'disabled'}",
"bipolar_mode": new_state
}
@router.post("/bipolar-mode/trigger-argument")
def trigger_argument(data: BipolarTriggerRequest):
"""Manually trigger an argument in a specific channel
If message_id is provided, the argument will start from that message.
The opposite persona will respond to it.
"""
from utils.bipolar_mode import force_trigger_argument, force_trigger_argument_from_message_id, is_bipolar_mode, is_argument_in_progress
# Parse IDs from strings
try:
channel_id = int(data.channel_id)
except ValueError:
return {"status": "error", "message": "Invalid channel ID format"}
message_id = None
if data.message_id:
try:
message_id = int(data.message_id)
except ValueError:
return {"status": "error", "message": "Invalid message ID format"}
if not is_bipolar_mode():
return {"status": "error", "message": "Bipolar mode is not enabled"}
if is_argument_in_progress(channel_id):
return {"status": "error", "message": "An argument is already in progress in this channel"}
if not globals.client or not globals.client.loop or not globals.client.loop.is_running():
return {"status": "error", "message": "Discord client not ready"}
# If message_id is provided, use the message-based trigger
if message_id:
async def trigger_from_message():
success, error = await force_trigger_argument_from_message_id(
channel_id, message_id, globals.client, data.context
)
if not success:
logger.error(f"Failed to trigger argument from message: {error}")
globals.client.loop.create_task(trigger_from_message())
return {
"status": "ok",
"message": f"Argument triggered from message {message_id}",
"channel_id": channel_id,
"message_id": message_id
}
# Otherwise, find the channel and trigger normally
channel = globals.client.get_channel(channel_id)
if not channel:
return {"status": "error", "message": f"Channel {channel_id} not found"}
# Trigger the argument
globals.client.loop.create_task(force_trigger_argument(channel, globals.client, data.context))
return {
"status": "ok",
"message": f"Argument triggered in #{channel.name}",
"channel_id": channel_id
}
@router.post("/bipolar-mode/trigger-dialogue")
def trigger_dialogue(data: dict):
"""Manually trigger a persona dialogue from a message
Forces the opposite persona to start a dialogue (bypasses the interjection check).
"""
from utils.persona_dialogue import get_dialogue_manager
from utils.bipolar_mode import is_bipolar_mode, is_argument_in_progress
message_id_str = data.get("message_id")
if not message_id_str:
return {"status": "error", "message": "Message ID is required"}
# Parse message ID
try:
message_id = int(message_id_str)
except ValueError:
return {"status": "error", "message": "Invalid message ID format"}
if not is_bipolar_mode():
return {"status": "error", "message": "Bipolar mode is not enabled"}
if not globals.client or not globals.client.loop or not globals.client.loop.is_running():
return {"status": "error", "message": "Discord client not ready"}
async def trigger_dialogue_task():
try:
# Fetch the message
message = None
for channel in globals.client.get_all_channels():
if hasattr(channel, 'fetch_message'):
try:
message = await channel.fetch_message(message_id)
break
except:
continue
if not message:
logger.error(f"Message {message_id} not found")
return
# Check if there's already an argument or dialogue in progress
dialogue_manager = get_dialogue_manager()
if dialogue_manager.is_dialogue_active(message.channel.id):
logger.error(f"Dialogue already active in channel {message.channel.id}")
return
if is_argument_in_progress(message.channel.id):
logger.error(f"Argument already in progress in channel {message.channel.id}")
return
# Determine current persona from the message author
if message.webhook_id:
# It's a webhook message, need to determine which persona
current_persona = "evil" if globals.EVIL_MODE else "miku"
elif message.author.id == globals.client.user.id:
# It's the bot's message
current_persona = "evil" if globals.EVIL_MODE else "miku"
else:
# User message - can't trigger dialogue from user messages
logger.error(f"Cannot trigger dialogue from user message")
return
opposite_persona = "evil" if current_persona == "miku" else "miku"
logger.info(f"[Manual Trigger] Forcing {opposite_persona} to start dialogue on message {message_id}")
# Force start the dialogue (bypass interjection check)
dialogue_manager.start_dialogue(message.channel.id)
asyncio.create_task(
dialogue_manager.handle_dialogue_turn(
message.channel,
opposite_persona,
trigger_reason="manual_trigger"
)
)
except Exception as e:
logger.error(f"Error triggering dialogue: {e}")
import traceback
traceback.print_exc()
globals.client.loop.create_task(trigger_dialogue_task())
return {
"status": "ok",
"message": f"Dialogue triggered for message {message_id}"
}
@router.get("/bipolar-mode/scoreboard")
def get_bipolar_scoreboard():
"""Get the bipolar mode argument scoreboard"""
from utils.bipolar_mode import load_scoreboard, get_scoreboard_summary
scoreboard = load_scoreboard()
return {
"status": "ok",
"scoreboard": {
"miku_wins": scoreboard.get("miku", 0),
"evil_wins": scoreboard.get("evil", 0),
"total_arguments": scoreboard.get("miku", 0) + scoreboard.get("evil", 0),
"history": scoreboard.get("history", [])[-10:] # Last 10 results
},
"summary": get_scoreboard_summary()
}
@router.post("/bipolar-mode/cleanup-webhooks")
def cleanup_bipolar_webhooks():
"""Cleanup all bipolar webhooks from all servers"""
from utils.bipolar_mode import cleanup_webhooks
if not globals.client or not globals.client.loop or not globals.client.loop.is_running():
return {"status": "error", "message": "Discord client not ready"}
globals.client.loop.create_task(cleanup_webhooks(globals.client))
return {"status": "ok", "message": "Webhook cleanup started"}
@router.get("/bipolar-mode/arguments")
def get_active_arguments():
"""Get all active arguments"""
active = {}
for channel_id, data in globals.BIPOLAR_ARGUMENT_IN_PROGRESS.items():
if data.get("active"):
channel = globals.client.get_channel(channel_id) if globals.client else None
active[channel_id] = {
**data,
"channel_name": channel.name if channel else "Unknown"
}
return {"active_arguments": active}