578 lines
26 KiB
Python
578 lines
26 KiB
Python
"""
|
||
DM Logger Utility
|
||
Handles logging all DM conversations with timestamps and file attachments
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import discord
|
||
from datetime import datetime
|
||
from typing import List, Optional
|
||
import globals
|
||
|
||
# Directory for storing DM logs
|
||
DM_LOG_DIR = "memory/dms"
|
||
BLOCKED_USERS_FILE = "memory/blocked_users.json"
|
||
|
||
class DMLogger:
|
||
def __init__(self):
|
||
"""Initialize the DM logger and ensure directory exists"""
|
||
os.makedirs(DM_LOG_DIR, exist_ok=True)
|
||
os.makedirs("memory", exist_ok=True)
|
||
print(f"📁 DM Logger initialized: {DM_LOG_DIR}")
|
||
|
||
def _get_user_log_file(self, user_id: int) -> str:
|
||
"""Get the log file path for a specific user"""
|
||
return os.path.join(DM_LOG_DIR, f"{user_id}.json")
|
||
|
||
def _load_user_logs(self, user_id: int) -> dict:
|
||
"""Load existing logs for a user, create new if doesn't exist"""
|
||
log_file = self._get_user_log_file(user_id)
|
||
print(f"📁 DM Logger: Loading logs from {log_file}")
|
||
|
||
if os.path.exists(log_file):
|
||
try:
|
||
with open(log_file, 'r', encoding='utf-8') as f:
|
||
logs = json.load(f)
|
||
print(f"📁 DM Logger: Successfully loaded logs for user {user_id}: {len(logs.get('conversations', []))} conversations")
|
||
return logs
|
||
except Exception as e:
|
||
print(f"⚠️ DM Logger: Failed to load DM logs for user {user_id}: {e}")
|
||
return {"user_id": user_id, "username": "Unknown", "conversations": []}
|
||
else:
|
||
print(f"📁 DM Logger: No log file found for user {user_id}, creating new")
|
||
return {"user_id": user_id, "username": "Unknown", "conversations": []}
|
||
|
||
def _save_user_logs(self, user_id: int, logs: dict):
|
||
"""Save logs for a user"""
|
||
log_file = self._get_user_log_file(user_id)
|
||
try:
|
||
with open(log_file, 'w', encoding='utf-8') as f:
|
||
json.dump(logs, f, indent=2, ensure_ascii=False)
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to save DM logs for user {user_id}: {e}")
|
||
|
||
def log_user_message(self, user: discord.User, message: discord.Message, is_bot_message: bool = False):
|
||
"""Log a user message in DMs"""
|
||
user_id = user.id
|
||
username = user.display_name or user.name
|
||
|
||
# Load existing logs
|
||
logs = self._load_user_logs(user_id)
|
||
logs["username"] = username # Update username in case it changed
|
||
|
||
# Create message entry
|
||
message_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"message_id": message.id,
|
||
"is_bot_message": is_bot_message,
|
||
"content": message.content if message.content else "",
|
||
"attachments": [],
|
||
"reactions": [] # Track reactions: [{emoji, reactor_id, reactor_name, is_bot, added_at}]
|
||
}
|
||
|
||
# Log file attachments
|
||
if message.attachments:
|
||
for attachment in message.attachments:
|
||
attachment_info = {
|
||
"filename": attachment.filename,
|
||
"url": attachment.url,
|
||
"size": attachment.size,
|
||
"content_type": attachment.content_type
|
||
}
|
||
message_entry["attachments"].append(attachment_info)
|
||
|
||
# Log embeds
|
||
if message.embeds:
|
||
message_entry["embeds"] = [embed.to_dict() for embed in message.embeds]
|
||
|
||
# Add to conversations
|
||
logs["conversations"].append(message_entry)
|
||
|
||
# Keep only last 1000 messages to prevent files from getting too large
|
||
if len(logs["conversations"]) > 1000:
|
||
logs["conversations"] = logs["conversations"][-1000:]
|
||
print(f"📝 DM logs for user {username} trimmed to last 1000 messages")
|
||
|
||
# Save logs
|
||
self._save_user_logs(user_id, logs)
|
||
|
||
if is_bot_message:
|
||
print(f"🤖 DM logged: Bot -> {username} ({len(message_entry['attachments'])} attachments)")
|
||
else:
|
||
print(f"💬 DM logged: {username} -> Bot ({len(message_entry['attachments'])} attachments)")
|
||
|
||
def get_user_conversation_summary(self, user_id: int) -> dict:
|
||
"""Get a summary of conversations with a user"""
|
||
logs = self._load_user_logs(user_id)
|
||
|
||
if not logs["conversations"]:
|
||
return {"user_id": str(user_id), "username": logs["username"], "message_count": 0, "last_message": None}
|
||
|
||
total_messages = len(logs["conversations"])
|
||
user_messages = len([msg for msg in logs["conversations"] if not msg["is_bot_message"]])
|
||
bot_messages = total_messages - user_messages
|
||
|
||
# Get last message info
|
||
last_message = logs["conversations"][-1]
|
||
|
||
return {
|
||
"user_id": str(user_id), # Convert to string to prevent JS precision loss
|
||
"username": logs["username"],
|
||
"total_messages": total_messages,
|
||
"user_messages": user_messages,
|
||
"bot_messages": bot_messages,
|
||
"last_message": {
|
||
"timestamp": last_message["timestamp"],
|
||
"content": last_message["content"][:100] + "..." if len(last_message["content"]) > 100 else last_message["content"],
|
||
"is_bot_message": last_message["is_bot_message"]
|
||
}
|
||
}
|
||
|
||
def get_all_dm_users(self) -> List[dict]:
|
||
"""Get summary of all users who have DMed the bot"""
|
||
users = []
|
||
|
||
if not os.path.exists(DM_LOG_DIR):
|
||
return users
|
||
|
||
for filename in os.listdir(DM_LOG_DIR):
|
||
if filename.endswith('.json'):
|
||
try:
|
||
user_id = int(filename.replace('.json', ''))
|
||
summary = self.get_user_conversation_summary(user_id)
|
||
users.append(summary)
|
||
except ValueError:
|
||
continue
|
||
|
||
# Sort by last message timestamp (most recent first)
|
||
users.sort(key=lambda x: x["last_message"]["timestamp"] if x["last_message"] else "", reverse=True)
|
||
return users
|
||
|
||
def search_user_conversations(self, user_id: int, query: str, limit: int = 10) -> List[dict]:
|
||
"""Search conversations with a specific user"""
|
||
logs = self._load_user_logs(user_id)
|
||
results = []
|
||
|
||
query_lower = query.lower()
|
||
for message in reversed(logs["conversations"]): # Search from newest to oldest
|
||
if query_lower in message["content"].lower():
|
||
results.append(message)
|
||
if len(results) >= limit:
|
||
break
|
||
|
||
return results
|
||
|
||
def log_conversation(self, user_id: str, user_message: str, bot_response: str, attachments: list = None):
|
||
"""Log a conversation exchange (user message + bot response) for API usage"""
|
||
try:
|
||
user_id_int = int(user_id)
|
||
|
||
# Get user object - try to find it from the client
|
||
import globals
|
||
user = globals.client.get_user(user_id_int)
|
||
if not user:
|
||
# If we can't find the user, create a mock user for logging purposes
|
||
class MockUser:
|
||
def __init__(self, user_id):
|
||
self.id = user_id
|
||
self.display_name = "Unknown"
|
||
self.name = "Unknown"
|
||
|
||
user = MockUser(user_id_int)
|
||
|
||
# Create mock message objects for logging
|
||
class MockMessage:
|
||
def __init__(self, content, message_id=0, attachments=None):
|
||
self.content = content
|
||
self.id = message_id
|
||
self.attachments = attachments or []
|
||
self.embeds = []
|
||
|
||
# Log the user message (trigger)
|
||
if user_message:
|
||
user_msg = MockMessage(user_message)
|
||
self.log_user_message(user, user_msg, is_bot_message=False)
|
||
|
||
# Log the bot response with attachments
|
||
bot_attachments = []
|
||
if attachments:
|
||
for filename in attachments:
|
||
# Create mock attachment for filename logging
|
||
class MockAttachment:
|
||
def __init__(self, filename):
|
||
self.filename = filename
|
||
self.url = ""
|
||
self.size = 0
|
||
self.content_type = "unknown"
|
||
|
||
bot_attachments.append(MockAttachment(filename))
|
||
|
||
bot_msg = MockMessage(bot_response, attachments=bot_attachments)
|
||
self.log_user_message(user, bot_msg, is_bot_message=True)
|
||
|
||
print(f"📝 Conversation logged for user {user_id}: user='{user_message[:50]}...', bot='{bot_response[:50]}...'")
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to log conversation for user {user_id}: {e}")
|
||
|
||
def export_user_conversation(self, user_id: int, format: str = "json") -> str:
|
||
"""Export all conversations with a user in specified format"""
|
||
logs = self._load_user_logs(user_id)
|
||
|
||
if format.lower() == "txt":
|
||
# Export as readable text file
|
||
export_file = os.path.join(DM_LOG_DIR, f"{user_id}_export.txt")
|
||
|
||
with open(export_file, 'w', encoding='utf-8') as f:
|
||
f.write(f"DM Conversation Log: {logs['username']} (ID: {user_id})\n")
|
||
f.write("=" * 50 + "\n\n")
|
||
|
||
for msg in logs["conversations"]:
|
||
timestamp = msg["timestamp"]
|
||
sender = "🤖 Miku" if msg["is_bot_message"] else f"👤 {logs['username']}"
|
||
content = msg["content"] if msg["content"] else "[No text content]"
|
||
|
||
f.write(f"[{timestamp}] {sender}:\n{content}\n")
|
||
|
||
if msg["attachments"]:
|
||
f.write("📎 Attachments:\n")
|
||
for attachment in msg["attachments"]:
|
||
f.write(f" - {attachment['filename']} ({attachment['size']} bytes)\n")
|
||
|
||
f.write("\n" + "-" * 30 + "\n\n")
|
||
|
||
return export_file
|
||
else:
|
||
# Default to JSON
|
||
return self._get_user_log_file(user_id)
|
||
|
||
def _load_blocked_users(self) -> dict:
|
||
"""Load the blocked users list"""
|
||
if os.path.exists(BLOCKED_USERS_FILE):
|
||
try:
|
||
with open(BLOCKED_USERS_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to load blocked users: {e}")
|
||
return {"blocked_users": []}
|
||
return {"blocked_users": []}
|
||
|
||
def _save_blocked_users(self, blocked_data: dict):
|
||
"""Save the blocked users list"""
|
||
try:
|
||
with open(BLOCKED_USERS_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(blocked_data, f, indent=2, ensure_ascii=False)
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to save blocked users: {e}")
|
||
|
||
def is_user_blocked(self, user_id: int) -> bool:
|
||
"""Check if a user is blocked"""
|
||
blocked_data = self._load_blocked_users()
|
||
return user_id in blocked_data.get("blocked_users", [])
|
||
|
||
def block_user(self, user_id: int, username: str = None) -> bool:
|
||
"""Block a user from sending DMs to Miku"""
|
||
try:
|
||
blocked_data = self._load_blocked_users()
|
||
if user_id not in blocked_data["blocked_users"]:
|
||
blocked_data["blocked_users"].append(user_id)
|
||
|
||
# Store additional info about blocked users
|
||
if "blocked_user_info" not in blocked_data:
|
||
blocked_data["blocked_user_info"] = {}
|
||
|
||
blocked_data["blocked_user_info"][str(user_id)] = {
|
||
"username": username or "Unknown",
|
||
"blocked_at": datetime.now().isoformat(),
|
||
"blocked_by": "admin"
|
||
}
|
||
|
||
self._save_blocked_users(blocked_data)
|
||
print(f"🚫 User {user_id} ({username}) has been blocked")
|
||
return True
|
||
else:
|
||
print(f"⚠️ User {user_id} is already blocked")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Failed to block user {user_id}: {e}")
|
||
return False
|
||
|
||
def unblock_user(self, user_id: int) -> bool:
|
||
"""Unblock a user"""
|
||
try:
|
||
blocked_data = self._load_blocked_users()
|
||
if user_id in blocked_data["blocked_users"]:
|
||
blocked_data["blocked_users"].remove(user_id)
|
||
|
||
# Remove user info as well
|
||
if "blocked_user_info" in blocked_data and str(user_id) in blocked_data["blocked_user_info"]:
|
||
username = blocked_data["blocked_user_info"][str(user_id)].get("username", "Unknown")
|
||
del blocked_data["blocked_user_info"][str(user_id)]
|
||
else:
|
||
username = "Unknown"
|
||
|
||
self._save_blocked_users(blocked_data)
|
||
print(f"✅ User {user_id} ({username}) has been unblocked")
|
||
return True
|
||
else:
|
||
print(f"⚠️ User {user_id} is not blocked")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Failed to unblock user {user_id}: {e}")
|
||
return False
|
||
|
||
def get_blocked_users(self) -> List[dict]:
|
||
"""Get list of all blocked users"""
|
||
blocked_data = self._load_blocked_users()
|
||
result = []
|
||
|
||
for user_id in blocked_data.get("blocked_users", []):
|
||
user_info = blocked_data.get("blocked_user_info", {}).get(str(user_id), {})
|
||
result.append({
|
||
"user_id": str(user_id), # String to prevent JS precision loss
|
||
"username": user_info.get("username", "Unknown"),
|
||
"blocked_at": user_info.get("blocked_at", "Unknown"),
|
||
"blocked_by": user_info.get("blocked_by", "admin")
|
||
})
|
||
|
||
return result
|
||
|
||
async def log_reaction_add(self, user_id: int, message_id: int, emoji: str, reactor_id: int, reactor_name: str, is_bot_reactor: bool):
|
||
"""Log when a reaction is added to a message in DMs"""
|
||
try:
|
||
logs = self._load_user_logs(user_id)
|
||
|
||
# Find the message to add the reaction to
|
||
for message in logs["conversations"]:
|
||
if message.get("message_id") == message_id:
|
||
# Initialize reactions list if it doesn't exist
|
||
if "reactions" not in message:
|
||
message["reactions"] = []
|
||
|
||
# Check if this exact reaction already exists (shouldn't happen, but just in case)
|
||
reaction_exists = any(
|
||
r["emoji"] == emoji and r["reactor_id"] == reactor_id
|
||
for r in message["reactions"]
|
||
)
|
||
|
||
if not reaction_exists:
|
||
reaction_entry = {
|
||
"emoji": emoji,
|
||
"reactor_id": reactor_id,
|
||
"reactor_name": reactor_name,
|
||
"is_bot": is_bot_reactor,
|
||
"added_at": datetime.now().isoformat()
|
||
}
|
||
message["reactions"].append(reaction_entry)
|
||
self._save_user_logs(user_id, logs)
|
||
|
||
reactor_type = "🤖 Miku" if is_bot_reactor else f"👤 {reactor_name}"
|
||
print(f"➕ Reaction logged: {emoji} by {reactor_type} on message {message_id}")
|
||
return True
|
||
else:
|
||
print(f"⚠️ Reaction {emoji} by {reactor_name} already exists on message {message_id}")
|
||
return False
|
||
|
||
print(f"⚠️ Message {message_id} not found in user {user_id}'s logs")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ Failed to log reaction add for user {user_id}, message {message_id}: {e}")
|
||
return False
|
||
|
||
async def log_reaction_remove(self, user_id: int, message_id: int, emoji: str, reactor_id: int):
|
||
"""Log when a reaction is removed from a message in DMs"""
|
||
try:
|
||
logs = self._load_user_logs(user_id)
|
||
|
||
# Find the message to remove the reaction from
|
||
for message in logs["conversations"]:
|
||
if message.get("message_id") == message_id:
|
||
if "reactions" in message:
|
||
# Find and remove the specific reaction
|
||
original_count = len(message["reactions"])
|
||
message["reactions"] = [
|
||
r for r in message["reactions"]
|
||
if not (r["emoji"] == emoji and r["reactor_id"] == reactor_id)
|
||
]
|
||
|
||
if len(message["reactions"]) < original_count:
|
||
self._save_user_logs(user_id, logs)
|
||
print(f"➖ Reaction removed: {emoji} by user/bot {reactor_id} from message {message_id}")
|
||
return True
|
||
else:
|
||
print(f"⚠️ Reaction {emoji} by {reactor_id} not found on message {message_id}")
|
||
return False
|
||
else:
|
||
print(f"⚠️ No reactions on message {message_id}")
|
||
return False
|
||
|
||
print(f"⚠️ Message {message_id} not found in user {user_id}'s logs")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ Failed to log reaction remove for user {user_id}, message {message_id}: {e}")
|
||
return False
|
||
|
||
async def delete_conversation(self, user_id: int, conversation_id: str) -> bool:
|
||
"""Delete a specific conversation/message from both Discord and logs (only bot messages can be deleted)"""
|
||
try:
|
||
logs = self._load_user_logs(user_id)
|
||
|
||
print(f"🔍 DM Logger: Looking for bot message ID '{conversation_id}' for user {user_id}")
|
||
print(f"🔍 DM Logger: Searching through {len(logs['conversations'])} conversations")
|
||
|
||
# Convert conversation_id to int for comparison if it looks like a Discord message ID
|
||
conv_id_as_int = None
|
||
try:
|
||
conv_id_as_int = int(conversation_id)
|
||
except ValueError:
|
||
pass
|
||
|
||
# Find the specific bot message to delete
|
||
message_to_delete = None
|
||
for conv in logs["conversations"]:
|
||
if (conv.get("is_bot_message", False) and
|
||
(str(conv.get("message_id", "")) == conversation_id or
|
||
conv.get("message_id", 0) == conv_id_as_int or
|
||
conv.get("timestamp", "") == conversation_id)):
|
||
message_to_delete = conv
|
||
break
|
||
|
||
if not message_to_delete:
|
||
print(f"⚠️ No bot message found with ID {conversation_id} for user {user_id}")
|
||
return False
|
||
|
||
# Try to delete from Discord first
|
||
discord_deleted = False
|
||
try:
|
||
import globals
|
||
if globals.client and hasattr(globals.client, 'get_user'):
|
||
# Get the user and their DM channel
|
||
user = globals.client.get_user(user_id)
|
||
if user:
|
||
dm_channel = user.dm_channel
|
||
if not dm_channel:
|
||
dm_channel = await user.create_dm()
|
||
|
||
# Fetch and delete the message
|
||
message_id = message_to_delete.get("message_id")
|
||
if message_id:
|
||
try:
|
||
discord_message = await dm_channel.fetch_message(int(message_id))
|
||
await discord_message.delete()
|
||
discord_deleted = True
|
||
print(f"✅ Deleted Discord message {message_id} from DM with user {user_id}")
|
||
except Exception as e:
|
||
print(f"⚠️ Could not delete Discord message {message_id}: {e}")
|
||
# Continue anyway to delete from logs
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ Discord deletion failed: {e}")
|
||
# Continue anyway to delete from logs
|
||
|
||
# Remove from logs regardless of Discord deletion success
|
||
original_count = len(logs["conversations"])
|
||
logs["conversations"] = [conv for conv in logs["conversations"]
|
||
if not (
|
||
# Match by message_id (as int or string) AND it's a bot message
|
||
(conv.get("is_bot_message", False) and
|
||
(str(conv.get("message_id", "")) == conversation_id or
|
||
conv.get("message_id", 0) == conv_id_as_int or
|
||
conv.get("timestamp", "") == conversation_id))
|
||
)]
|
||
|
||
deleted_count = original_count - len(logs["conversations"])
|
||
|
||
if deleted_count > 0:
|
||
self._save_user_logs(user_id, logs)
|
||
if discord_deleted:
|
||
print(f"🗑️ Deleted bot message from both Discord and logs for user {user_id}")
|
||
else:
|
||
print(f"🗑️ Deleted bot message from logs only (Discord deletion failed) for user {user_id}")
|
||
return True
|
||
else:
|
||
print(f"⚠️ No bot message found in logs with ID {conversation_id} for user {user_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ Failed to delete conversation {conversation_id} for user {user_id}: {e}")
|
||
return False
|
||
|
||
async def delete_all_conversations(self, user_id: int) -> bool:
|
||
"""Delete all conversations with a user from both Discord and logs"""
|
||
try:
|
||
logs = self._load_user_logs(user_id)
|
||
conversation_count = len(logs["conversations"])
|
||
|
||
if conversation_count == 0:
|
||
print(f"⚠️ No conversations found for user {user_id}")
|
||
return False
|
||
|
||
# Find all bot messages to delete from Discord
|
||
bot_messages = [conv for conv in logs["conversations"] if conv.get("is_bot_message", False)]
|
||
print(f"🔍 Found {len(bot_messages)} bot messages to delete from Discord for user {user_id}")
|
||
|
||
# Try to delete all bot messages from Discord
|
||
discord_deleted_count = 0
|
||
try:
|
||
import globals
|
||
if globals.client and hasattr(globals.client, 'get_user'):
|
||
# Get the user and their DM channel
|
||
user = globals.client.get_user(user_id)
|
||
if user:
|
||
dm_channel = user.dm_channel
|
||
if not dm_channel:
|
||
dm_channel = await user.create_dm()
|
||
|
||
# Delete each bot message from Discord
|
||
for conv in bot_messages:
|
||
message_id = conv.get("message_id")
|
||
if message_id:
|
||
try:
|
||
discord_message = await dm_channel.fetch_message(int(message_id))
|
||
await discord_message.delete()
|
||
discord_deleted_count += 1
|
||
print(f"✅ Deleted Discord message {message_id} from DM with user {user_id}")
|
||
except Exception as e:
|
||
print(f"⚠️ Could not delete Discord message {message_id}: {e}")
|
||
# Continue with other messages
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ Discord bulk deletion failed: {e}")
|
||
# Continue anyway to delete from logs
|
||
|
||
# Delete all conversations from logs regardless of Discord deletion success
|
||
logs["conversations"] = []
|
||
self._save_user_logs(user_id, logs)
|
||
|
||
if discord_deleted_count > 0:
|
||
print(f"🗑️ Deleted {discord_deleted_count} bot messages from Discord and all {conversation_count} conversations from logs for user {user_id}")
|
||
else:
|
||
print(f"🗑️ Deleted all {conversation_count} conversations from logs only (Discord deletion failed) for user {user_id}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ Failed to delete all conversations for user {user_id}: {e}")
|
||
return False
|
||
|
||
def delete_user_completely(self, user_id: int) -> bool:
|
||
"""Delete user's log file completely"""
|
||
try:
|
||
log_file = self._get_user_log_file(user_id)
|
||
if os.path.exists(log_file):
|
||
os.remove(log_file)
|
||
print(f"🗑️ Completely deleted log file for user {user_id}")
|
||
return True
|
||
else:
|
||
print(f"⚠️ No log file found for user {user_id}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Failed to delete user log file {user_id}: {e}")
|
||
return False
|
||
|
||
# Global instance
|
||
dm_logger = DMLogger()
|