""" DM Logger Utility Handles logging all DM conversations with timestamps and file attachments """ import os import json import discord from datetime import datetime from typing import List, Optional import globals from utils.logger import get_logger logger = get_logger('dm') # Directory for storing DM logs DM_LOG_DIR = "memory/dms" BLOCKED_USERS_FILE = "memory/blocked_users.json" class DMLogger: def __init__(self): """Initialize the DM logger and ensure directory exists""" os.makedirs(DM_LOG_DIR, exist_ok=True) os.makedirs("memory", exist_ok=True) logger.info(f"DM Logger initialized: {DM_LOG_DIR}") def _get_user_log_file(self, user_id: int) -> str: """Get the log file path for a specific user""" return os.path.join(DM_LOG_DIR, f"{user_id}.json") def _load_user_logs(self, user_id: int) -> dict: """Load existing logs for a user, create new if doesn't exist""" log_file = self._get_user_log_file(user_id) logger.debug(f"DM Logger: Loading logs from {log_file}") if os.path.exists(log_file): try: with open(log_file, 'r', encoding='utf-8') as f: logs = json.load(f) logger.debug(f"DM Logger: Successfully loaded logs for user {user_id}: {len(logs.get('conversations', []))} conversations") return logs except Exception as e: logger.error(f"DM Logger: Failed to load DM logs for user {user_id}: {e}") return {"user_id": user_id, "username": "Unknown", "conversations": []} else: logger.debug(f"DM Logger: No log file found for user {user_id}, creating new") return {"user_id": user_id, "username": "Unknown", "conversations": []} def _save_user_logs(self, user_id: int, logs: dict): """Save logs for a user""" log_file = self._get_user_log_file(user_id) try: with open(log_file, 'w', encoding='utf-8') as f: json.dump(logs, f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Failed to save DM logs for user {user_id}: {e}") def log_user_message(self, user: discord.User, message: discord.Message, is_bot_message: bool = False): """Log a user message in DMs""" user_id = user.id username = user.display_name or user.name # Load existing logs logs = self._load_user_logs(user_id) logs["username"] = username # Update username in case it changed # Create message entry message_entry = { "timestamp": datetime.now().isoformat(), "message_id": message.id, "is_bot_message": is_bot_message, "content": message.content if message.content else "", "attachments": [], "reactions": [] # Track reactions: [{emoji, reactor_id, reactor_name, is_bot, added_at}] } # Log file attachments if message.attachments: for attachment in message.attachments: attachment_info = { "filename": attachment.filename, "url": attachment.url, "size": attachment.size, "content_type": attachment.content_type } message_entry["attachments"].append(attachment_info) # Log embeds if message.embeds: message_entry["embeds"] = [embed.to_dict() for embed in message.embeds] # Add to conversations logs["conversations"].append(message_entry) # Keep only last 1000 messages to prevent files from getting too large if len(logs["conversations"]) > 1000: logs["conversations"] = logs["conversations"][-1000:] logger.info(f"DM logs for user {username} trimmed to last 1000 messages") # Save logs self._save_user_logs(user_id, logs) if is_bot_message: logger.debug(f"DM logged: Bot -> {username} ({len(message_entry['attachments'])} attachments)") else: logger.debug(f"DM logged: {username} -> Bot ({len(message_entry['attachments'])} attachments)") def get_user_conversation_summary(self, user_id: int) -> dict: """Get a summary of conversations with a user""" logs = self._load_user_logs(user_id) if not logs["conversations"]: return {"user_id": str(user_id), "username": logs["username"], "message_count": 0, "last_message": None} total_messages = len(logs["conversations"]) user_messages = len([msg for msg in logs["conversations"] if not msg["is_bot_message"]]) bot_messages = total_messages - user_messages # Get last message info last_message = logs["conversations"][-1] return { "user_id": str(user_id), # Convert to string to prevent JS precision loss "username": logs["username"], "total_messages": total_messages, "user_messages": user_messages, "bot_messages": bot_messages, "last_message": { "timestamp": last_message["timestamp"], "content": last_message["content"][:100] + "..." if len(last_message["content"]) > 100 else last_message["content"], "is_bot_message": last_message["is_bot_message"] } } def get_all_dm_users(self) -> List[dict]: """Get summary of all users who have DMed the bot""" users = [] if not os.path.exists(DM_LOG_DIR): return users for filename in os.listdir(DM_LOG_DIR): if filename.endswith('.json'): try: user_id = int(filename.replace('.json', '')) summary = self.get_user_conversation_summary(user_id) users.append(summary) except ValueError: continue # Sort by last message timestamp (most recent first) users.sort(key=lambda x: x["last_message"]["timestamp"] if x["last_message"] else "", reverse=True) return users def search_user_conversations(self, user_id: int, query: str, limit: int = 10) -> List[dict]: """Search conversations with a specific user""" logs = self._load_user_logs(user_id) results = [] query_lower = query.lower() for message in reversed(logs["conversations"]): # Search from newest to oldest if query_lower in message["content"].lower(): results.append(message) if len(results) >= limit: break return results def log_conversation(self, user_id: str, user_message: str, bot_response: str, attachments: list = None): """Log a conversation exchange (user message + bot response) for API usage""" try: user_id_int = int(user_id) # Get user object - try to find it from the client import globals user = globals.client.get_user(user_id_int) if not user: # If we can't find the user, create a mock user for logging purposes class MockUser: def __init__(self, user_id): self.id = user_id self.display_name = "Unknown" self.name = "Unknown" user = MockUser(user_id_int) # Create mock message objects for logging class MockMessage: def __init__(self, content, message_id=0, attachments=None): self.content = content self.id = message_id self.attachments = attachments or [] self.embeds = [] # Log the user message (trigger) if user_message: user_msg = MockMessage(user_message) self.log_user_message(user, user_msg, is_bot_message=False) # Log the bot response with attachments bot_attachments = [] if attachments: for filename in attachments: # Create mock attachment for filename logging class MockAttachment: def __init__(self, filename): self.filename = filename self.url = "" self.size = 0 self.content_type = "unknown" bot_attachments.append(MockAttachment(filename)) bot_msg = MockMessage(bot_response, attachments=bot_attachments) self.log_user_message(user, bot_msg, is_bot_message=True) logger.debug(f"Conversation logged for user {user_id}: user='{user_message[:50]}...', bot='{bot_response[:50]}...'") except Exception as e: logger.error(f"Failed to log conversation for user {user_id}: {e}") def export_user_conversation(self, user_id: int, format: str = "json") -> str: """Export all conversations with a user in specified format""" logs = self._load_user_logs(user_id) if format.lower() == "txt": # Export as readable text file export_file = os.path.join(DM_LOG_DIR, f"{user_id}_export.txt") with open(export_file, 'w', encoding='utf-8') as f: f.write(f"DM Conversation Log: {logs['username']} (ID: {user_id})\n") f.write("=" * 50 + "\n\n") for msg in logs["conversations"]: timestamp = msg["timestamp"] sender = "🤖 Miku" if msg["is_bot_message"] else f"👤 {logs['username']}" content = msg["content"] if msg["content"] else "[No text content]" f.write(f"[{timestamp}] {sender}:\n{content}\n") if msg["attachments"]: f.write("📎 Attachments:\n") for attachment in msg["attachments"]: f.write(f" - {attachment['filename']} ({attachment['size']} bytes)\n") f.write("\n" + "-" * 30 + "\n\n") return export_file else: # Default to JSON return self._get_user_log_file(user_id) def _load_blocked_users(self) -> dict: """Load the blocked users list""" if os.path.exists(BLOCKED_USERS_FILE): try: with open(BLOCKED_USERS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"Failed to load blocked users: {e}") return {"blocked_users": []} return {"blocked_users": []} def _save_blocked_users(self, blocked_data: dict): """Save the blocked users list""" try: with open(BLOCKED_USERS_FILE, 'w', encoding='utf-8') as f: json.dump(blocked_data, f, indent=2) except Exception as e: logger.error(f"Failed to save blocked users: {e}") def is_user_blocked(self, user_id: int) -> bool: """Check if a user is blocked""" blocked_data = self._load_blocked_users() return user_id in blocked_data.get("blocked_users", []) def block_user(self, user_id: int, username: str = None) -> bool: """Block a user from sending DMs to Miku""" try: blocked_data = self._load_blocked_users() if user_id not in blocked_data["blocked_users"]: blocked_data["blocked_users"].append(user_id) # Store additional info about blocked users if "blocked_user_info" not in blocked_data: blocked_data["blocked_user_info"] = {} blocked_data["blocked_user_info"][str(user_id)] = { "username": username or "Unknown", "blocked_at": datetime.now().isoformat(), "blocked_by": "admin" } self._save_blocked_users(blocked_data) logger.info(f"User {user_id} ({username}) has been blocked") return True else: logger.warning(f"User {user_id} is already blocked") return False except Exception as e: logger.error(f"Failed to block user {user_id}: {e}") return False def unblock_user(self, user_id: int) -> bool: """Unblock a user""" try: blocked_data = self._load_blocked_users() if user_id in blocked_data["blocked_users"]: blocked_data["blocked_users"].remove(user_id) # Remove user info as well if "blocked_user_info" in blocked_data and str(user_id) in blocked_data["blocked_user_info"]: username = blocked_data["blocked_user_info"][str(user_id)].get("username", "Unknown") del blocked_data["blocked_user_info"][str(user_id)] else: username = "Unknown" self._save_blocked_users(blocked_data) logger.info(f"User {user_id} ({username}) has been unblocked") return True else: logger.warning(f"User {user_id} is not blocked") return False except Exception as e: logger.error(f"Failed to unblock user {user_id}: {e}") return False def get_blocked_users(self) -> List[dict]: """Get list of all blocked users""" blocked_data = self._load_blocked_users() result = [] for user_id in blocked_data.get("blocked_users", []): user_info = blocked_data.get("blocked_user_info", {}).get(str(user_id), {}) result.append({ "user_id": str(user_id), # String to prevent JS precision loss "username": user_info.get("username", "Unknown"), "blocked_at": user_info.get("blocked_at", "Unknown"), "blocked_by": user_info.get("blocked_by", "admin") }) return result async def log_reaction_add(self, user_id: int, message_id: int, emoji: str, reactor_id: int, reactor_name: str, is_bot_reactor: bool): """Log when a reaction is added to a message in DMs""" try: logs = self._load_user_logs(user_id) # Find the message to add the reaction to for message in logs["conversations"]: if message.get("message_id") == message_id: # Initialize reactions list if it doesn't exist if "reactions" not in message: message["reactions"] = [] # Check if this exact reaction already exists (shouldn't happen, but just in case) reaction_exists = any( r["emoji"] == emoji and r["reactor_id"] == reactor_id for r in message["reactions"] ) if not reaction_exists: reaction_entry = { "emoji": emoji, "reactor_id": reactor_id, "reactor_name": reactor_name, "is_bot": is_bot_reactor, "added_at": datetime.now().isoformat() } message["reactions"].append(reaction_entry) self._save_user_logs(user_id, logs) reactor_type = "🤖 Miku" if is_bot_reactor else f"👤 {reactor_name}" logger.debug(f"Reaction logged: {emoji} by {reactor_type} on message {message_id}") return True else: logger.debug(f"Reaction {emoji} by {reactor_name} already exists on message {message_id}") return False logger.warning(f"Message {message_id} not found in user {user_id}'s logs") return False except Exception as e: logger.error(f"Failed to log reaction add for user {user_id}, message {message_id}: {e}") return False async def log_reaction_remove(self, user_id: int, message_id: int, emoji: str, reactor_id: int): """Log when a reaction is removed from a message in DMs""" try: logs = self._load_user_logs(user_id) # Find the message to remove the reaction from for message in logs["conversations"]: if message.get("message_id") == message_id: if "reactions" in message: # Find and remove the specific reaction original_count = len(message["reactions"]) message["reactions"] = [ r for r in message["reactions"] if not (r["emoji"] == emoji and r["reactor_id"] == reactor_id) ] if len(message["reactions"]) < original_count: self._save_user_logs(user_id, logs) logger.debug(f"Reaction removed: {emoji} by user/bot {reactor_id} from message {message_id}") return True else: logger.debug(f"Reaction {emoji} by {reactor_id} not found on message {message_id}") return False else: logger.debug(f"No reactions on message {message_id}") return False logger.warning(f"Message {message_id} not found in user {user_id}'s logs") return False except Exception as e: logger.error(f"Failed to log reaction remove for user {user_id}, message {message_id}: {e}") return False async def delete_conversation(self, user_id: int, conversation_id: str) -> bool: """Delete a specific conversation/message from both Discord and logs (only bot messages can be deleted)""" try: logs = self._load_user_logs(user_id) logger.debug(f"DM Logger: Looking for bot message ID '{conversation_id}' for user {user_id}") logger.debug(f"DM Logger: Searching through {len(logs['conversations'])} conversations") # Convert conversation_id to int for comparison if it looks like a Discord message ID conv_id_as_int = None try: conv_id_as_int = int(conversation_id) except ValueError: pass # Find the specific bot message to delete message_to_delete = None for conv in logs["conversations"]: if (conv.get("is_bot_message", False) and (str(conv.get("message_id", "")) == conversation_id or conv.get("message_id", 0) == conv_id_as_int or conv.get("timestamp", "") == conversation_id)): message_to_delete = conv break if not message_to_delete: logger.warning(f"No bot message found with ID {conversation_id} for user {user_id}") return False # Try to delete from Discord first discord_deleted = False try: import globals if globals.client and hasattr(globals.client, 'get_user'): # Get the user and their DM channel user = globals.client.get_user(user_id) if user: dm_channel = user.dm_channel if not dm_channel: dm_channel = await user.create_dm() # Fetch and delete the message message_id = message_to_delete.get("message_id") if message_id: try: discord_message = await dm_channel.fetch_message(int(message_id)) await discord_message.delete() discord_deleted = True logger.info(f"Deleted Discord message {message_id} from DM with user {user_id}") except Exception as e: logger.warning(f"Could not delete Discord message {message_id}: {e}") # Continue anyway to delete from logs except Exception as e: logger.warning(f"Discord deletion failed: {e}") # Continue anyway to delete from logs # Remove from logs regardless of Discord deletion success original_count = len(logs["conversations"]) logs["conversations"] = [conv for conv in logs["conversations"] if not ( # Match by message_id (as int or string) AND it's a bot message (conv.get("is_bot_message", False) and (str(conv.get("message_id", "")) == conversation_id or conv.get("message_id", 0) == conv_id_as_int or conv.get("timestamp", "") == conversation_id)) )] deleted_count = original_count - len(logs["conversations"]) if deleted_count > 0: self._save_user_logs(user_id, logs) if discord_deleted: logger.info(f"Deleted bot message from both Discord and logs for user {user_id}") else: logger.info(f"Deleted bot message from logs only (Discord deletion failed) for user {user_id}") return True else: logger.warning(f"No bot message found in logs with ID {conversation_id} for user {user_id}") return False except Exception as e: logger.error(f"Failed to delete conversation {conversation_id} for user {user_id}: {e}") return False async def delete_all_conversations(self, user_id: int) -> bool: """Delete all conversations with a user from both Discord and logs""" try: logs = self._load_user_logs(user_id) conversation_count = len(logs["conversations"]) if conversation_count == 0: logger.warning(f"No conversations found for user {user_id}") return False # Find all bot messages to delete from Discord bot_messages = [conv for conv in logs["conversations"] if conv.get("is_bot_message", False)] logger.debug(f"Found {len(bot_messages)} bot messages to delete from Discord for user {user_id}") # Try to delete all bot messages from Discord discord_deleted_count = 0 try: import globals if globals.client and hasattr(globals.client, 'get_user'): # Get the user and their DM channel user = globals.client.get_user(user_id) if user: dm_channel = user.dm_channel if not dm_channel: dm_channel = await user.create_dm() # Delete each bot message from Discord for conv in bot_messages: message_id = conv.get("message_id") if message_id: try: discord_message = await dm_channel.fetch_message(int(message_id)) await discord_message.delete() discord_deleted_count += 1 logger.info(f"Deleted Discord message {message_id} from DM with user {user_id}") except Exception as e: logger.error(f"Could not delete Discord message {message_id}: {e}") # Continue with other messages except Exception as e: logger.warning(f"Discord bulk deletion failed: {e}") # Continue anyway to delete from logs # Delete all conversations from logs regardless of Discord deletion success logs["conversations"] = [] self._save_user_logs(user_id, logs) if discord_deleted_count > 0: logger.info(f"Deleted {discord_deleted_count} bot messages from Discord and all {conversation_count} conversations from logs for user {user_id}") else: logger.info(f"Deleted all {conversation_count} conversations from logs only (Discord deletion failed) for user {user_id}") return True except Exception as e: logger.error(f"Failed to delete all conversations for user {user_id}: {e}") return False def delete_user_completely(self, user_id: int) -> bool: """Delete user's log file completely""" try: log_file = self._get_user_log_file(user_id) if os.path.exists(log_file): os.remove(log_file) logger.info(f"Completely deleted log file for user {user_id}") return True else: logger.warning(f"No log file found for user {user_id}") return False except Exception as e: logger.error(f"Failed to delete user log file {user_id}: {e}") return False # Global instance dm_logger = DMLogger()