Files
miku-discord/bot/routes/dms.py
koko210Serve 979217e7cc refactor: split api.py monolith into 19 route modules (Phase B)
Split 3,598-line api.py into thin orchestrator (128 lines) + 19 route
modules in bot/routes/:

  core.py (7 routes), mood.py (10), language.py (3), evil_mode.py (6),
  bipolar_mode.py (9), gpu.py (2), bot_actions.py (4), autonomous.py (13),
  profile_picture.py (26), manual_send.py (3), servers.py (6),
  figurines.py (5), dms.py (18), image_generation.py (4), chat.py (1),
  config.py (7), logging_config.py (9), voice.py (3), memory.py (10)

All 146 routes verified present via test_route_split.py (149 tests).
21/21 regression tests (test_config_state.py) pass.
Monolith backup: bot/api_monolith_backup.py (revert: cp it to api.py).
2026-04-15 11:38:14 +03:00

468 lines
19 KiB
Python

"""DM routes: custom prompt DMs, manual DMs, logging, blocking, analysis."""
import io
import os
import json
from typing import List
from fastapi import APIRouter, UploadFile, File, Form
import discord
import globals
from routes.models import CustomPromptRequest
from utils.dm_logger import dm_logger
from utils.logger import get_logger
logger = get_logger('api')
router = APIRouter()
# ========== DM Custom / Manual Send ==========
@router.post("/dm/{user_id}/custom")
async def send_custom_prompt_dm(user_id: str, req: CustomPromptRequest):
"""Send custom prompt via DM to a specific user"""
try:
user_id_int = int(user_id)
user = globals.client.get_user(user_id_int)
if not user:
return {"status": "error", "message": f"User {user_id} not found"}
# Use the LLM query function for DM context
from utils.llm import query_llama
async def send_dm_custom_prompt():
try:
response = await query_llama(req.prompt, user_id=user_id, guild_id=None, response_type="dm_response")
await user.send(response)
logger.info(f"Custom DM prompt sent to user {user_id}: {req.prompt[:50]}...")
# Log to DM history
dm_logger.log_conversation(user_id, req.prompt, response)
except Exception as e:
logger.error(f"Failed to send custom DM prompt to user {user_id}: {e}")
# Use create_task to avoid timeout context manager error
globals.client.loop.create_task(send_dm_custom_prompt())
return {"status": "ok", "message": f"Custom DM prompt queued for user {user_id}"}
except ValueError:
return {"status": "error", "message": "Invalid user ID format"}
except Exception as e:
return {"status": "error", "message": f"Error: {e}"}
@router.post("/dm/{user_id}/manual")
async def send_manual_message_dm(
user_id: str,
message: str = Form(...),
files: List[UploadFile] = File(default=[]),
reply_to_message_id: str = Form(None),
mention_author: bool = Form(True)
):
"""Send manual message via DM to a specific user"""
try:
user_id_int = int(user_id)
user = globals.client.get_user(user_id_int)
if not user:
return {"status": "error", "message": f"User {user_id} not found"}
# Read file content immediately before the request closes
file_data = []
for file in files:
try:
file_content = await file.read()
file_data.append({
'filename': file.filename,
'content': file_content
})
except Exception as e:
logger.error(f"Failed to read file {file.filename}: {e}")
return {"status": "error", "message": f"Failed to read file {file.filename}: {e}"}
async def send_dm_message_and_files():
try:
# Get the reference message if replying (must be done inside the task)
reference_message = None
if reply_to_message_id:
try:
dm_channel = user.dm_channel or await user.create_dm()
reference_message = await dm_channel.fetch_message(int(reply_to_message_id))
except Exception as e:
logger.error(f"Could not fetch DM message {reply_to_message_id} for reply: {e}")
return
# Send the main message
if message.strip():
if reference_message:
await user.send(message, reference=reference_message, mention_author=mention_author)
logger.info(f"Manual DM reply message sent to user {user_id}")
else:
await user.send(message)
logger.info(f"Manual DM message sent to user {user_id}")
# Send files if any
for file_info in file_data:
try:
await user.send(file=discord.File(io.BytesIO(file_info['content']), filename=file_info['filename']))
logger.info(f"File {file_info['filename']} sent via DM to user {user_id}")
except Exception as e:
logger.error(f"Failed to send file {file_info['filename']} via DM: {e}")
# Log to DM history (user message = manual override trigger, miku response = the message sent)
dm_logger.log_conversation(user_id, "[Manual Override Trigger]", message, attachments=[f['filename'] for f in file_data])
except Exception as e:
logger.error(f"Failed to send manual DM to user {user_id}: {e}")
# Use create_task to avoid timeout context manager error
globals.client.loop.create_task(send_dm_message_and_files())
return {"status": "ok", "message": f"Manual DM message queued for user {user_id}"}
except ValueError:
return {"status": "error", "message": "Invalid user ID format"}
except Exception as e:
return {"status": "error", "message": f"Error: {e}"}
# ========== DM Logging Endpoints ==========
@router.get("/dms/users")
def get_dm_users():
"""Get summary of all users who have DMed the bot"""
try:
users = dm_logger.get_all_dm_users()
return {"status": "ok", "users": users}
except Exception as e:
return {"status": "error", "message": f"Failed to get DM users: {e}"}
@router.get("/dms/users/{user_id}")
def get_dm_user_conversation(user_id: str):
"""Get conversation summary for a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
summary = dm_logger.get_user_conversation_summary(user_id_int)
return {"status": "ok", "summary": summary}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
return {"status": "error", "message": f"Failed to get user conversation: {e}"}
@router.get("/dms/users/{user_id}/conversations")
def get_dm_conversations(user_id: str, limit: int = 50):
"""Get recent conversations with a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
logger.debug(f"Loading conversations for user {user_id_int}, limit: {limit}")
logs = dm_logger._load_user_logs(user_id_int)
logger.debug(f"Loaded logs for user {user_id_int}: {len(logs.get('conversations', []))} conversations")
conversations = logs["conversations"][-limit:] if limit > 0 else logs["conversations"]
# Convert message IDs to strings to prevent JavaScript precision loss
for conv in conversations:
if "message_id" in conv:
conv["message_id"] = str(conv["message_id"])
logger.debug(f"Returning {len(conversations)} conversations")
# Debug: Show message IDs being returned
for i, conv in enumerate(conversations):
msg_id = conv.get("message_id", "")
is_bot = conv.get("is_bot_message", False)
content_preview = conv.get("content", "")[:30] + "..." if conv.get("content", "") else "[No content]"
logger.debug(f"Conv {i}: id={msg_id} (type: {type(msg_id)}), is_bot={is_bot}, content='{content_preview}'")
return {"status": "ok", "conversations": conversations}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to get conversations for user {user_id}: {e}")
return {"status": "error", "message": f"Failed to get conversations: {e}"}
@router.get("/dms/users/{user_id}/search")
def search_dm_conversations(user_id: str, query: str, limit: int = 10):
"""Search conversations with a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
results = dm_logger.search_user_conversations(user_id_int, query, limit)
return {"status": "ok", "results": results}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
return {"status": "error", "message": f"Failed to search conversations: {e}"}
@router.get("/dms/users/{user_id}/export")
def export_dm_conversation(user_id: str, format: str = "json"):
"""Export all conversations with a user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
export_path = dm_logger.export_user_conversation(user_id_int, format)
return {"status": "ok", "export_path": export_path, "format": format}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
return {"status": "error", "message": f"Failed to export conversation: {e}"}
@router.delete("/dms/users/{user_id}")
def delete_dm_user_logs(user_id: str):
"""Delete all DM logs for a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
log_file = dm_logger._get_user_log_file(user_id_int)
if os.path.exists(log_file):
os.remove(log_file)
return {"status": "ok", "message": f"Deleted DM logs for user {user_id}"}
else:
return {"status": "error", "message": f"No DM logs found for user {user_id}"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
return {"status": "error", "message": f"Failed to delete DM logs: {e}"}
# ========== User Blocking & DM Management ==========
@router.get("/dms/blocked-users")
def get_blocked_users():
"""Get list of all blocked users"""
try:
blocked_users = dm_logger.get_blocked_users()
return {"status": "ok", "blocked_users": blocked_users}
except Exception as e:
logger.error(f"Failed to get blocked users: {e}")
return {"status": "error", "message": f"Failed to get blocked users: {e}"}
@router.post("/dms/users/{user_id}/block")
def block_user(user_id: str):
"""Block a user from sending DMs to Miku"""
try:
user_id_int = int(user_id)
# Get username from DM logs if available
user_summary = dm_logger.get_user_conversation_summary(user_id_int)
username = user_summary.get("username", "Unknown")
success = dm_logger.block_user(user_id_int, username)
if success:
logger.info(f"User {user_id} ({username}) blocked")
return {"status": "ok", "message": f"User {username} has been blocked"}
else:
return {"status": "error", "message": f"User {username} is already blocked"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to block user {user_id}: {e}")
return {"status": "error", "message": f"Failed to block user: {e}"}
@router.post("/dms/users/{user_id}/unblock")
def unblock_user(user_id: str):
"""Unblock a user"""
try:
user_id_int = int(user_id)
success = dm_logger.unblock_user(user_id_int)
if success:
logger.info(f"User {user_id} unblocked")
return {"status": "ok", "message": f"User has been unblocked"}
else:
return {"status": "error", "message": f"User is not blocked"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to unblock user {user_id}: {e}")
return {"status": "error", "message": f"Failed to unblock user: {e}"}
@router.post("/dms/users/{user_id}/conversations/{conversation_id}/delete")
def delete_conversation(user_id: str, conversation_id: str):
"""Delete a specific conversation/message from both Discord and logs"""
try:
user_id_int = int(user_id)
# Queue the async deletion in the bot's event loop
async def do_delete():
return await dm_logger.delete_conversation(user_id_int, conversation_id)
globals.client.loop.create_task(do_delete())
# For now, return success immediately since we can't await in FastAPI sync endpoint
# The actual deletion happens asynchronously
logger.info(f"Queued deletion of conversation {conversation_id} for user {user_id}")
return {"status": "ok", "message": "Message deletion queued (will delete from both Discord and logs)"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to queue conversation deletion {conversation_id}: {e}")
return {"status": "error", "message": f"Failed to delete conversation: {e}"}
@router.post("/dms/users/{user_id}/conversations/delete-all")
def delete_all_conversations(user_id: str):
"""Delete all conversations with a user from both Discord and logs"""
try:
user_id_int = int(user_id)
# Queue the async bulk deletion in the bot's event loop
async def do_delete_all():
return await dm_logger.delete_all_conversations(user_id_int)
globals.client.loop.create_task(do_delete_all())
# Return success immediately since we can't await in FastAPI sync endpoint
logger.info(f"Queued bulk deletion of all conversations for user {user_id}")
return {"status": "ok", "message": "Bulk deletion queued (will delete all Miku messages from Discord and clear logs)"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to queue bulk conversation deletion for user {user_id}: {e}")
return {"status": "error", "message": f"Failed to delete conversations: {e}"}
@router.post("/dms/users/{user_id}/delete-completely")
def delete_user_completely(user_id: str):
"""Delete user's log file completely"""
try:
user_id_int = int(user_id)
success = dm_logger.delete_user_completely(user_id_int)
if success:
logger.info(f"Completely deleted user {user_id}")
return {"status": "ok", "message": "User data deleted completely"}
else:
return {"status": "error", "message": "No user data found"}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to completely delete user {user_id}: {e}")
return {"status": "error", "message": f"Failed to delete user: {e}"}
# ========== DM Interaction Analysis Endpoints ==========
@router.post("/dms/analysis/run")
def run_dm_analysis():
"""Manually trigger the daily DM interaction analysis"""
try:
from utils.dm_interaction_analyzer import dm_analyzer
if dm_analyzer is None:
return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."}
# Schedule analysis in Discord's event loop
async def run_analysis():
await dm_analyzer.run_daily_analysis()
globals.client.loop.create_task(run_analysis())
return {"status": "ok", "message": "DM analysis started"}
except Exception as e:
logger.error(f"Failed to run DM analysis: {e}")
return {"status": "error", "message": f"Failed to run DM analysis: {e}"}
@router.post("/dms/users/{user_id}/analyze")
def analyze_user_interaction(user_id: str):
"""Analyze a specific user's interaction and optionally send report"""
try:
from utils.dm_interaction_analyzer import dm_analyzer
if dm_analyzer is None:
return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."}
user_id_int = int(user_id)
# Schedule analysis in Discord's event loop
async def run_analysis():
return await dm_analyzer.analyze_and_report(user_id_int)
globals.client.loop.create_task(run_analysis())
# Return immediately - the analysis will run in the background
return {"status": "ok", "message": f"Analysis started for user {user_id}", "reported": True}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to analyze user {user_id}: {e}")
return {"status": "error", "message": f"Failed to analyze user: {e}"}
@router.get("/dms/analysis/reports")
def get_analysis_reports(limit: int = 20):
"""Get recent analysis reports"""
try:
from utils.dm_interaction_analyzer import REPORTS_DIR
if not os.path.exists(REPORTS_DIR):
return {"status": "ok", "reports": []}
reports = []
files = sorted([f for f in os.listdir(REPORTS_DIR) if f.endswith('.json') and f != 'reported_today.json'],
reverse=True)[:limit]
for filename in files:
try:
with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f:
report = json.load(f)
report['filename'] = filename
reports.append(report)
except Exception as e:
logger.warning(f"Failed to load report {filename}: {e}")
return {"status": "ok", "reports": reports}
except Exception as e:
logger.error(f"Failed to get reports: {e}")
return {"status": "error", "message": f"Failed to get reports: {e}"}
@router.get("/dms/analysis/reports/{user_id}")
def get_user_reports(user_id: str, limit: int = 10):
"""Get analysis reports for a specific user"""
try:
from utils.dm_interaction_analyzer import REPORTS_DIR
if not os.path.exists(REPORTS_DIR):
return {"status": "ok", "reports": []}
user_id_int = int(user_id)
reports = []
files = sorted([f for f in os.listdir(REPORTS_DIR)
if f.startswith(f"{user_id}_") and f.endswith('.json')],
reverse=True)[:limit]
for filename in files:
try:
with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f:
report = json.load(f)
report['filename'] = filename
reports.append(report)
except Exception as e:
logger.warning(f"Failed to load report {filename}: {e}")
return {"status": "ok", "reports": reports}
except ValueError:
return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
except Exception as e:
logger.error(f"Failed to get user reports: {e}")
return {"status": "error", "message": f"Failed to get user reports: {e}"}