"""DM routes: custom prompt DMs, manual DMs, logging, blocking, analysis."""

import asyncio
import io
import os
import json
from typing import List

from fastapi import APIRouter, UploadFile, File, Form
from fastapi.responses import JSONResponse
import discord

import globals
from routes.models import CustomPromptRequest
from utils.dm_logger import dm_logger
from utils.logger import get_logger

logger = get_logger('api')

router = APIRouter()


# ========== Internal helpers ==========

def _error(status_code: int, message: str) -> JSONResponse:
    """Build the ``{"status": "error", "message": ...}`` response used by every route here."""
    return JSONResponse(status_code=status_code, content={"status": "error", "message": message})


def _schedule_on_bot_loop(coro, description: str):
    """Schedule *coro* on the Discord client's event loop without waiting for it.

    ``loop.create_task`` is not thread-safe, and the sync (``def``) routes in
    this module are executed by FastAPI in a worker thread, so the coroutine
    is handed over with ``asyncio.run_coroutine_threadsafe`` instead. That
    call is also valid when the caller already runs on the target loop, as
    long as the returned future is not blocked on there.

    Args:
        coro: Coroutine object to run on ``globals.client.loop``.
        description: Short label used in the log line if the coroutine fails.

    Returns:
        The ``concurrent.futures.Future`` tracking the scheduled coroutine.

    Raises:
        Exception: Whatever resolving the loop or scheduling raises; the
            coroutine is closed first so it does not leak un-awaited.
    """
    try:
        future = asyncio.run_coroutine_threadsafe(coro, globals.client.loop)
    except Exception:
        # Avoid a "coroutine was never awaited" warning when scheduling fails.
        coro.close()
        raise

    def _log_failure(done):
        # run_coroutine_threadsafe stores the exception on the future; nothing
        # ever reads it in a fire-and-forget call, so surface it in the log.
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            logger.error(f"Background task '{description}' failed: {exc}")

    future.add_done_callback(_log_failure)
    return future


def _load_report_files(reports_dir, filenames):
    """Load each JSON report in *filenames* from *reports_dir*.

    Every successfully parsed report gets a ``filename`` key added. Files that
    cannot be read or parsed are logged at warning level and skipped.
    """
    reports = []
    for filename in filenames:
        try:
            with open(os.path.join(reports_dir, filename), 'r', encoding='utf-8') as f:
                report = json.load(f)
            report['filename'] = filename
            reports.append(report)
        except Exception as e:
            logger.warning(f"Failed to load report {filename}: {e}")
    return reports


# ========== DM Custom / Manual Send ==========

@router.post("/dm/{user_id}/custom")
async def send_custom_prompt_dm(user_id: str, req: CustomPromptRequest):
    """Queue an LLM-generated DM for a user, built from a custom prompt.

    Returns 404 if the client does not know the user, 400 if ``user_id`` is
    not an integer, and 500 on any other error. On success the DM is only
    *queued*; generation and sending happen in the background.
    """
    try:
        user_id_int = int(user_id)
        user = globals.client.get_user(user_id_int)
        if not user:
            return _error(404, f"User {user_id} not found")

        # Use the LLM query function for DM context
        from utils.llm import query_llama

        async def send_dm_custom_prompt():
            try:
                response = await query_llama(req.prompt, user_id=user_id, guild_id=None, response_type="dm_response")
                await user.send(response)
                logger.info(f"Custom DM prompt sent to user {user_id}: {req.prompt[:50]}...")

                # Log to DM history
                # NOTE(review): this passes the string user_id while the other
                # dm_logger calls in this module pass the int - confirm intended.
                dm_logger.log_conversation(user_id, req.prompt, response)
            except Exception as e:
                logger.error(f"Failed to send custom DM prompt to user {user_id}: {e}")

        # Run on the bot's loop rather than awaiting here (the original note:
        # awaiting directly triggered a "timeout context manager" error).
        _schedule_on_bot_loop(send_dm_custom_prompt(), f"custom DM prompt for user {user_id}")
        return {"status": "ok", "message": f"Custom DM prompt queued for user {user_id}"}

    except ValueError:
        return _error(400, "Invalid user ID format")
    except Exception as e:
        return _error(500, f"Error: {e}")


@router.post("/dm/{user_id}/manual")
async def send_manual_message_dm(
    user_id: str,
    message: str = Form(...),
    files: List[UploadFile] = File(default=[]),
    reply_to_message_id: str = Form(None),
    mention_author: bool = Form(True)
):
    """Queue a manually written DM (text and/or files) for a user.

    If ``reply_to_message_id`` is given, the text is sent as a reply to that
    message in the user's DM channel; if the message cannot be fetched the
    whole send is abandoned (logged, nothing is sent).

    Returns 404 if the client does not know the user, 400 if ``user_id`` is
    not an integer, and 500 if an upload cannot be read or on any other error.
    """
    try:
        user_id_int = int(user_id)
        user = globals.client.get_user(user_id_int)
        if not user:
            return _error(404, f"User {user_id} not found")

        # Read file content immediately before the request closes
        file_data = []
        for file in files:
            try:
                file_content = await file.read()
                file_data.append({
                    'filename': file.filename,
                    'content': file_content
                })
            except Exception as e:
                logger.error(f"Failed to read file {file.filename}: {e}")
                return _error(500, f"Failed to read file {file.filename}: {e}")

        async def send_dm_message_and_files():
            try:
                # Get the reference message if replying (must be done inside the task)
                reference_message = None
                if reply_to_message_id:
                    try:
                        dm_channel = user.dm_channel or await user.create_dm()
                        reference_message = await dm_channel.fetch_message(int(reply_to_message_id))
                    except Exception as e:
                        logger.error(f"Could not fetch DM message {reply_to_message_id} for reply: {e}")
                        return

                # Send the main message
                if message.strip():
                    if reference_message:
                        await user.send(message, reference=reference_message, mention_author=mention_author)
                        logger.info(f"Manual DM reply message sent to user {user_id}")
                    else:
                        await user.send(message)
                        logger.info(f"Manual DM message sent to user {user_id}")

                # Send files if any; one failed file does not stop the rest
                for file_info in file_data:
                    try:
                        await user.send(file=discord.File(io.BytesIO(file_info['content']), filename=file_info['filename']))
                        logger.info(f"File {file_info['filename']} sent via DM to user {user_id}")
                    except Exception as e:
                        logger.error(f"Failed to send file {file_info['filename']} via DM: {e}")

                # Log to DM history (user message = manual override trigger, miku response = the message sent)
                # NOTE(review): string user_id here vs. int elsewhere - confirm intended.
                dm_logger.log_conversation(user_id, "[Manual Override Trigger]", message, attachments=[f['filename'] for f in file_data])
            except Exception as e:
                logger.error(f"Failed to send manual DM to user {user_id}: {e}")

        # Run on the bot's loop rather than awaiting here (the original note:
        # awaiting directly triggered a "timeout context manager" error).
        _schedule_on_bot_loop(send_dm_message_and_files(), f"manual DM for user {user_id}")
        return {"status": "ok", "message": f"Manual DM message queued for user {user_id}"}

    except ValueError:
        return _error(400, "Invalid user ID format")
    except Exception as e:
        return _error(500, f"Error: {e}")


# ========== DM Logging Endpoints ==========

@router.get("/dms/users")
def get_dm_users():
    """Get summary of all users who have DMed the bot"""
    try:
        users = dm_logger.get_all_dm_users()
        return {"status": "ok", "users": users}
    except Exception as e:
        return _error(500, f"Failed to get DM users: {e}")


@router.get("/dms/users/{user_id}")
def get_dm_user_conversation(user_id: str):
    """Get conversation summary for a specific user (400 on a non-integer ID)"""
    try:
        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        summary = dm_logger.get_user_conversation_summary(user_id_int)
        return {"status": "ok", "summary": summary}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        return _error(500, f"Failed to get user conversation: {e}")


@router.get("/dms/users/{user_id}/conversations")
def get_dm_conversations(user_id: str, limit: int = 50):
    """Get recent conversations with a specific user.

    Returns the last ``limit`` logged conversations, or all of them when
    ``limit`` is zero or negative. ``message_id`` values are returned as
    strings. Responds 400 on a non-integer ``user_id``.
    """
    try:
        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        logger.debug(f"Loading conversations for user {user_id_int}, limit: {limit}")

        logs = dm_logger._load_user_logs(user_id_int)
        # Tolerate a log without a "conversations" key instead of failing with
        # a KeyError (the debug line already treated the key as optional).
        all_conversations = logs.get("conversations", [])
        logger.debug(f"Loaded logs for user {user_id_int}: {len(all_conversations)} conversations")

        selected = all_conversations[-limit:] if limit > 0 else all_conversations

        # Convert message IDs to strings to prevent JavaScript precision loss.
        # Work on shallow copies so the objects returned by dm_logger are not
        # modified as a side effect of serving this request.
        conversations = []
        for conv in selected:
            conv = dict(conv)
            if "message_id" in conv:
                conv["message_id"] = str(conv["message_id"])
            conversations.append(conv)

        logger.debug(f"Returning {len(conversations)} conversations")

        # Debug: Show message IDs being returned
        for i, conv in enumerate(conversations):
            msg_id = conv.get("message_id", "")
            is_bot = conv.get("is_bot_message", False)
            content_preview = conv.get("content", "")[:30] + "..." if conv.get("content", "") else "[No content]"
            logger.debug(f"Conv {i}: id={msg_id} (type: {type(msg_id)}), is_bot={is_bot}, content='{content_preview}'")

        return {"status": "ok", "conversations": conversations}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to get conversations for user {user_id}: {e}")
        return _error(500, f"Failed to get conversations: {e}")


@router.get("/dms/users/{user_id}/search")
def search_dm_conversations(user_id: str, query: str, limit: int = 10):
    """Search conversations with a specific user (400 on a non-integer ID)"""
    try:
        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        results = dm_logger.search_user_conversations(user_id_int, query, limit)
        return {"status": "ok", "results": results}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        return _error(500, f"Failed to search conversations: {e}")


@router.get("/dms/users/{user_id}/export")
def export_dm_conversation(user_id: str, format: str = "json"):
    """Export all conversations with a user and return the export path"""
    try:
        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        export_path = dm_logger.export_user_conversation(user_id_int, format)
        return {"status": "ok", "export_path": export_path, "format": format}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        return _error(500, f"Failed to export conversation: {e}")


@router.delete("/dms/users/{user_id}")
def delete_dm_user_logs(user_id: str):
    """Delete all DM logs for a specific user.

    Returns 404 when no log file exists for the user and 400 on a
    non-integer ``user_id``.
    """
    try:
        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        log_file = dm_logger._get_user_log_file(user_id_int)

        # Remove first and handle absence afterwards: checking for existence
        # beforehand leaves a window in which the file can disappear.
        try:
            os.remove(log_file)
        except FileNotFoundError:
            return _error(404, f"No DM logs found for user {user_id}")
        return {"status": "ok", "message": f"Deleted DM logs for user {user_id}"}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        return _error(500, f"Failed to delete DM logs: {e}")


# ========== User Blocking & DM Management ==========

@router.get("/dms/blocked-users")
def get_blocked_users():
    """Get list of all blocked users"""
    try:
        blocked_users = dm_logger.get_blocked_users()
        return {"status": "ok", "blocked_users": blocked_users}
    except Exception as e:
        logger.error(f"Failed to get blocked users: {e}")
        return _error(500, f"Failed to get blocked users: {e}")


@router.post("/dms/users/{user_id}/block")
def block_user(user_id: str):
    """Block a user from sending DMs to Miku (409 if already blocked)"""
    try:
        user_id_int = int(user_id)

        # Get username from DM logs if available
        user_summary = dm_logger.get_user_conversation_summary(user_id_int)
        username = user_summary.get("username", "Unknown")

        success = dm_logger.block_user(user_id_int, username)

        if success:
            logger.info(f"User {user_id} ({username}) blocked")
            return {"status": "ok", "message": f"User {username} has been blocked"}
        return _error(409, f"User {username} is already blocked")

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to block user {user_id}: {e}")
        return _error(500, f"Failed to block user: {e}")


@router.post("/dms/users/{user_id}/unblock")
def unblock_user(user_id: str):
    """Unblock a user (409 if the user is not currently blocked)"""
    try:
        user_id_int = int(user_id)
        success = dm_logger.unblock_user(user_id_int)

        if success:
            logger.info(f"User {user_id} unblocked")
            return {"status": "ok", "message": "User has been unblocked"}
        return _error(409, "User is not blocked")

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to unblock user {user_id}: {e}")
        return _error(500, f"Failed to unblock user: {e}")


@router.post("/dms/users/{user_id}/conversations/{conversation_id}/delete")
def delete_conversation(user_id: str, conversation_id: str):
    """Queue deletion of a specific conversation/message from both Discord and logs.

    The response only confirms that the deletion was queued; the outcome of
    the deletion itself is not reported back to the caller.
    """
    try:
        user_id_int = int(user_id)

        # Queue the async deletion in the bot's event loop
        async def do_delete():
            return await dm_logger.delete_conversation(user_id_int, conversation_id)

        _schedule_on_bot_loop(do_delete(), f"delete conversation {conversation_id} for user {user_id}")

        # For now, return success immediately since we can't await in FastAPI sync endpoint
        # The actual deletion happens asynchronously
        logger.info(f"Queued deletion of conversation {conversation_id} for user {user_id}")
        return {"status": "ok", "message": "Message deletion queued (will delete from both Discord and logs)"}

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to queue conversation deletion {conversation_id}: {e}")
        return _error(500, f"Failed to delete conversation: {e}")


@router.post("/dms/users/{user_id}/conversations/delete-all")
def delete_all_conversations(user_id: str):
    """Queue deletion of all conversations with a user from both Discord and logs"""
    try:
        user_id_int = int(user_id)

        # Queue the async bulk deletion in the bot's event loop
        async def do_delete_all():
            return await dm_logger.delete_all_conversations(user_id_int)

        _schedule_on_bot_loop(do_delete_all(), f"delete all conversations for user {user_id}")

        # Return success immediately since we can't await in FastAPI sync endpoint
        logger.info(f"Queued bulk deletion of all conversations for user {user_id}")
        return {"status": "ok", "message": "Bulk deletion queued (will delete all Miku messages from Discord and clear logs)"}

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to queue bulk conversation deletion for user {user_id}: {e}")
        return _error(500, f"Failed to delete conversations: {e}")


@router.post("/dms/users/{user_id}/delete-completely")
def delete_user_completely(user_id: str):
    """Delete user's log file completely (404 if there is nothing to delete)"""
    try:
        user_id_int = int(user_id)
        success = dm_logger.delete_user_completely(user_id_int)

        if success:
            logger.info(f"Completely deleted user {user_id}")
            return {"status": "ok", "message": "User data deleted completely"}
        return _error(404, "No user data found")

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to completely delete user {user_id}: {e}")
        return _error(500, f"Failed to delete user: {e}")


# ========== DM Interaction Analysis Endpoints ==========

@router.post("/dms/analysis/run")
def run_dm_analysis():
    """Manually trigger the daily DM interaction analysis.

    Returns 503 when the analyzer has not been initialized. Otherwise the
    analysis is started in the background and the call returns immediately.
    """
    try:
        from utils.dm_interaction_analyzer import dm_analyzer

        if dm_analyzer is None:
            return _error(503, "DM Analyzer not initialized. Set OWNER_USER_ID environment variable.")

        # Schedule analysis in Discord's event loop
        async def run_analysis():
            await dm_analyzer.run_daily_analysis()

        _schedule_on_bot_loop(run_analysis(), "daily DM analysis")

        return {"status": "ok", "message": "DM analysis started"}
    except Exception as e:
        logger.error(f"Failed to run DM analysis: {e}")
        return _error(500, f"Failed to run DM analysis: {e}")


@router.post("/dms/users/{user_id}/analyze")
def analyze_user_interaction(user_id: str):
    """Start a background analysis of a specific user's interaction.

    Returns 503 when the analyzer has not been initialized and 400 on a
    non-integer ``user_id``. The ``reported`` field in the success response
    is always ``True``; it does not reflect the analysis outcome, which is
    not known when the response is sent.
    """
    try:
        from utils.dm_interaction_analyzer import dm_analyzer

        if dm_analyzer is None:
            return _error(503, "DM Analyzer not initialized. Set OWNER_USER_ID environment variable.")

        user_id_int = int(user_id)

        # Schedule analysis in Discord's event loop
        async def run_analysis():
            return await dm_analyzer.analyze_and_report(user_id_int)

        _schedule_on_bot_loop(run_analysis(), f"interaction analysis for user {user_id}")

        # Return immediately - the analysis will run in the background
        return {"status": "ok", "message": f"Analysis started for user {user_id}", "reported": True}

    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to analyze user {user_id}: {e}")
        return _error(500, f"Failed to analyze user: {e}")


@router.get("/dms/analysis/reports")
def get_analysis_reports(limit: int = 20):
    """Get recent analysis reports.

    Reads up to ``limit`` ``.json`` files from the reports directory, taken
    in reverse filename order and excluding ``reported_today.json``.
    Returns an empty list when the directory does not exist.
    """
    try:
        from utils.dm_interaction_analyzer import REPORTS_DIR

        if not os.path.exists(REPORTS_DIR):
            return {"status": "ok", "reports": []}

        files = sorted(
            [f for f in os.listdir(REPORTS_DIR) if f.endswith('.json') and f != 'reported_today.json'],
            reverse=True,
        )[:limit]

        return {"status": "ok", "reports": _load_report_files(REPORTS_DIR, files)}
    except Exception as e:
        logger.error(f"Failed to get reports: {e}")
        return _error(500, f"Failed to get reports: {e}")


@router.get("/dms/analysis/reports/{user_id}")
def get_user_reports(user_id: str, limit: int = 10):
    """Get analysis reports for a specific user.

    Reads up to ``limit`` ``.json`` files whose name starts with
    ``"<user id>_"``, taken in reverse filename order. Returns 400 on a
    non-integer ``user_id`` and an empty list when the reports directory
    does not exist.
    """
    try:
        from utils.dm_interaction_analyzer import REPORTS_DIR

        # Validate before the directory check so a malformed ID is always
        # rejected with 400, whether or not any reports exist yet.
        user_id_int = int(user_id)

        if not os.path.exists(REPORTS_DIR):
            return {"status": "ok", "reports": []}

        # Match on the normalised integer so inputs such as "0123" or " 123"
        # select the same files as "123".
        prefix = f"{user_id_int}_"
        files = sorted(
            [f for f in os.listdir(REPORTS_DIR) if f.startswith(prefix) and f.endswith('.json')],
            reverse=True,
        )[:limit]

        return {"status": "ok", "reports": _load_report_files(REPORTS_DIR, files)}
    except ValueError:
        return _error(400, f"Invalid user ID format: {user_id}")
    except Exception as e:
        logger.error(f"Failed to get user reports: {e}")
        return _error(500, f"Failed to get user reports: {e}")