Files
miku-discord/bot/routes/dms.py
koko210Serve edc9f27925 feat: add proper HTTP status codes to all API error responses
- 217 error returns across 18 route files + api.py now use JSONResponse
  with appropriate HTTP status codes instead of returning HTTP 200
- Status code distribution: 500 (121), 400 (39), 503 (28), 404 (24), 409 (3), 502 (2)
- Fixed language.py tuple-return bug (was serializing as JSON array)
- Fixed bare except clauses in bipolar_mode.py and voice.py
- Body-level error schemas preserved (status/error + success/error patterns)
  so web UI continues working without changes
- chat.py (SSE) unchanged: errors sent within stream protocol
- All 170 tests pass
2026-04-15 15:43:18 +03:00

469 lines
20 KiB
Python

"""DM routes: custom prompt DMs, manual DMs, logging, blocking, analysis."""
import io
import os
import json
from typing import List
from fastapi import APIRouter, UploadFile, File, Form
from fastapi.responses import JSONResponse
import discord
import globals
from routes.models import CustomPromptRequest
from utils.dm_logger import dm_logger
from utils.logger import get_logger
logger = get_logger('api')
router = APIRouter()
# ========== DM Custom / Manual Send ==========
@router.post("/dm/{user_id}/custom")
async def send_custom_prompt_dm(user_id: str, req: CustomPromptRequest):
"""Send custom prompt via DM to a specific user"""
try:
user_id_int = int(user_id)
user = globals.client.get_user(user_id_int)
if not user:
return JSONResponse(status_code=404, content={"status": "error", "message": f"User {user_id} not found"})
# Use the LLM query function for DM context
from utils.llm import query_llama
async def send_dm_custom_prompt():
try:
response = await query_llama(req.prompt, user_id=user_id, guild_id=None, response_type="dm_response")
await user.send(response)
logger.info(f"Custom DM prompt sent to user {user_id}: {req.prompt[:50]}...")
# Log to DM history
dm_logger.log_conversation(user_id, req.prompt, response)
except Exception as e:
logger.error(f"Failed to send custom DM prompt to user {user_id}: {e}")
# Use create_task to avoid timeout context manager error
globals.client.loop.create_task(send_dm_custom_prompt())
return {"status": "ok", "message": f"Custom DM prompt queued for user {user_id}"}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid user ID format"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Error: {e}"})
@router.post("/dm/{user_id}/manual")
async def send_manual_message_dm(
user_id: str,
message: str = Form(...),
files: List[UploadFile] = File(default=[]),
reply_to_message_id: str = Form(None),
mention_author: bool = Form(True)
):
"""Send manual message via DM to a specific user"""
try:
user_id_int = int(user_id)
user = globals.client.get_user(user_id_int)
if not user:
return JSONResponse(status_code=404, content={"status": "error", "message": f"User {user_id} not found"})
# Read file content immediately before the request closes
file_data = []
for file in files:
try:
file_content = await file.read()
file_data.append({
'filename': file.filename,
'content': file_content
})
except Exception as e:
logger.error(f"Failed to read file {file.filename}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to read file {file.filename}: {e}"})
async def send_dm_message_and_files():
try:
# Get the reference message if replying (must be done inside the task)
reference_message = None
if reply_to_message_id:
try:
dm_channel = user.dm_channel or await user.create_dm()
reference_message = await dm_channel.fetch_message(int(reply_to_message_id))
except Exception as e:
logger.error(f"Could not fetch DM message {reply_to_message_id} for reply: {e}")
return
# Send the main message
if message.strip():
if reference_message:
await user.send(message, reference=reference_message, mention_author=mention_author)
logger.info(f"Manual DM reply message sent to user {user_id}")
else:
await user.send(message)
logger.info(f"Manual DM message sent to user {user_id}")
# Send files if any
for file_info in file_data:
try:
await user.send(file=discord.File(io.BytesIO(file_info['content']), filename=file_info['filename']))
logger.info(f"File {file_info['filename']} sent via DM to user {user_id}")
except Exception as e:
logger.error(f"Failed to send file {file_info['filename']} via DM: {e}")
# Log to DM history (user message = manual override trigger, miku response = the message sent)
dm_logger.log_conversation(user_id, "[Manual Override Trigger]", message, attachments=[f['filename'] for f in file_data])
except Exception as e:
logger.error(f"Failed to send manual DM to user {user_id}: {e}")
# Use create_task to avoid timeout context manager error
globals.client.loop.create_task(send_dm_message_and_files())
return {"status": "ok", "message": f"Manual DM message queued for user {user_id}"}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid user ID format"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Error: {e}"})
# ========== DM Logging Endpoints ==========
@router.get("/dms/users")
def get_dm_users():
"""Get summary of all users who have DMed the bot"""
try:
users = dm_logger.get_all_dm_users()
return {"status": "ok", "users": users}
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get DM users: {e}"})
@router.get("/dms/users/{user_id}")
def get_dm_user_conversation(user_id: str):
"""Get conversation summary for a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
summary = dm_logger.get_user_conversation_summary(user_id_int)
return {"status": "ok", "summary": summary}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get user conversation: {e}"})
@router.get("/dms/users/{user_id}/conversations")
def get_dm_conversations(user_id: str, limit: int = 50):
"""Get recent conversations with a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
logger.debug(f"Loading conversations for user {user_id_int}, limit: {limit}")
logs = dm_logger._load_user_logs(user_id_int)
logger.debug(f"Loaded logs for user {user_id_int}: {len(logs.get('conversations', []))} conversations")
conversations = logs["conversations"][-limit:] if limit > 0 else logs["conversations"]
# Convert message IDs to strings to prevent JavaScript precision loss
for conv in conversations:
if "message_id" in conv:
conv["message_id"] = str(conv["message_id"])
logger.debug(f"Returning {len(conversations)} conversations")
# Debug: Show message IDs being returned
for i, conv in enumerate(conversations):
msg_id = conv.get("message_id", "")
is_bot = conv.get("is_bot_message", False)
content_preview = conv.get("content", "")[:30] + "..." if conv.get("content", "") else "[No content]"
logger.debug(f"Conv {i}: id={msg_id} (type: {type(msg_id)}), is_bot={is_bot}, content='{content_preview}'")
return {"status": "ok", "conversations": conversations}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to get conversations for user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get conversations: {e}"})
@router.get("/dms/users/{user_id}/search")
def search_dm_conversations(user_id: str, query: str, limit: int = 10):
"""Search conversations with a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
results = dm_logger.search_user_conversations(user_id_int, query, limit)
return {"status": "ok", "results": results}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to search conversations: {e}"})
@router.get("/dms/users/{user_id}/export")
def export_dm_conversation(user_id: str, format: str = "json"):
"""Export all conversations with a user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
export_path = dm_logger.export_user_conversation(user_id_int, format)
return {"status": "ok", "export_path": export_path, "format": format}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to export conversation: {e}"})
@router.delete("/dms/users/{user_id}")
def delete_dm_user_logs(user_id: str):
"""Delete all DM logs for a specific user"""
try:
# Convert string user_id to int for internal processing
user_id_int = int(user_id)
log_file = dm_logger._get_user_log_file(user_id_int)
if os.path.exists(log_file):
os.remove(log_file)
return {"status": "ok", "message": f"Deleted DM logs for user {user_id}"}
else:
return JSONResponse(status_code=404, content={"status": "error", "message": f"No DM logs found for user {user_id}"})
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to delete DM logs: {e}"})
# ========== User Blocking & DM Management ==========
@router.get("/dms/blocked-users")
def get_blocked_users():
"""Get list of all blocked users"""
try:
blocked_users = dm_logger.get_blocked_users()
return {"status": "ok", "blocked_users": blocked_users}
except Exception as e:
logger.error(f"Failed to get blocked users: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get blocked users: {e}"})
@router.post("/dms/users/{user_id}/block")
def block_user(user_id: str):
"""Block a user from sending DMs to Miku"""
try:
user_id_int = int(user_id)
# Get username from DM logs if available
user_summary = dm_logger.get_user_conversation_summary(user_id_int)
username = user_summary.get("username", "Unknown")
success = dm_logger.block_user(user_id_int, username)
if success:
logger.info(f"User {user_id} ({username}) blocked")
return {"status": "ok", "message": f"User {username} has been blocked"}
else:
return JSONResponse(status_code=409, content={"status": "error", "message": f"User {username} is already blocked"})
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to block user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to block user: {e}"})
@router.post("/dms/users/{user_id}/unblock")
def unblock_user(user_id: str):
"""Unblock a user"""
try:
user_id_int = int(user_id)
success = dm_logger.unblock_user(user_id_int)
if success:
logger.info(f"User {user_id} unblocked")
return {"status": "ok", "message": f"User has been unblocked"}
else:
return JSONResponse(status_code=409, content={"status": "error", "message": f"User is not blocked"})
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to unblock user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to unblock user: {e}"})
@router.post("/dms/users/{user_id}/conversations/{conversation_id}/delete")
def delete_conversation(user_id: str, conversation_id: str):
"""Delete a specific conversation/message from both Discord and logs"""
try:
user_id_int = int(user_id)
# Queue the async deletion in the bot's event loop
async def do_delete():
return await dm_logger.delete_conversation(user_id_int, conversation_id)
globals.client.loop.create_task(do_delete())
# For now, return success immediately since we can't await in FastAPI sync endpoint
# The actual deletion happens asynchronously
logger.info(f"Queued deletion of conversation {conversation_id} for user {user_id}")
return {"status": "ok", "message": "Message deletion queued (will delete from both Discord and logs)"}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to queue conversation deletion {conversation_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to delete conversation: {e}"})
@router.post("/dms/users/{user_id}/conversations/delete-all")
def delete_all_conversations(user_id: str):
"""Delete all conversations with a user from both Discord and logs"""
try:
user_id_int = int(user_id)
# Queue the async bulk deletion in the bot's event loop
async def do_delete_all():
return await dm_logger.delete_all_conversations(user_id_int)
globals.client.loop.create_task(do_delete_all())
# Return success immediately since we can't await in FastAPI sync endpoint
logger.info(f"Queued bulk deletion of all conversations for user {user_id}")
return {"status": "ok", "message": "Bulk deletion queued (will delete all Miku messages from Discord and clear logs)"}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to queue bulk conversation deletion for user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to delete conversations: {e}"})
@router.post("/dms/users/{user_id}/delete-completely")
def delete_user_completely(user_id: str):
"""Delete user's log file completely"""
try:
user_id_int = int(user_id)
success = dm_logger.delete_user_completely(user_id_int)
if success:
logger.info(f"Completely deleted user {user_id}")
return {"status": "ok", "message": "User data deleted completely"}
else:
return JSONResponse(status_code=404, content={"status": "error", "message": "No user data found"})
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to completely delete user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to delete user: {e}"})
# ========== DM Interaction Analysis Endpoints ==========
@router.post("/dms/analysis/run")
def run_dm_analysis():
"""Manually trigger the daily DM interaction analysis"""
try:
from utils.dm_interaction_analyzer import dm_analyzer
if dm_analyzer is None:
return JSONResponse(status_code=503, content={"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."})
# Schedule analysis in Discord's event loop
async def run_analysis():
await dm_analyzer.run_daily_analysis()
globals.client.loop.create_task(run_analysis())
return {"status": "ok", "message": "DM analysis started"}
except Exception as e:
logger.error(f"Failed to run DM analysis: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to run DM analysis: {e}"})
@router.post("/dms/users/{user_id}/analyze")
def analyze_user_interaction(user_id: str):
"""Analyze a specific user's interaction and optionally send report"""
try:
from utils.dm_interaction_analyzer import dm_analyzer
if dm_analyzer is None:
return JSONResponse(status_code=503, content={"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."})
user_id_int = int(user_id)
# Schedule analysis in Discord's event loop
async def run_analysis():
return await dm_analyzer.analyze_and_report(user_id_int)
globals.client.loop.create_task(run_analysis())
# Return immediately - the analysis will run in the background
return {"status": "ok", "message": f"Analysis started for user {user_id}", "reported": True}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to analyze user {user_id}: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to analyze user: {e}"})
@router.get("/dms/analysis/reports")
def get_analysis_reports(limit: int = 20):
"""Get recent analysis reports"""
try:
from utils.dm_interaction_analyzer import REPORTS_DIR
if not os.path.exists(REPORTS_DIR):
return {"status": "ok", "reports": []}
reports = []
files = sorted([f for f in os.listdir(REPORTS_DIR) if f.endswith('.json') and f != 'reported_today.json'],
reverse=True)[:limit]
for filename in files:
try:
with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f:
report = json.load(f)
report['filename'] = filename
reports.append(report)
except Exception as e:
logger.warning(f"Failed to load report {filename}: {e}")
return {"status": "ok", "reports": reports}
except Exception as e:
logger.error(f"Failed to get reports: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get reports: {e}"})
@router.get("/dms/analysis/reports/{user_id}")
def get_user_reports(user_id: str, limit: int = 10):
"""Get analysis reports for a specific user"""
try:
from utils.dm_interaction_analyzer import REPORTS_DIR
if not os.path.exists(REPORTS_DIR):
return {"status": "ok", "reports": []}
user_id_int = int(user_id)
reports = []
files = sorted([f for f in os.listdir(REPORTS_DIR)
if f.startswith(f"{user_id}_") and f.endswith('.json')],
reverse=True)[:limit]
for filename in files:
try:
with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f:
report = json.load(f)
report['filename'] = filename
reports.append(report)
except Exception as e:
logger.warning(f"Failed to load report {filename}: {e}")
return {"status": "ok", "reports": reports}
except ValueError:
return JSONResponse(status_code=400, content={"status": "error", "message": f"Invalid user ID format: {user_id}"})
except Exception as e:
logger.error(f"Failed to get user reports: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": f"Failed to get user reports: {e}"})