miku-discord/bot/utils/profile_picture_manager.py
koko210Serve 9009e9fc80 Add animated GIF support for profile pictures
- Detect animated GIFs and preserve animation frames during upload
- Extract dominant color from first frame for role color syncing
- Generate multi-frame descriptions using existing video analysis pipeline
- Skip face detection/cropping for GIFs to maintain original animation
- Update UI to inform users about GIF support and Nitro requirement
- Add metadata flag to distinguish animated vs static profile pictures
2025-12-07 23:48:12 +02:00
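
A minimal sketch of the animation probe this commit adds (assumes Pillow, whose GIF plugin exposes n_frames; the module itself uses an equivalent seek()-based check):

    from PIL import Image
    import io

    def is_animated_gif(data: bytes) -> bool:
        # More than one frame means the GIF is animated
        img = Image.open(io.BytesIO(data))
        return img.format == "GIF" and getattr(img, "n_frames", 1) > 1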

# profile_picture_manager.py
"""
Intelligent profile picture manager for Miku.
Handles searching, face detection, cropping, and Discord avatar updates.
Supports both static images and animated GIFs:
- Static images (PNG, JPG, etc.): Full processing with face detection, smart cropping, resizing,
and single-frame description generation
- Animated GIFs: Fast path that preserves animation, extracts frames for multi-frame description,
and extracts dominant color from first frame
Note: Animated avatars require Discord Nitro on the bot account
"""
import os
import io
import aiohttp
import asyncio
from PIL import Image
import numpy as np
import cv2
from datetime import datetime
from typing import Optional, Dict, Tuple
import json
import discord
import globals
from .danbooru_client import danbooru_client
class ProfilePictureManager:
"""Manages Miku's profile picture with intelligent cropping and face detection"""
PROFILE_PIC_DIR = "memory/profile_pictures"
FALLBACK_PATH = "memory/profile_pictures/fallback.png"
CURRENT_PATH = "memory/profile_pictures/current.png"
METADATA_PATH = "memory/profile_pictures/metadata.json"
# Face detection API endpoint
FACE_DETECTOR_API = "http://anime-face-detector:6078/detect"
# Fallback role color (Miku's iconic teal)
FALLBACK_ROLE_COLOR = (134, 206, 203) # #86cecb
def __init__(self):
self._ensure_directories()
def _ensure_directories(self):
"""Ensure profile picture directory exists"""
os.makedirs(self.PROFILE_PIC_DIR, exist_ok=True)
async def initialize(self):
"""Initialize the profile picture manager (check API availability)"""
try:
async with aiohttp.ClientSession() as session:
async with session.get("http://anime-face-detector:6078/health", timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
print("✅ Anime face detector API connected")
return True
except Exception as e:
print(f"⚠️ Face detector API not available: {e}")
print(" Profile picture changes will use fallback cropping")
return False
async def _ensure_vram_available(self, debug: bool = False):
"""
Ensure VRAM is available for face detection by swapping to text model.
This unloads the vision model if it's loaded.
"""
try:
if debug:
print("💾 Swapping to text model to free VRAM for face detection...")
# Make a simple request to text model to trigger swap
async with aiohttp.ClientSession() as session:
payload = {
"model": "llama3.1",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 1,
"stream": False
}
async with session.post(
"http://llama-swap:8080/v1/chat/completions",
json=payload,
timeout=aiohttp.ClientTimeout(total=60)
) as response:
if response.status == 200:
if debug:
print("✅ Vision model unloaded, VRAM available")
# Give system time to fully release VRAM
await asyncio.sleep(3)
return True
except Exception as e:
if debug:
print(f"⚠️ Could not swap models: {e}")
return False
async def _start_face_detector(self, debug: bool = False) -> bool:
"""Start the face detector container using Docker socket API"""
try:
if debug:
print("🚀 Starting face detector container...")
# Use Docker socket API to start container
# Docker socket path
socket_path = "/var/run/docker.sock"
# Check if socket exists
if not os.path.exists(socket_path):
if debug:
print("⚠️ Docker socket not available")
return False
# Use aiohttp UnixConnector to communicate with Docker socket
from aiohttp import UnixConnector
async with aiohttp.ClientSession(
connector=UnixConnector(path=socket_path)
) as session:
# Start the container
url = "http://localhost/containers/anime-face-detector/start"
async with session.post(url) as response:
if response.status not in [204, 304]: # 204=started, 304=already running
if debug:
error_text = await response.text()
print(f"⚠️ Failed to start container: {response.status} - {error_text}")
return False
# Wait for API to be ready
for i in range(30): # 30 second timeout
try:
async with aiohttp.ClientSession() as session:
async with session.get(
"http://anime-face-detector:6078/health",
timeout=aiohttp.ClientTimeout(total=2)
) as response:
if response.status == 200:
if debug:
print(f"✅ Face detector ready (took {i+1}s)")
return True
                except Exception:
                    pass
await asyncio.sleep(1)
if debug:
print("⚠️ Face detector didn't become ready in time")
return False
except Exception as e:
if debug:
print(f"⚠️ Error starting face detector: {e}")
return False
async def _stop_face_detector(self, debug: bool = False):
"""Stop the face detector container using Docker socket API"""
try:
if debug:
print("🛑 Stopping face detector to free VRAM...")
socket_path = "/var/run/docker.sock"
if not os.path.exists(socket_path):
if debug:
print("⚠️ Docker socket not available")
return
from aiohttp import UnixConnector
async with aiohttp.ClientSession(
connector=UnixConnector(path=socket_path)
) as session:
# Stop the container
url = "http://localhost/containers/anime-face-detector/stop"
async with session.post(url, params={"t": 10}) as response: # 10 second timeout
if response.status in [204, 304]: # 204=stopped, 304=already stopped
if debug:
print("✅ Face detector stopped")
else:
if debug:
error_text = await response.text()
print(f"⚠️ Failed to stop container: {response.status} - {error_text}")
except Exception as e:
if debug:
print(f"⚠️ Error stopping face detector: {e}")
async def save_current_avatar_as_fallback(self):
"""Save the bot's current avatar as fallback (only if fallback doesn't exist)"""
try:
# Only save if fallback doesn't already exist
if os.path.exists(self.FALLBACK_PATH):
print("✅ Fallback avatar already exists, skipping save")
return True
if not globals.client or not globals.client.user:
print("⚠️ Bot client not ready")
return False
avatar_asset = globals.client.user.avatar or globals.client.user.default_avatar
# Download avatar
avatar_bytes = await avatar_asset.read()
# Save as fallback
with open(self.FALLBACK_PATH, 'wb') as f:
f.write(avatar_bytes)
print(f"✅ Saved current avatar as fallback ({len(avatar_bytes)} bytes)")
return True
except Exception as e:
print(f"⚠️ Error saving fallback avatar: {e}")
return False
async def change_profile_picture(
self,
mood: Optional[str] = None,
custom_image_bytes: Optional[bytes] = None,
debug: bool = False,
max_retries: int = 5
) -> Dict:
"""
Main function to change Miku's profile picture.
Args:
mood: Current mood to influence Danbooru search
custom_image_bytes: If provided, use this image instead of Danbooru
debug: Enable debug output
max_retries: Maximum number of attempts to find a valid Miku image (for Danbooru)
Returns:
Dict with status and metadata
"""
result = {
"success": False,
"source": None,
"error": None,
"metadata": {}
}
try:
# Step 1: Get and validate image (with retry for Danbooru)
image_bytes = None
image = None
is_animated_gif = False
if custom_image_bytes:
# Custom upload - no retry needed
if debug:
print("🖼️ Using provided custom image")
image_bytes = custom_image_bytes
result["source"] = "custom_upload"
# Load image with PIL
try:
image = Image.open(io.BytesIO(image_bytes))
if debug:
print(f"📐 Original image size: {image.size}")
# Check if it's an animated GIF
if image.format == 'GIF':
try:
# Check if GIF has multiple frames
image.seek(1)
is_animated_gif = True
image.seek(0) # Reset to first frame
if debug:
print("🎬 Detected animated GIF - will preserve animation")
except EOFError:
# Only one frame, treat as static image
if debug:
print("🖼️ Single-frame GIF - will process as static image")
except Exception as e:
result["error"] = f"Failed to open image: {e}"
return result
else:
# Danbooru - retry until we find a valid Miku image
if debug:
print(f"🎨 Searching Danbooru for Miku image (mood: {mood})")
for attempt in range(max_retries):
if attempt > 0 and debug:
print(f"🔄 Retry attempt {attempt + 1}/{max_retries}")
post = await danbooru_client.get_random_miku_image(mood=mood)
if not post:
continue
image_url = danbooru_client.extract_image_url(post)
if not image_url:
continue
# Download image
temp_image_bytes = await self._download_image(image_url)
if not temp_image_bytes:
continue
if debug:
print(f"✅ Downloaded image from Danbooru (post #{danbooru_client.get_post_metadata(post).get('id')})")
# Load image with PIL
try:
temp_image = Image.open(io.BytesIO(temp_image_bytes))
if debug:
print(f"📐 Original image size: {temp_image.size}")
except Exception as e:
if debug:
print(f"⚠️ Failed to open image: {e}")
continue
# Verify it's Miku
miku_verification = await self._verify_and_locate_miku(temp_image_bytes, debug=debug)
if not miku_verification["is_miku"]:
if debug:
print(f"❌ Image verification failed: not Miku, trying another...")
continue
# Success! This image is valid
image_bytes = temp_image_bytes
image = temp_image
result["source"] = "danbooru"
result["metadata"] = danbooru_client.get_post_metadata(post)
# If multiple characters detected, use LLM's suggested crop region
if miku_verification.get("crop_region"):
if debug:
print(f"🎯 Using LLM-suggested crop region for Miku")
image = self._apply_crop_region(image, miku_verification["crop_region"])
break
# Check if we found a valid image
if not image_bytes or not image:
result["error"] = f"Could not find valid Miku image after {max_retries} attempts"
return result
# === ANIMATED GIF FAST PATH ===
# If this is an animated GIF, skip most processing and use raw bytes
if is_animated_gif:
if debug:
print("🎬 Using GIF fast path - skipping face detection and cropping")
# Generate description of the animated GIF
if debug:
print("📝 Generating GIF description using video analysis pipeline...")
description = await self._generate_gif_description(image_bytes, debug=debug)
if description:
# Save description to file
description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
try:
with open(description_path, 'w', encoding='utf-8') as f:
f.write(description)
result["metadata"]["description"] = description
if debug:
print(f"📝 Saved GIF description ({len(description)} chars)")
except Exception as e:
print(f"⚠️ Failed to save description file: {e}")
else:
if debug:
print("⚠️ GIF description generation returned None")
# Extract dominant color from first frame
dominant_color = self._extract_dominant_color(image, debug=debug)
if dominant_color:
result["metadata"]["dominant_color"] = {
"rgb": dominant_color,
"hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
}
if debug:
print(f"🎨 Dominant color from first frame: RGB{dominant_color} (#{result['metadata']['dominant_color']['hex'][1:]})")
# Save the original GIF bytes
with open(self.CURRENT_PATH, 'wb') as f:
f.write(image_bytes)
if debug:
print(f"💾 Saved animated GIF ({len(image_bytes)} bytes)")
# Update Discord avatar with original GIF
if globals.client and globals.client.user:
try:
if globals.client.loop and globals.client.loop.is_running():
future = asyncio.run_coroutine_threadsafe(
globals.client.user.edit(avatar=image_bytes),
globals.client.loop
)
future.result(timeout=10)
else:
await globals.client.user.edit(avatar=image_bytes)
result["success"] = True
result["metadata"]["changed_at"] = datetime.now().isoformat()
result["metadata"]["animated"] = True
# Save metadata
self._save_metadata(result["metadata"])
print(f"✅ Animated profile picture updated successfully!")
# Update role colors if we have a dominant color
if dominant_color:
await self._update_role_colors(dominant_color, debug=debug)
return result
except discord.HTTPException as e:
result["error"] = f"Discord API error: {e}"
print(f"⚠️ Failed to update Discord avatar with GIF: {e}")
print(f" Note: Animated avatars require Discord Nitro")
return result
except Exception as e:
result["error"] = f"Unexpected error updating avatar: {e}"
print(f"⚠️ Unexpected error: {e}")
return result
else:
result["error"] = "Bot client not ready"
return result
# === NORMAL STATIC IMAGE PATH ===
# Step 2: Generate description of the validated image
if debug:
print("📝 Generating image description...")
description = await self._generate_image_description(image_bytes, debug=debug)
if description:
# Save description to file
description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
try:
with open(description_path, 'w', encoding='utf-8') as f:
f.write(description)
result["metadata"]["description"] = description
if debug:
print(f"📝 Saved image description ({len(description)} chars)")
except Exception as e:
print(f"⚠️ Failed to save description file: {e}")
else:
if debug:
print("⚠️ Description generation returned None")
# Step 3: Detect face and crop intelligently
cropped_image = await self._intelligent_crop(image, image_bytes, target_size=512, debug=debug)
if not cropped_image:
result["error"] = "Failed to crop image"
return result
# Step 4: Save the cropped image first
output_buffer = io.BytesIO()
cropped_image.save(output_buffer, format='PNG')
cropped_bytes = output_buffer.getvalue()
# Save to disk as current
with open(self.CURRENT_PATH, 'wb') as f:
f.write(cropped_bytes)
if debug:
print(f"💾 Saved cropped image ({len(cropped_bytes)} bytes)")
# Step 5: Extract dominant color from saved current.png
saved_image = Image.open(self.CURRENT_PATH)
dominant_color = self._extract_dominant_color(saved_image, debug=debug)
if dominant_color:
result["metadata"]["dominant_color"] = {
"rgb": dominant_color,
"hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
}
if debug:
print(f"🎨 Dominant color: RGB{dominant_color} (#{result['metadata']['dominant_color']['hex'][1:]})")
# Step 6: Update Discord avatar
if globals.client and globals.client.user:
try:
# Run the edit operation in the bot's event loop
if globals.client.loop and globals.client.loop.is_running():
# Create a future to run in bot's loop
future = asyncio.run_coroutine_threadsafe(
globals.client.user.edit(avatar=cropped_bytes),
globals.client.loop
)
# Wait for the result
future.result(timeout=10)
else:
# Fallback if loop not available (shouldn't happen)
await globals.client.user.edit(avatar=cropped_bytes)
result["success"] = True
result["metadata"]["changed_at"] = datetime.now().isoformat()
result["metadata"]["animated"] = False
# Save metadata
self._save_metadata(result["metadata"])
print(f"✅ Profile picture updated successfully!")
# Step 7: Update role colors across all servers
if dominant_color:
await self._update_role_colors(dominant_color, debug=debug)
except discord.HTTPException as e:
result["error"] = f"Discord API error: {e}"
print(f"⚠️ Failed to update Discord avatar: {e}")
except Exception as e:
result["error"] = f"Unexpected error updating avatar: {e}"
print(f"⚠️ Unexpected error: {e}")
else:
result["error"] = "Bot client not ready"
except Exception as e:
result["error"] = f"Unexpected error: {e}"
print(f"⚠️ Error in change_profile_picture: {e}")
return result
async def _download_image(self, url: str) -> Optional[bytes]:
"""Download image from URL"""
try:
async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
return await response.read()
except Exception as e:
print(f"⚠️ Error downloading image: {e}")
return None
async def _generate_image_description(self, image_bytes: bytes, debug: bool = False) -> Optional[str]:
"""
Generate a detailed description of the profile picture using vision model.
This description will be used when users ask about the pfp.
Args:
image_bytes: Raw image bytes
debug: Enable debug output
Returns:
Description string or None
"""
try:
import base64
image_b64 = base64.b64encode(image_bytes).decode('utf-8')
if debug:
print(f"📸 Encoded image: {len(image_b64)} chars, calling vision model...")
prompt = """This is an image of Hatsune Miku that will be used as a profile picture.
Please describe this image in detail, including:
- What Miku is wearing (outfit, colors, accessories)
- Her pose and expression
- The art style and mood of the image
- Any notable background elements
- Overall atmosphere or theme
Keep the description conversational and in second-person (referring to Miku as "you"), as if Miku herself is describing her own appearance in this image."""
payload = {
"model": globals.VISION_MODEL,
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_b64}"
}
}
]
}
],
"stream": False,
"max_tokens": 400,
"temperature": 0.7
}
headers = {"Content-Type": "application/json"}
if debug:
print(f"🌐 Calling {globals.LLAMA_URL}/v1/chat/completions with model {globals.VISION_MODEL}")
async with aiohttp.ClientSession() as session:
async with session.post(f"{globals.LLAMA_URL}/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status == 200:
data = await resp.json()
if debug:
print(f"📦 API Response keys: {data.keys()}")
print(f"📦 Choices: {data.get('choices', [])}")
# Try to get content from the response
choice = data.get("choices", [{}])[0]
message = choice.get("message", {})
# Check both 'content' and 'reasoning_content' fields
description = message.get("content", "")
# If content is empty, try reasoning_content (chain-of-thought models)
if not description or not description.strip():
description = message.get("reasoning_content", "")
if description and description.strip():
if debug:
print(f"✅ Generated description: {description[:100]}...")
return description.strip()
                        else:
                            print("⚠️ Description is empty or None")
                            if debug:
                                print(f"   Full response: {data}")
                            return None
else:
error_text = await resp.text()
print(f"❌ Vision API error generating description: {resp.status} - {error_text}")
except Exception as e:
print(f"⚠️ Error generating image description: {e}")
import traceback
traceback.print_exc()
return None
async def _generate_gif_description(self, gif_bytes: bytes, debug: bool = False) -> Optional[str]:
"""
Generate a detailed description of an animated GIF using the video analysis pipeline.
Args:
gif_bytes: Raw GIF bytes
debug: Enable debug output
Returns:
Description string or None
"""
try:
from utils.image_handling import extract_video_frames, analyze_video_with_vision
if debug:
print("🎬 Extracting frames from GIF...")
# Extract frames from the GIF (6 frames for good analysis)
frames = await extract_video_frames(gif_bytes, num_frames=6)
if not frames:
if debug:
print("⚠️ Failed to extract frames from GIF")
return None
if debug:
print(f"✅ Extracted {len(frames)} frames from GIF")
print(f"🌐 Analyzing GIF with vision model...")
# Use the existing analyze_video_with_vision function (no timeout issues)
# Note: This uses a generic prompt, but it works reliably
description = await analyze_video_with_vision(frames, media_type="gif")
if description and description.strip() and not description.startswith("Error"):
if debug:
print(f"✅ Generated GIF description: {description[:100]}...")
return description.strip()
else:
if debug:
print(f"⚠️ GIF description failed or empty: {description}")
return None
except Exception as e:
print(f"⚠️ Error generating GIF description: {e}")
import traceback
traceback.print_exc()
return None
async def _verify_and_locate_miku(self, image_bytes: bytes, debug: bool = False) -> Dict:
"""
Use vision model to verify image contains Miku and locate her if multiple characters.
Returns:
Dict with is_miku bool and optional crop_region
"""
result = {
"is_miku": False,
"crop_region": None,
"character_count": 0
}
try:
# Encode image to base64
import base64
image_b64 = base64.b64encode(image_bytes).decode('utf-8')
# Query vision model using OpenAI-compatible API
prompt = """Analyze this image and answer:
1. Is Hatsune Miku present in this image? (yes/no)
2. How many characters are in the image? (number)
3. If there are multiple characters, describe where Miku is located (left/right/center, top/bottom/middle)
Respond in JSON format:
{
"is_miku": true/false,
"character_count": number,
"miku_location": "description or null"
}"""
payload = {
"model": globals.VISION_MODEL,
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_b64}"
}
}
]
}
],
"stream": False,
"max_tokens": 200,
"temperature": 0.3
}
headers = {"Content-Type": "application/json"}
async with aiohttp.ClientSession() as session:
async with session.post(f"{globals.LLAMA_URL}/v1/chat/completions", json=payload, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
response = data.get("choices", [{}])[0].get("message", {}).get("content", "")
else:
error_text = await resp.text()
print(f"❌ Vision API error: {resp.status} - {error_text}")
return result
if debug:
print(f"🤖 Vision model response: {response}")
# Parse JSON response
import re
json_match = re.search(r'\{[^}]+\}', response)
if json_match:
data = json.loads(json_match.group())
result["is_miku"] = data.get("is_miku", False)
result["character_count"] = data.get("character_count", 1)
# If multiple characters, parse location
if result["character_count"] > 1 and data.get("miku_location"):
result["crop_region"] = self._parse_location_to_region(
data["miku_location"],
debug=debug
)
else:
# Fallback: simple text analysis
response_lower = response.lower()
result["is_miku"] = "yes" in response_lower or "miku" in response_lower
except Exception as e:
print(f"⚠️ Error in vision verification: {e}")
# Assume it's Miku on error (trust Danbooru tags)
result["is_miku"] = True
return result
def _parse_location_to_region(self, location: str, debug: bool = False) -> Optional[Dict]:
"""Parse location description to crop region coordinates"""
location_lower = location.lower()
# Simple region detection
region = {
"horizontal": "center", # left, center, right
"vertical": "middle" # top, middle, bottom
}
if "left" in location_lower:
region["horizontal"] = "left"
elif "right" in location_lower:
region["horizontal"] = "right"
if "top" in location_lower:
region["vertical"] = "top"
elif "bottom" in location_lower:
region["vertical"] = "bottom"
if debug:
print(f"📍 Parsed location '{location}' -> {region}")
return region
def _apply_crop_region(self, image: Image.Image, region: Dict) -> Image.Image:
"""Apply crop region based on parsed location"""
width, height = image.size
# Determine crop box based on region
# We want roughly 1/2 to 2/3 of the image
crop_width = int(width * 0.6)
crop_height = int(height * 0.6)
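        # e.g. a 1000x800 source yields a 600x480 window before positioning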
# Horizontal position
if region["horizontal"] == "left":
left = 0
right = crop_width
elif region["horizontal"] == "right":
left = width - crop_width
right = width
else: # center
left = (width - crop_width) // 2
right = left + crop_width
# Vertical position
if region["vertical"] == "top":
top = 0
bottom = crop_height
elif region["vertical"] == "bottom":
top = height - crop_height
bottom = height
else: # middle
top = (height - crop_height) // 2
bottom = top + crop_height
return image.crop((left, top, right, bottom))
async def _intelligent_crop(
self,
image: Image.Image,
image_bytes: bytes,
target_size: int = 512,
debug: bool = False
) -> Optional[Image.Image]:
"""
Intelligently crop image to square, centering on detected face.
Args:
image: PIL Image
image_bytes: Image data as bytes (for API call)
target_size: Target size for square output
debug: Enable debug output
Returns:
Cropped PIL Image or None
"""
width, height = image.size
# Try face detection via API first
face_detection = await self._detect_face(image_bytes, debug=debug)
if face_detection and face_detection.get('center'):
if debug:
print(f"😊 Face detected at {face_detection['center']}")
crop_center = face_detection['center']
else:
if debug:
print("🎯 No face detected, using saliency detection")
# Fallback to saliency detection
cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
crop_center = self._detect_saliency(cv_image, debug=debug)
# Determine crop box (square)
# Use 60% of the smaller dimension to include face context
base_size = min(width, height)
crop_size = int(base_size * 0.6)
# For very large images, cap the crop size at 1000px
if crop_size > 1000:
crop_size = 1000
        # Minimum crop size for quality, capped at the image itself so the
        # crop box can never fall outside the frame
        if crop_size < 400:
            crop_size = min(400, base_size)
# Center the crop on the detected point
left = crop_center[0] - crop_size // 2
top = crop_center[1] - crop_size // 2
# Adjust if crop goes out of bounds
# Instead of clamping, try to shift to keep face centered
if left < 0:
left = 0
elif left + crop_size > width:
left = width - crop_size
        if top < 0:
            # Face is too close to the top edge: clamp the crop to y=0
            top = 0
            if debug:
                print("⚠️ Face too close to top edge, shifted crop to y=0")
elif top + crop_size > height:
# Face is too close to bottom edge
top = height - crop_size
if debug:
print(f"⚠️ Face too close to bottom edge, shifted crop to y={top}")
# Crop
cropped = image.crop((left, top, left + crop_size, top + crop_size))
# Resize to target size
cropped = cropped.resize((target_size, target_size), Image.Resampling.LANCZOS)
if debug:
print(f"✂️ Cropped to {target_size}x{target_size} centered at {crop_center}")
return cropped
async def _detect_face(self, image_bytes: bytes, debug: bool = False) -> Optional[Dict]:
"""
Detect anime face in image using external API and return detection data.
Args:
image_bytes: Image data as bytes
debug: Enable debug output
Returns:
Dict with detection data including bbox, confidence, keypoints, or None
"""
face_detector_started = False
try:
# Step 1: Ensure VRAM is available by unloading vision model
await self._ensure_vram_available(debug=debug)
# Step 2: Start face detector container
if not await self._start_face_detector(debug=debug):
if debug:
print("⚠️ Could not start face detector")
return None
face_detector_started = True
# Step 3: Call the face detection API
async with aiohttp.ClientSession() as session:
# Prepare multipart form data
form = aiohttp.FormData()
form.add_field('file', image_bytes, filename='image.jpg', content_type='image/jpeg')
async with session.post(
self.FACE_DETECTOR_API,
data=form,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status != 200:
if debug:
print(f"⚠️ Face detection API returned status {response.status}")
return None
result = await response.json()
if result.get('count', 0) == 0:
if debug:
print("👤 No faces detected by API")
return None
# Get detections and pick the one with highest confidence
detections = result.get('detections', [])
if not detections:
return None
best_detection = max(detections, key=lambda d: d.get('confidence', 0))
# Extract bbox coordinates
bbox = best_detection.get('bbox', [])
confidence = best_detection.get('confidence', 0)
keypoints = best_detection.get('keypoints', [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = int((x1 + x2) / 2)
center_y = int((y1 + y2) / 2)
if debug:
width = int(x2 - x1)
height = int(y2 - y1)
print(f"👤 Detected {len(detections)} face(s) via API, using best at ({center_x}, {center_y}) [confidence: {confidence:.2%}]")
print(f" Bounding box: x={int(x1)}, y={int(y1)}, w={width}, h={height}")
print(f" Keypoints: {len(keypoints)} facial landmarks detected")
return {
'center': (center_x, center_y),
'bbox': bbox,
'confidence': confidence,
'keypoints': keypoints,
'count': len(detections)
}
except asyncio.TimeoutError:
if debug:
print("⚠️ Face detection API timeout")
except Exception as e:
if debug:
print(f"⚠️ Error calling face detection API: {e}")
finally:
# Always stop face detector to free VRAM
if face_detector_started:
await self._stop_face_detector(debug=debug)
return None
def _detect_saliency(self, cv_image: np.ndarray, debug: bool = False) -> Tuple[int, int]:
"""
Detect most salient (interesting) region of image.
Fallback when face detection fails.
Returns:
Tuple of (x, y) center coordinates
"""
try:
height, width = cv_image.shape[:2]
# Use OpenCV's saliency detector
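            # Note: cv2.saliency ships with opencv-contrib-python, not the
            # base opencv-python wheel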
saliency = cv2.saliency.StaticSaliencySpectralResidual_create()
success, saliency_map = saliency.computeSaliency(cv_image)
if success:
# Find the point with highest saliency
saliency_map = (saliency_map * 255).astype("uint8")
_, max_val, _, max_loc = cv2.minMaxLoc(saliency_map)
if debug:
print(f"🎯 Saliency peak at {max_loc}")
return max_loc
except Exception as e:
if debug:
print(f"⚠️ Saliency detection failed: {e}")
# Ultimate fallback: center of image
height, width = cv_image.shape[:2]
return (width // 2, height // 2)
def _extract_dominant_color(self, image: Image.Image, debug: bool = False) -> Optional[Tuple[int, int, int]]:
"""
Extract the dominant color from an image using k-means clustering.
Args:
image: PIL Image
debug: Enable debug output
Returns:
RGB tuple (r, g, b) or None
"""
try:
# Resize for faster processing
small_image = image.resize((150, 150))
# Convert to RGB if needed (handles grayscale, RGBA, etc.)
if small_image.mode != 'RGB':
small_image = small_image.convert('RGB')
# Convert to numpy array
pixels = np.array(small_image)
# Reshape to list of RGB pixels
pixels = pixels.reshape(-1, 3)
# Remove very dark (near black) and very bright (near white) pixels
# to avoid getting boring colors
mask = ~((pixels.sum(axis=1) < 30) | (pixels.sum(axis=1) > 720))
pixels = pixels[mask]
if len(pixels) == 0:
if debug:
print("⚠️ No valid pixels after filtering, using fallback")
return (200, 200, 200) # Neutral gray fallback
# Use k-means to find dominant colors
from sklearn.cluster import KMeans
n_colors = 5
kmeans = KMeans(n_clusters=n_colors, random_state=42, n_init=10)
kmeans.fit(pixels)
# Get cluster centers (dominant colors) and their frequencies
colors = kmeans.cluster_centers_
labels = kmeans.labels_
counts = np.bincount(labels)
if debug:
print(f"🎨 Found {n_colors} color clusters:")
for i, (color, count) in enumerate(zip(colors, counts)):
pct = (count / len(labels)) * 100
r, g, b = color.astype(int)
print(f" {i+1}. RGB({r}, {g}, {b}) = #{r:02x}{g:02x}{b:02x} ({pct:.1f}%)")
# Sort by frequency
sorted_indices = np.argsort(-counts)
# Pick the most vibrant/saturated color from top 3
best_color = None
best_saturation = 0
for idx in sorted_indices[:3]:
color = colors[idx].astype(int)
r, g, b = color
# Calculate saturation (how vibrant the color is)
max_c = max(r, g, b)
min_c = min(r, g, b)
saturation = (max_c - min_c) / max_c if max_c > 0 else 0
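                # e.g. RGB(134, 206, 203) gives (206 - 134) / 206 ≈ 0.35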
if debug:
print(f" Color RGB({r}, {g}, {b}) saturation: {saturation:.2f}")
# Prefer more saturated colors
if saturation > best_saturation:
best_saturation = saturation
# Convert to native Python ints for JSON serialization
best_color = (int(r), int(g), int(b))
if best_color:
if debug:
print(f"🎨 Selected color: RGB{best_color} (saturation: {best_saturation:.2f})")
return best_color
# Fallback to most common color
dominant_color = colors[sorted_indices[0]].astype(int)
# Convert to native Python ints
result = (int(dominant_color[0]), int(dominant_color[1]), int(dominant_color[2]))
if debug:
print(f"🎨 Using most common color: RGB{result}")
return result
except Exception as e:
if debug:
print(f"⚠️ Error extracting dominant color: {e}")
return None
async def _update_role_colors(self, color: Tuple[int, int, int], debug: bool = False):
"""
Update Miku's role color across all servers.
Args:
color: RGB tuple (r, g, b)
debug: Enable debug output
"""
if debug:
print(f"🎨 Starting role color update with RGB{color}")
if not globals.client:
if debug:
print("⚠️ No client available for role updates")
return
if debug:
print(f"🌐 Found {len(globals.client.guilds)} guild(s)")
# Convert RGB to Discord color (integer)
discord_color = discord.Color.from_rgb(*color)
updated_count = 0
failed_count = 0
for guild in globals.client.guilds:
try:
if debug:
print(f"🔍 Checking guild: {guild.name}")
# Find the bot's top role (usually colored role)
member = guild.get_member(globals.client.user.id)
if not member:
if debug:
print(f" ⚠️ Bot not found as member in {guild.name}")
continue
# Get the highest role that the bot has (excluding @everyone)
roles = [r for r in member.roles if r.name != "@everyone"]
if not roles:
if debug:
print(f" ⚠️ No roles found in {guild.name}")
continue
# Look for a dedicated color role first (e.g., "Miku Color")
color_role = None
for role in guild.roles:
if role.name.lower() in ["miku color", "miku colour", "miku-color"]:
color_role = role
break
# Get bot's top role
bot_top_role = max(roles, key=lambda r: r.position)
# Use dedicated color role if found, otherwise use top role
if color_role:
if debug:
print(f" 🎨 Found dedicated color role: {color_role.name} (position {color_role.position})")
target_role = color_role
else:
if debug:
print(f" 📝 No 'Miku Color' role found, using top role: {bot_top_role.name} (position {bot_top_role.position})")
target_role = bot_top_role
# Check permissions
can_manage = guild.me.guild_permissions.manage_roles
if debug:
print(f" 🔑 Manage roles permission: {can_manage}")
print(f" 📊 Bot top role: {bot_top_role.name} (pos {bot_top_role.position}), Target: {target_role.name} (pos {target_role.position})")
# Only update if we have permission and it's not a special role
if can_manage:
# Run role edit in bot's event loop
if globals.client.loop and globals.client.loop.is_running():
future = asyncio.run_coroutine_threadsafe(
target_role.edit(color=discord_color, reason="Profile picture color sync"),
globals.client.loop
)
future.result(timeout=5)
else:
await target_role.edit(color=discord_color, reason="Profile picture color sync")
updated_count += 1
if debug:
print(f" ✅ Updated role color in {guild.name}: {target_role.name}")
else:
if debug:
print(f" ⚠️ No manage_roles permission in {guild.name}")
except discord.Forbidden:
failed_count += 1
if debug:
print(f" ❌ Forbidden: No permission to update role in {guild.name}")
except Exception as e:
failed_count += 1
if debug:
print(f" ❌ Error updating role in {guild.name}: {e}")
import traceback
traceback.print_exc()
if updated_count > 0:
print(f"🎨 Updated role colors in {updated_count} server(s)")
else:
print(f"⚠️ No roles were updated (failed: {failed_count})")
if failed_count > 0 and debug:
print(f"⚠️ Failed to update {failed_count} server(s)")
async def set_custom_role_color(self, hex_color: str, debug: bool = False) -> Dict:
"""
Set a custom role color across all servers.
Args:
hex_color: Hex color code (e.g., "#86cecb" or "86cecb")
debug: Enable debug output
Returns:
Dict with success status and count
"""
# Parse hex color
hex_color = hex_color.strip().lstrip('#')
try:
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
color = (r, g, b)
except (ValueError, IndexError):
return {
"success": False,
"error": f"Invalid hex color: {hex_color}"
}
if debug:
print(f"🎨 Setting custom role color: #{hex_color} RGB{color}")
await self._update_role_colors(color, debug=debug)
return {
"success": True,
"color": {
"hex": f"#{hex_color}",
"rgb": color
}
}
async def reset_to_fallback_color(self, debug: bool = False) -> Dict:
"""
Reset role color to the fallback color (#86cecb).
Args:
debug: Enable debug output
Returns:
Dict with success status
"""
if debug:
print(f"🎨 Resetting to fallback color: RGB{self.FALLBACK_ROLE_COLOR}")
await self._update_role_colors(self.FALLBACK_ROLE_COLOR, debug=debug)
return {
"success": True,
"color": {
"hex": "#86cecb",
"rgb": self.FALLBACK_ROLE_COLOR
}
}
def _save_metadata(self, metadata: Dict):
"""Save metadata about current profile picture"""
try:
with open(self.METADATA_PATH, 'w') as f:
json.dump(metadata, f, indent=2)
except Exception as e:
print(f"⚠️ Error saving metadata: {e}")
def load_metadata(self) -> Optional[Dict]:
"""Load metadata about current profile picture"""
try:
if os.path.exists(self.METADATA_PATH):
with open(self.METADATA_PATH, 'r') as f:
return json.load(f)
except Exception as e:
print(f"⚠️ Error loading metadata: {e}")
return None
async def restore_fallback(self) -> bool:
"""Restore the fallback profile picture"""
try:
if not os.path.exists(self.FALLBACK_PATH):
print("⚠️ No fallback avatar found")
return False
with open(self.FALLBACK_PATH, 'rb') as f:
avatar_bytes = f.read()
if globals.client and globals.client.user:
# Run the edit operation in the bot's event loop
if globals.client.loop and globals.client.loop.is_running():
future = asyncio.run_coroutine_threadsafe(
globals.client.user.edit(avatar=avatar_bytes),
globals.client.loop
)
future.result(timeout=10)
else:
await globals.client.user.edit(avatar=avatar_bytes)
print("✅ Restored fallback avatar")
return True
except Exception as e:
print(f"⚠️ Error restoring fallback: {e}")
return False
def get_current_description(self) -> Optional[str]:
"""
Get the description of the current profile picture.
Returns:
Description string or None
"""
description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
try:
if os.path.exists(description_path):
with open(description_path, 'r', encoding='utf-8') as f:
return f.read().strip()
except Exception as e:
print(f"⚠️ Error reading description: {e}")
return None
# Global instance
profile_picture_manager = ProfilePictureManager()
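

if __name__ == "__main__":
    # Offline smoke test (a sketch): touches only helpers that need no live
    # Discord client. Because of the relative danbooru_client import, run it
    # as a module from the bot source root (exact package path is an
    # assumption): python -m utils.profile_picture_manager
    print("metadata:", profile_picture_manager.load_metadata())
    print("description:", profile_picture_manager.get_current_description())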