"""DM routes: custom prompt DMs, manual DMs, logging, blocking, analysis.""" import io import os import json from typing import List from fastapi import APIRouter, UploadFile, File, Form import discord import globals from routes.models import CustomPromptRequest from utils.dm_logger import dm_logger from utils.logger import get_logger logger = get_logger('api') router = APIRouter() # ========== DM Custom / Manual Send ========== @router.post("/dm/{user_id}/custom") async def send_custom_prompt_dm(user_id: str, req: CustomPromptRequest): """Send custom prompt via DM to a specific user""" try: user_id_int = int(user_id) user = globals.client.get_user(user_id_int) if not user: return {"status": "error", "message": f"User {user_id} not found"} # Use the LLM query function for DM context from utils.llm import query_llama async def send_dm_custom_prompt(): try: response = await query_llama(req.prompt, user_id=user_id, guild_id=None, response_type="dm_response") await user.send(response) logger.info(f"Custom DM prompt sent to user {user_id}: {req.prompt[:50]}...") # Log to DM history dm_logger.log_conversation(user_id, req.prompt, response) except Exception as e: logger.error(f"Failed to send custom DM prompt to user {user_id}: {e}") # Use create_task to avoid timeout context manager error globals.client.loop.create_task(send_dm_custom_prompt()) return {"status": "ok", "message": f"Custom DM prompt queued for user {user_id}"} except ValueError: return {"status": "error", "message": "Invalid user ID format"} except Exception as e: return {"status": "error", "message": f"Error: {e}"} @router.post("/dm/{user_id}/manual") async def send_manual_message_dm( user_id: str, message: str = Form(...), files: List[UploadFile] = File(default=[]), reply_to_message_id: str = Form(None), mention_author: bool = Form(True) ): """Send manual message via DM to a specific user""" try: user_id_int = int(user_id) user = globals.client.get_user(user_id_int) if not user: return {"status": "error", "message": f"User {user_id} not found"} # Read file content immediately before the request closes file_data = [] for file in files: try: file_content = await file.read() file_data.append({ 'filename': file.filename, 'content': file_content }) except Exception as e: logger.error(f"Failed to read file {file.filename}: {e}") return {"status": "error", "message": f"Failed to read file {file.filename}: {e}"} async def send_dm_message_and_files(): try: # Get the reference message if replying (must be done inside the task) reference_message = None if reply_to_message_id: try: dm_channel = user.dm_channel or await user.create_dm() reference_message = await dm_channel.fetch_message(int(reply_to_message_id)) except Exception as e: logger.error(f"Could not fetch DM message {reply_to_message_id} for reply: {e}") return # Send the main message if message.strip(): if reference_message: await user.send(message, reference=reference_message, mention_author=mention_author) logger.info(f"Manual DM reply message sent to user {user_id}") else: await user.send(message) logger.info(f"Manual DM message sent to user {user_id}") # Send files if any for file_info in file_data: try: await user.send(file=discord.File(io.BytesIO(file_info['content']), filename=file_info['filename'])) logger.info(f"File {file_info['filename']} sent via DM to user {user_id}") except Exception as e: logger.error(f"Failed to send file {file_info['filename']} via DM: {e}") # Log to DM history (user message = manual override trigger, miku response = the message sent) dm_logger.log_conversation(user_id, "[Manual Override Trigger]", message, attachments=[f['filename'] for f in file_data]) except Exception as e: logger.error(f"Failed to send manual DM to user {user_id}: {e}") # Use create_task to avoid timeout context manager error globals.client.loop.create_task(send_dm_message_and_files()) return {"status": "ok", "message": f"Manual DM message queued for user {user_id}"} except ValueError: return {"status": "error", "message": "Invalid user ID format"} except Exception as e: return {"status": "error", "message": f"Error: {e}"} # ========== DM Logging Endpoints ========== @router.get("/dms/users") def get_dm_users(): """Get summary of all users who have DMed the bot""" try: users = dm_logger.get_all_dm_users() return {"status": "ok", "users": users} except Exception as e: return {"status": "error", "message": f"Failed to get DM users: {e}"} @router.get("/dms/users/{user_id}") def get_dm_user_conversation(user_id: str): """Get conversation summary for a specific user""" try: # Convert string user_id to int for internal processing user_id_int = int(user_id) summary = dm_logger.get_user_conversation_summary(user_id_int) return {"status": "ok", "summary": summary} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: return {"status": "error", "message": f"Failed to get user conversation: {e}"} @router.get("/dms/users/{user_id}/conversations") def get_dm_conversations(user_id: str, limit: int = 50): """Get recent conversations with a specific user""" try: # Convert string user_id to int for internal processing user_id_int = int(user_id) logger.debug(f"Loading conversations for user {user_id_int}, limit: {limit}") logs = dm_logger._load_user_logs(user_id_int) logger.debug(f"Loaded logs for user {user_id_int}: {len(logs.get('conversations', []))} conversations") conversations = logs["conversations"][-limit:] if limit > 0 else logs["conversations"] # Convert message IDs to strings to prevent JavaScript precision loss for conv in conversations: if "message_id" in conv: conv["message_id"] = str(conv["message_id"]) logger.debug(f"Returning {len(conversations)} conversations") # Debug: Show message IDs being returned for i, conv in enumerate(conversations): msg_id = conv.get("message_id", "") is_bot = conv.get("is_bot_message", False) content_preview = conv.get("content", "")[:30] + "..." if conv.get("content", "") else "[No content]" logger.debug(f"Conv {i}: id={msg_id} (type: {type(msg_id)}), is_bot={is_bot}, content='{content_preview}'") return {"status": "ok", "conversations": conversations} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to get conversations for user {user_id}: {e}") return {"status": "error", "message": f"Failed to get conversations: {e}"} @router.get("/dms/users/{user_id}/search") def search_dm_conversations(user_id: str, query: str, limit: int = 10): """Search conversations with a specific user""" try: # Convert string user_id to int for internal processing user_id_int = int(user_id) results = dm_logger.search_user_conversations(user_id_int, query, limit) return {"status": "ok", "results": results} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: return {"status": "error", "message": f"Failed to search conversations: {e}"} @router.get("/dms/users/{user_id}/export") def export_dm_conversation(user_id: str, format: str = "json"): """Export all conversations with a user""" try: # Convert string user_id to int for internal processing user_id_int = int(user_id) export_path = dm_logger.export_user_conversation(user_id_int, format) return {"status": "ok", "export_path": export_path, "format": format} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: return {"status": "error", "message": f"Failed to export conversation: {e}"} @router.delete("/dms/users/{user_id}") def delete_dm_user_logs(user_id: str): """Delete all DM logs for a specific user""" try: # Convert string user_id to int for internal processing user_id_int = int(user_id) log_file = dm_logger._get_user_log_file(user_id_int) if os.path.exists(log_file): os.remove(log_file) return {"status": "ok", "message": f"Deleted DM logs for user {user_id}"} else: return {"status": "error", "message": f"No DM logs found for user {user_id}"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: return {"status": "error", "message": f"Failed to delete DM logs: {e}"} # ========== User Blocking & DM Management ========== @router.get("/dms/blocked-users") def get_blocked_users(): """Get list of all blocked users""" try: blocked_users = dm_logger.get_blocked_users() return {"status": "ok", "blocked_users": blocked_users} except Exception as e: logger.error(f"Failed to get blocked users: {e}") return {"status": "error", "message": f"Failed to get blocked users: {e}"} @router.post("/dms/users/{user_id}/block") def block_user(user_id: str): """Block a user from sending DMs to Miku""" try: user_id_int = int(user_id) # Get username from DM logs if available user_summary = dm_logger.get_user_conversation_summary(user_id_int) username = user_summary.get("username", "Unknown") success = dm_logger.block_user(user_id_int, username) if success: logger.info(f"User {user_id} ({username}) blocked") return {"status": "ok", "message": f"User {username} has been blocked"} else: return {"status": "error", "message": f"User {username} is already blocked"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to block user {user_id}: {e}") return {"status": "error", "message": f"Failed to block user: {e}"} @router.post("/dms/users/{user_id}/unblock") def unblock_user(user_id: str): """Unblock a user""" try: user_id_int = int(user_id) success = dm_logger.unblock_user(user_id_int) if success: logger.info(f"User {user_id} unblocked") return {"status": "ok", "message": f"User has been unblocked"} else: return {"status": "error", "message": f"User is not blocked"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to unblock user {user_id}: {e}") return {"status": "error", "message": f"Failed to unblock user: {e}"} @router.post("/dms/users/{user_id}/conversations/{conversation_id}/delete") def delete_conversation(user_id: str, conversation_id: str): """Delete a specific conversation/message from both Discord and logs""" try: user_id_int = int(user_id) # Queue the async deletion in the bot's event loop async def do_delete(): return await dm_logger.delete_conversation(user_id_int, conversation_id) globals.client.loop.create_task(do_delete()) # For now, return success immediately since we can't await in FastAPI sync endpoint # The actual deletion happens asynchronously logger.info(f"Queued deletion of conversation {conversation_id} for user {user_id}") return {"status": "ok", "message": "Message deletion queued (will delete from both Discord and logs)"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to queue conversation deletion {conversation_id}: {e}") return {"status": "error", "message": f"Failed to delete conversation: {e}"} @router.post("/dms/users/{user_id}/conversations/delete-all") def delete_all_conversations(user_id: str): """Delete all conversations with a user from both Discord and logs""" try: user_id_int = int(user_id) # Queue the async bulk deletion in the bot's event loop async def do_delete_all(): return await dm_logger.delete_all_conversations(user_id_int) globals.client.loop.create_task(do_delete_all()) # Return success immediately since we can't await in FastAPI sync endpoint logger.info(f"Queued bulk deletion of all conversations for user {user_id}") return {"status": "ok", "message": "Bulk deletion queued (will delete all Miku messages from Discord and clear logs)"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to queue bulk conversation deletion for user {user_id}: {e}") return {"status": "error", "message": f"Failed to delete conversations: {e}"} @router.post("/dms/users/{user_id}/delete-completely") def delete_user_completely(user_id: str): """Delete user's log file completely""" try: user_id_int = int(user_id) success = dm_logger.delete_user_completely(user_id_int) if success: logger.info(f"Completely deleted user {user_id}") return {"status": "ok", "message": "User data deleted completely"} else: return {"status": "error", "message": "No user data found"} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to completely delete user {user_id}: {e}") return {"status": "error", "message": f"Failed to delete user: {e}"} # ========== DM Interaction Analysis Endpoints ========== @router.post("/dms/analysis/run") def run_dm_analysis(): """Manually trigger the daily DM interaction analysis""" try: from utils.dm_interaction_analyzer import dm_analyzer if dm_analyzer is None: return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."} # Schedule analysis in Discord's event loop async def run_analysis(): await dm_analyzer.run_daily_analysis() globals.client.loop.create_task(run_analysis()) return {"status": "ok", "message": "DM analysis started"} except Exception as e: logger.error(f"Failed to run DM analysis: {e}") return {"status": "error", "message": f"Failed to run DM analysis: {e}"} @router.post("/dms/users/{user_id}/analyze") def analyze_user_interaction(user_id: str): """Analyze a specific user's interaction and optionally send report""" try: from utils.dm_interaction_analyzer import dm_analyzer if dm_analyzer is None: return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."} user_id_int = int(user_id) # Schedule analysis in Discord's event loop async def run_analysis(): return await dm_analyzer.analyze_and_report(user_id_int) globals.client.loop.create_task(run_analysis()) # Return immediately - the analysis will run in the background return {"status": "ok", "message": f"Analysis started for user {user_id}", "reported": True} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to analyze user {user_id}: {e}") return {"status": "error", "message": f"Failed to analyze user: {e}"} @router.get("/dms/analysis/reports") def get_analysis_reports(limit: int = 20): """Get recent analysis reports""" try: from utils.dm_interaction_analyzer import REPORTS_DIR if not os.path.exists(REPORTS_DIR): return {"status": "ok", "reports": []} reports = [] files = sorted([f for f in os.listdir(REPORTS_DIR) if f.endswith('.json') and f != 'reported_today.json'], reverse=True)[:limit] for filename in files: try: with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f: report = json.load(f) report['filename'] = filename reports.append(report) except Exception as e: logger.warning(f"Failed to load report {filename}: {e}") return {"status": "ok", "reports": reports} except Exception as e: logger.error(f"Failed to get reports: {e}") return {"status": "error", "message": f"Failed to get reports: {e}"} @router.get("/dms/analysis/reports/{user_id}") def get_user_reports(user_id: str, limit: int = 10): """Get analysis reports for a specific user""" try: from utils.dm_interaction_analyzer import REPORTS_DIR if not os.path.exists(REPORTS_DIR): return {"status": "ok", "reports": []} user_id_int = int(user_id) reports = [] files = sorted([f for f in os.listdir(REPORTS_DIR) if f.startswith(f"{user_id}_") and f.endswith('.json')], reverse=True)[:limit] for filename in files: try: with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f: report = json.load(f) report['filename'] = filename reports.append(report) except Exception as e: logger.warning(f"Failed to load report {filename}: {e}") return {"status": "ok", "reports": reports} except ValueError: return {"status": "error", "message": f"Invalid user ID format: {user_id}"} except Exception as e: logger.error(f"Failed to get user reports: {e}") return {"status": "error", "message": f"Failed to get user reports: {e}"}