# profile_picture_manager.py
"""
Intelligent profile picture manager for Miku.
Handles searching, face detection, cropping, and Discord avatar updates.
"""

import os
import io
import asyncio
import json
from datetime import datetime
from typing import Optional, Dict, Tuple, List

import aiohttp
import cv2
import discord
import numpy as np
from PIL import Image, ImageDraw

import globals
from .danbooru_client import danbooru_client

class ProfilePictureManager:
    """Manages Miku's profile picture with intelligent cropping and face detection."""

    PROFILE_PIC_DIR = "memory/profile_pictures"
    FALLBACK_PATH = "memory/profile_pictures/fallback.png"
    CURRENT_PATH = "memory/profile_pictures/current.png"
    METADATA_PATH = "memory/profile_pictures/metadata.json"

    # Face detection API endpoint
    FACE_DETECTOR_API = "http://anime-face-detector:6078/detect"

    # Fallback role color (Miku's iconic teal)
    FALLBACK_ROLE_COLOR = (134, 206, 203)  # #86cecb

    def __init__(self):
        self._ensure_directories()

    def _ensure_directories(self):
        """Ensure the profile picture directory exists."""
        os.makedirs(self.PROFILE_PIC_DIR, exist_ok=True)

    async def initialize(self):
        """Initialize the profile picture manager (check API availability)."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "http://anime-face-detector:6078/health",
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        print("✅ Anime face detector API connected")
                        return True
        except Exception as e:
            print(f"⚠️ Face detector API not available: {e}")
            print("   Profile picture changes will use fallback cropping")
        return False

    async def _ensure_vram_available(self, debug: bool = False):
        """
        Ensure VRAM is available for face detection by swapping to the text model.
        Sending a minimal text-model request makes llama-swap unload the vision
        model if it is currently loaded.
        """
        try:
            if debug:
                print("💾 Swapping to text model to free VRAM for face detection...")

            # Make a minimal request to the text model to trigger the swap
            async with aiohttp.ClientSession() as session:
                payload = {
                    "model": "llama3.1",
                    "messages": [{"role": "user", "content": "hi"}],
                    "max_tokens": 1,
                    "stream": False
                }

                async with session.post(
                    "http://llama-swap:8080/v1/chat/completions",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=60)
                ) as response:
                    if response.status == 200:
                        if debug:
                            print("✅ Vision model unloaded, VRAM available")
                        # Give the system time to fully release VRAM
                        await asyncio.sleep(3)
                        return True
        except Exception as e:
            if debug:
                print(f"⚠️ Could not swap models: {e}")

        return False

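    # A note on the swap above (an assumption about this deployment, not a
    # documented llama-swap guarantee): llama-swap loads whichever model an
    # incoming request names and evicts the previous one, so a throwaway
    # 1-token completion against the text model is enough to push the vision
    # model out of VRAM. The sleep(3) pads for the driver actually releasing it.
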
    async def _start_face_detector(self, debug: bool = False) -> bool:
        """Start the face detector container via the Docker socket API."""
        try:
            if debug:
                print("🚀 Starting face detector container...")

            # Docker socket path
            socket_path = "/var/run/docker.sock"

            # Check that the socket exists
            if not os.path.exists(socket_path):
                if debug:
                    print("⚠️ Docker socket not available")
                return False

            # Use aiohttp's UnixConnector to talk to the Docker socket
            from aiohttp import UnixConnector

            async with aiohttp.ClientSession(
                connector=UnixConnector(path=socket_path)
            ) as session:
                # Start the container
                url = "http://localhost/containers/anime-face-detector/start"
                async with session.post(url) as response:
                    if response.status not in (204, 304):  # 204=started, 304=already running
                        if debug:
                            error_text = await response.text()
                            print(f"⚠️ Failed to start container: {response.status} - {error_text}")
                        return False

            # Wait for the API to become ready
            for i in range(30):  # 30 second timeout
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(
                            "http://anime-face-detector:6078/health",
                            timeout=aiohttp.ClientTimeout(total=2)
                        ) as response:
                            if response.status == 200:
                                if debug:
                                    print(f"✅ Face detector ready (took {i + 1}s)")
                                return True
                except Exception:
                    pass
                await asyncio.sleep(1)

            if debug:
                print("⚠️ Face detector didn't become ready in time")
            return False

        except Exception as e:
            if debug:
                print(f"⚠️ Error starting face detector: {e}")
            return False

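    # For reference, the container start above is the Docker Engine API call
    # POST /containers/anime-face-detector/start over the Unix socket, roughly
    # this shell equivalent:
    #
    #   curl --unix-socket /var/run/docker.sock \
    #        -X POST http://localhost/containers/anime-face-detector/start
    #
    # 204 means the container started; 304 means it was already running.
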
    async def _stop_face_detector(self, debug: bool = False):
        """Stop the face detector container via the Docker socket API."""
        try:
            if debug:
                print("🛑 Stopping face detector to free VRAM...")

            socket_path = "/var/run/docker.sock"

            if not os.path.exists(socket_path):
                if debug:
                    print("⚠️ Docker socket not available")
                return

            from aiohttp import UnixConnector

            async with aiohttp.ClientSession(
                connector=UnixConnector(path=socket_path)
            ) as session:
                # Stop the container, giving it 10 seconds to shut down
                url = "http://localhost/containers/anime-face-detector/stop"
                async with session.post(url, params={"t": 10}) as response:
                    if response.status in (204, 304):  # 204=stopped, 304=already stopped
                        if debug:
                            print("✅ Face detector stopped")
                    elif debug:
                        error_text = await response.text()
                        print(f"⚠️ Failed to stop container: {response.status} - {error_text}")

        except Exception as e:
            if debug:
                print(f"⚠️ Error stopping face detector: {e}")

    async def save_current_avatar_as_fallback(self):
        """Save the bot's current avatar as the fallback (only if none exists yet)."""
        try:
            # Only save if a fallback doesn't already exist
            if os.path.exists(self.FALLBACK_PATH):
                print("✅ Fallback avatar already exists, skipping save")
                return True

            if not globals.client or not globals.client.user:
                print("⚠️ Bot client not ready")
                return False

            avatar_asset = globals.client.user.avatar or globals.client.user.default_avatar

            # Download the avatar
            avatar_bytes = await avatar_asset.read()

            # Save as fallback
            with open(self.FALLBACK_PATH, 'wb') as f:
                f.write(avatar_bytes)

            print(f"✅ Saved current avatar as fallback ({len(avatar_bytes)} bytes)")
            return True

        except Exception as e:
            print(f"⚠️ Error saving fallback avatar: {e}")
            return False

    async def change_profile_picture(
        self,
        mood: Optional[str] = None,
        custom_image_bytes: Optional[bytes] = None,
        debug: bool = False,
        max_retries: int = 5
    ) -> Dict:
        """
        Main entry point for changing Miku's profile picture.

        Args:
            mood: Current mood, used to steer the Danbooru search
            custom_image_bytes: If provided, use this image instead of Danbooru
            debug: Enable debug output
            max_retries: Maximum attempts to find a valid Miku image (Danbooru only)

        Returns:
            Dict with status and metadata
        """
        result = {
            "success": False,
            "source": None,
            "error": None,
            "metadata": {}
        }

        try:
            # Step 1: Get and validate the image (with retries for Danbooru)
            image_bytes = None
            image = None

            if custom_image_bytes:
                # Custom upload - no retry needed
                if debug:
                    print("🖼️ Using provided custom image")
                image_bytes = custom_image_bytes
                result["source"] = "custom_upload"

                # Load the image with PIL
                try:
                    image = Image.open(io.BytesIO(image_bytes))
                    if debug:
                        print(f"📐 Original image size: {image.size}")
                except Exception as e:
                    result["error"] = f"Failed to open image: {e}"
                    return result

            else:
                # Danbooru - retry until we find a valid Miku image
                if debug:
                    print(f"🎨 Searching Danbooru for Miku image (mood: {mood})")

                for attempt in range(max_retries):
                    if attempt > 0 and debug:
                        print(f"🔄 Retry attempt {attempt + 1}/{max_retries}")

                    post = await danbooru_client.get_random_miku_image(mood=mood)
                    if not post:
                        continue

                    image_url = danbooru_client.extract_image_url(post)
                    if not image_url:
                        continue

                    # Download the image
                    temp_image_bytes = await self._download_image(image_url)
                    if not temp_image_bytes:
                        continue

                    if debug:
                        print(f"✅ Downloaded image from Danbooru (post #{danbooru_client.get_post_metadata(post).get('id')})")

                    # Load the image with PIL
                    try:
                        temp_image = Image.open(io.BytesIO(temp_image_bytes))
                        if debug:
                            print(f"📐 Original image size: {temp_image.size}")
                    except Exception as e:
                        if debug:
                            print(f"⚠️ Failed to open image: {e}")
                        continue

                    # Verify it's Miku
                    miku_verification = await self._verify_and_locate_miku(temp_image_bytes, debug=debug)
                    if not miku_verification["is_miku"]:
                        if debug:
                            print("❌ Image verification failed: not Miku, trying another...")
                        continue

                    # Success! This image is valid
                    image_bytes = temp_image_bytes
                    image = temp_image
                    result["source"] = "danbooru"
                    result["metadata"] = danbooru_client.get_post_metadata(post)

                    # If multiple characters were detected, use the LLM's suggested crop region
                    if miku_verification.get("crop_region"):
                        if debug:
                            print("🎯 Using LLM-suggested crop region for Miku")
                        image = self._apply_crop_region(image, miku_verification["crop_region"])

                    break

            # Check whether we found a valid image
            if not image_bytes or not image:
                result["error"] = f"Could not find valid Miku image after {max_retries} attempts"
                return result

            # Step 2: Generate a description of the validated image
            if debug:
                print("📝 Generating image description...")
            description = await self._generate_image_description(image_bytes, debug=debug)
            if description:
                # Save the description to a file
                description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
                try:
                    with open(description_path, 'w', encoding='utf-8') as f:
                        f.write(description)
                    result["metadata"]["description"] = description
                    if debug:
                        print(f"📝 Saved image description ({len(description)} chars)")
                except Exception as e:
                    print(f"⚠️ Failed to save description file: {e}")
            elif debug:
                print("⚠️ Description generation returned None")

            # Step 3: Detect the face and crop intelligently
            cropped_image = await self._intelligent_crop(image, image_bytes, target_size=512, debug=debug)

            if not cropped_image:
                result["error"] = "Failed to crop image"
                return result

            # Step 4: Save the cropped image first
            output_buffer = io.BytesIO()
            cropped_image.save(output_buffer, format='PNG')
            cropped_bytes = output_buffer.getvalue()

            # Save to disk as current
            with open(self.CURRENT_PATH, 'wb') as f:
                f.write(cropped_bytes)

            if debug:
                print(f"💾 Saved cropped image ({len(cropped_bytes)} bytes)")

            # Step 5: Extract the dominant color from the saved current.png
            saved_image = Image.open(self.CURRENT_PATH)
            dominant_color = self._extract_dominant_color(saved_image, debug=debug)
            if dominant_color:
                result["metadata"]["dominant_color"] = {
                    "rgb": dominant_color,
                    "hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
                }
                if debug:
                    print(f"🎨 Dominant color: RGB{dominant_color} ({result['metadata']['dominant_color']['hex']})")

            # Step 6: Update the Discord avatar
            if globals.client and globals.client.user:
                try:
                    # Run the edit operation in the bot's event loop
                    if globals.client.loop and globals.client.loop.is_running():
                        # Schedule the coroutine on the bot's loop
                        future = asyncio.run_coroutine_threadsafe(
                            globals.client.user.edit(avatar=cropped_bytes),
                            globals.client.loop
                        )
                        # Wait for the result
                        future.result(timeout=10)
                    else:
                        # Fallback if the loop is not available (shouldn't happen)
                        await globals.client.user.edit(avatar=cropped_bytes)

                    result["success"] = True
                    result["metadata"]["changed_at"] = datetime.now().isoformat()

                    # Save metadata
                    self._save_metadata(result["metadata"])

                    print("✅ Profile picture updated successfully!")

                    # Step 7: Update role colors across all servers
                    if dominant_color:
                        await self._update_role_colors(dominant_color, debug=debug)

                except discord.HTTPException as e:
                    result["error"] = f"Discord API error: {e}"
                    print(f"⚠️ Failed to update Discord avatar: {e}")
                except Exception as e:
                    result["error"] = f"Unexpected error updating avatar: {e}"
                    print(f"⚠️ Unexpected error: {e}")
            else:
                result["error"] = "Bot client not ready"

        except Exception as e:
            result["error"] = f"Unexpected error: {e}"
            print(f"⚠️ Error in change_profile_picture: {e}")

        return result

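    # Usage sketch (assumes a ready bot with globals.client set and the
    # Danbooru client configured; call from an async context):
    #
    #   result = await profile_picture_manager.change_profile_picture(mood="cheerful", debug=True)
    #   if result["success"]:
    #       print(result["source"], result["metadata"].get("dominant_color"))
    #   else:
    #       print("Change failed:", result["error"])
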
    async def _download_image(self, url: str) -> Optional[bytes]:
        """Download an image from a URL."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                    if response.status == 200:
                        return await response.read()
        except Exception as e:
            print(f"⚠️ Error downloading image: {e}")
        return None

    async def _generate_image_description(self, image_bytes: bytes, debug: bool = False) -> Optional[str]:
        """
        Generate a detailed description of the profile picture using the vision model.
        The description is used later when users ask about the pfp.

        Args:
            image_bytes: Raw image bytes
            debug: Enable debug output

        Returns:
            Description string or None
        """
        try:
            import base64
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')

            if debug:
                print(f"📸 Encoded image: {len(image_b64)} chars, calling vision model...")

            prompt = """This is an image of Hatsune Miku that will be used as a profile picture.
Please describe this image in detail, including:
- What Miku is wearing (outfit, colors, accessories)
- Her pose and expression
- The art style and mood of the image
- Any notable background elements
- Overall atmosphere or theme

Keep the description conversational and in second person (referring to Miku as "you"), as if Miku herself is describing her own appearance in this image."""

            payload = {
                "model": globals.VISION_MODEL,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}
                            }
                        ]
                    }
                ],
                "stream": False,
                "max_tokens": 400,
                "temperature": 0.7
            }

            headers = {"Content-Type": "application/json"}

            if debug:
                print(f"🌐 Calling {globals.LLAMA_URL}/v1/chat/completions with model {globals.VISION_MODEL}")

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{globals.LLAMA_URL}/v1/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()

                        if debug:
                            print(f"📦 API response keys: {data.keys()}")
                            print(f"📦 Choices: {data.get('choices', [])}")

                        # Pull the content out of the first choice
                        choice = data.get("choices", [{}])[0]
                        message = choice.get("message", {})

                        # Check both 'content' and 'reasoning_content' fields
                        description = message.get("content", "")

                        # If content is empty, try reasoning_content (chain-of-thought models)
                        if not description or not description.strip():
                            description = message.get("reasoning_content", "")

                        if description and description.strip():
                            if debug:
                                print(f"✅ Generated description: {description[:100]}...")
                            return description.strip()

                        if debug:
                            print("⚠️ Description is empty or None")
                            print(f"   Full response: {data}")
                        else:
                            print("⚠️ Description is empty or None")
                        return None
                    else:
                        error_text = await resp.text()
                        print(f"❌ Vision API error generating description: {resp.status} - {error_text}")

        except Exception as e:
            print(f"⚠️ Error generating image description: {e}")
            import traceback
            traceback.print_exc()

        return None

    async def _verify_and_locate_miku(self, image_bytes: bytes, debug: bool = False) -> Dict:
        """
        Use the vision model to verify the image contains Miku and to locate her
        when multiple characters are present.

        Returns:
            Dict with an is_miku bool, a character_count, and an optional crop_region
        """
        result = {
            "is_miku": False,
            "crop_region": None,
            "character_count": 0
        }

        try:
            # Encode the image to base64
            import base64
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')

            # Query the vision model via the OpenAI-compatible API
            prompt = """Analyze this image and answer:
1. Is Hatsune Miku present in this image? (yes/no)
2. How many characters are in the image? (number)
3. If there are multiple characters, describe where Miku is located (left/right/center, top/bottom/middle)

Respond in JSON format:
{
  "is_miku": true/false,
  "character_count": number,
  "miku_location": "description or null"
}"""

            payload = {
                "model": globals.VISION_MODEL,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}
                            }
                        ]
                    }
                ],
                "stream": False,
                "max_tokens": 200,
                "temperature": 0.3
            }

            headers = {"Content-Type": "application/json"}

            async with aiohttp.ClientSession() as session:
                async with session.post(f"{globals.LLAMA_URL}/v1/chat/completions", json=payload, headers=headers) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        response = data.get("choices", [{}])[0].get("message", {}).get("content", "")
                    else:
                        error_text = await resp.text()
                        print(f"❌ Vision API error: {resp.status} - {error_text}")
                        return result

            if debug:
                print(f"🤖 Vision model response: {response}")

            # Parse the JSON response
            import re
            json_match = re.search(r'\{[^}]+\}', response)
            if json_match:
                data = json.loads(json_match.group())
                result["is_miku"] = data.get("is_miku", False)
                result["character_count"] = data.get("character_count", 1)

                # If multiple characters, parse Miku's location
                if result["character_count"] > 1 and data.get("miku_location"):
                    result["crop_region"] = self._parse_location_to_region(
                        data["miku_location"],
                        debug=debug
                    )
            else:
                # Fallback: simple text analysis
                response_lower = response.lower()
                result["is_miku"] = "yes" in response_lower or "miku" in response_lower

        except Exception as e:
            print(f"⚠️ Error in vision verification: {e}")
            # Assume it's Miku on error (trust the Danbooru tags)
            result["is_miku"] = True

        return result

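    # The parser above expects a flat reply along these lines (model output,
    # so the regex tolerates prose around it):
    #
    #   {"is_miku": true, "character_count": 2, "miku_location": "left, middle"}
    #
    # Note that r'\{[^}]+\}' matches the first brace block containing no
    # closing brace, which is why the prompt asks for flat, un-nested JSON.
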
    def _parse_location_to_region(self, location: str, debug: bool = False) -> Optional[Dict]:
        """Parse a textual location description into a coarse crop region."""
        location_lower = location.lower()

        # Simple keyword-based region detection
        region = {
            "horizontal": "center",  # left, center, right
            "vertical": "middle"     # top, middle, bottom
        }

        if "left" in location_lower:
            region["horizontal"] = "left"
        elif "right" in location_lower:
            region["horizontal"] = "right"

        if "top" in location_lower:
            region["vertical"] = "top"
        elif "bottom" in location_lower:
            region["vertical"] = "bottom"

        if debug:
            print(f"📍 Parsed location '{location}' -> {region}")

        return region

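    # Example mappings produced by the keyword checks above:
    #
    #   "Miku is on the left, near the top" -> {"horizontal": "left", "vertical": "top"}
    #   "center of the image"               -> {"horizontal": "center", "vertical": "middle"}
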
    def _apply_crop_region(self, image: Image.Image, region: Dict) -> Image.Image:
        """Apply a crop based on the parsed location region."""
        width, height = image.size

        # Determine the crop box from the region,
        # keeping roughly 60% of the image in each dimension
        crop_width = int(width * 0.6)
        crop_height = int(height * 0.6)

        # Horizontal position
        if region["horizontal"] == "left":
            left = 0
            right = crop_width
        elif region["horizontal"] == "right":
            left = width - crop_width
            right = width
        else:  # center
            left = (width - crop_width) // 2
            right = left + crop_width

        # Vertical position
        if region["vertical"] == "top":
            top = 0
            bottom = crop_height
        elif region["vertical"] == "bottom":
            top = height - crop_height
            bottom = height
        else:  # middle
            top = (height - crop_height) // 2
            bottom = top + crop_height

        return image.crop((left, top, right, bottom))

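    # Worked example of the 60% crop above: for a 1000x800 image with region
    # {"horizontal": "left", "vertical": "top"} the crop box is (0, 0, 600, 480);
    # with {"horizontal": "center", "vertical": "middle"} it is (200, 160, 800, 640).
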
    async def _intelligent_crop(
        self,
        image: Image.Image,
        image_bytes: bytes,
        target_size: int = 512,
        debug: bool = False
    ) -> Optional[Image.Image]:
        """
        Intelligently crop the image to a square, centering on the detected face.

        Args:
            image: PIL Image
            image_bytes: Image data as bytes (for the API call)
            target_size: Edge length of the square output
            debug: Enable debug output

        Returns:
            Cropped PIL Image or None
        """
        width, height = image.size

        # Try face detection via the API first
        face_detection = await self._detect_face(image_bytes, debug=debug)

        if face_detection and face_detection.get('center'):
            if debug:
                print(f"😊 Face detected at {face_detection['center']}")
            crop_center = face_detection['center']
        else:
            if debug:
                print("🎯 No face detected, using saliency detection")
            # Fall back to saliency detection
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            crop_center = self._detect_saliency(cv_image, debug=debug)

        # Determine the square crop box:
        # use 60% of the smaller dimension to include context around the face
        base_size = min(width, height)
        crop_size = int(base_size * 0.6)

        # For very large images, cap the crop size at 1000px
        if crop_size > 1000:
            crop_size = 1000

        # Enforce a minimum crop size for quality
        if crop_size < 400:
            crop_size = 400

        # Center the crop on the detected point
        left = crop_center[0] - crop_size // 2
        top = crop_center[1] - crop_size // 2

        # Clamp the crop box to the image bounds, shifting it so the face
        # stays inside the crop even when it sits near an edge
        if left < 0:
            left = 0
        elif left + crop_size > width:
            left = width - crop_size

        if top < 0:
            # Face is too close to the top edge; pin the crop to y=0
            top = 0
            if debug:
                print("⚠️ Face too close to top edge, shifted crop to y=0")
        elif top + crop_size > height:
            # Face is too close to the bottom edge; pin the crop to the bottom
            top = height - crop_size
            if debug:
                print(f"⚠️ Face too close to bottom edge, shifted crop to y={top}")

        # Crop and resize to the target size
        cropped = image.crop((left, top, left + crop_size, top + crop_size))
        cropped = cropped.resize((target_size, target_size), Image.Resampling.LANCZOS)

        if debug:
            print(f"✂️ Cropped to {target_size}x{target_size} centered at {crop_center}")

        return cropped

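    # Crop-size arithmetic from above, for concreteness: a 1920x1080 source
    # gives base_size = 1080 and crop_size = int(1080 * 0.6) = 648; a 3000x4000
    # source gives 1800, capped to 1000; a 350x350 source gives 210, raised to
    # the 400 minimum, which exceeds the image itself and leaves PIL to pad the
    # out-of-bounds crop with black (a known limitation of this routine).
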
    async def _detect_face(self, image_bytes: bytes, debug: bool = False) -> Optional[Dict]:
        """
        Detect an anime face in the image using the external API.

        Args:
            image_bytes: Image data as bytes
            debug: Enable debug output

        Returns:
            Dict with detection data (bbox, confidence, keypoints, center), or None
        """
        face_detector_started = False

        try:
            # Step 1: Free VRAM by unloading the vision model
            await self._ensure_vram_available(debug=debug)

            # Step 2: Start the face detector container
            if not await self._start_face_detector(debug=debug):
                if debug:
                    print("⚠️ Could not start face detector")
                return None

            face_detector_started = True

            # Step 3: Call the face detection API
            async with aiohttp.ClientSession() as session:
                # Prepare multipart form data
                form = aiohttp.FormData()
                form.add_field('file', image_bytes, filename='image.jpg', content_type='image/jpeg')

                async with session.post(
                    self.FACE_DETECTOR_API,
                    data=form,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status != 200:
                        if debug:
                            print(f"⚠️ Face detection API returned status {response.status}")
                        return None

                    result = await response.json()

                    if result.get('count', 0) == 0:
                        if debug:
                            print("👤 No faces detected by API")
                        return None

                    # Pick the detection with the highest confidence
                    detections = result.get('detections', [])
                    if not detections:
                        return None

                    best_detection = max(detections, key=lambda d: d.get('confidence', 0))

                    # Extract the bbox coordinates
                    bbox = best_detection.get('bbox', [])
                    confidence = best_detection.get('confidence', 0)
                    keypoints = best_detection.get('keypoints', [])

                    if len(bbox) >= 4:
                        x1, y1, x2, y2 = bbox[:4]
                        center_x = int((x1 + x2) / 2)
                        center_y = int((y1 + y2) / 2)

                        if debug:
                            width = int(x2 - x1)
                            height = int(y2 - y1)
                            print(f"👤 Detected {len(detections)} face(s) via API, using best at ({center_x}, {center_y}) [confidence: {confidence:.2%}]")
                            print(f"   Bounding box: x={int(x1)}, y={int(y1)}, w={width}, h={height}")
                            print(f"   Keypoints: {len(keypoints)} facial landmarks detected")

                        return {
                            'center': (center_x, center_y),
                            'bbox': bbox,
                            'confidence': confidence,
                            'keypoints': keypoints,
                            'count': len(detections)
                        }

        except asyncio.TimeoutError:
            if debug:
                print("⚠️ Face detection API timeout")
        except Exception as e:
            if debug:
                print(f"⚠️ Error calling face detection API: {e}")
        finally:
            # Always stop the face detector to free VRAM
            if face_detector_started:
                await self._stop_face_detector(debug=debug)

        return None

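    # The detector response parsed above is assumed to be shaped like this
    # (inferred from the parsing code, not from the detector's docs):
    #
    #   {"count": 1,
    #    "detections": [{"bbox": [x1, y1, x2, y2],
    #                    "confidence": 0.97,
    #                    "keypoints": [...]}]}
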
    def _detect_saliency(self, cv_image: np.ndarray, debug: bool = False) -> Tuple[int, int]:
        """
        Detect the most salient (interesting) region of the image.
        Used as a fallback when face detection fails.

        Returns:
            Tuple of (x, y) center coordinates
        """
        try:
            # Use OpenCV's spectral residual saliency detector
            saliency = cv2.saliency.StaticSaliencySpectralResidual_create()
            success, saliency_map = saliency.computeSaliency(cv_image)

            if success:
                # Find the point with the highest saliency
                saliency_map = (saliency_map * 255).astype("uint8")
                _, max_val, _, max_loc = cv2.minMaxLoc(saliency_map)

                if debug:
                    print(f"🎯 Saliency peak at {max_loc}")

                return max_loc
        except Exception as e:
            if debug:
                print(f"⚠️ Saliency detection failed: {e}")

        # Ultimate fallback: the center of the image
        height, width = cv_image.shape[:2]
        return (width // 2, height // 2)

    def _extract_dominant_color(self, image: Image.Image, debug: bool = False) -> Optional[Tuple[int, int, int]]:
        """
        Extract the dominant color from an image using k-means clustering.

        Args:
            image: PIL Image
            debug: Enable debug output

        Returns:
            RGB tuple (r, g, b) or None
        """
        try:
            # Resize for faster processing
            small_image = image.resize((150, 150))

            # Convert to RGB if needed (handles grayscale, RGBA, etc.)
            if small_image.mode != 'RGB':
                small_image = small_image.convert('RGB')

            # Convert to a flat array of RGB pixels
            pixels = np.array(small_image).reshape(-1, 3)

            # Drop near-black and near-white pixels to avoid boring colors
            mask = ~((pixels.sum(axis=1) < 30) | (pixels.sum(axis=1) > 720))
            pixels = pixels[mask]

            if len(pixels) == 0:
                if debug:
                    print("⚠️ No valid pixels after filtering, using fallback")
                return (200, 200, 200)  # Neutral gray fallback

            # Use k-means to find the dominant colors
            from sklearn.cluster import KMeans
            n_colors = 5
            kmeans = KMeans(n_clusters=n_colors, random_state=42, n_init=10)
            kmeans.fit(pixels)

            # Get the cluster centers (dominant colors) and their frequencies
            colors = kmeans.cluster_centers_
            labels = kmeans.labels_
            counts = np.bincount(labels)

            if debug:
                print(f"🎨 Found {n_colors} color clusters:")
                for i, (color, count) in enumerate(zip(colors, counts)):
                    pct = (count / len(labels)) * 100
                    r, g, b = color.astype(int)
                    print(f"   {i + 1}. RGB({r}, {g}, {b}) = #{r:02x}{g:02x}{b:02x} ({pct:.1f}%)")

            # Sort clusters by frequency
            sorted_indices = np.argsort(-counts)

            # Pick the most vibrant (saturated) color among the top 3
            best_color = None
            best_saturation = 0

            for idx in sorted_indices[:3]:
                color = colors[idx].astype(int)
                r, g, b = color

                # Saturation as (max - min) / max, a rough measure of vibrancy
                max_c = max(r, g, b)
                min_c = min(r, g, b)
                saturation = (max_c - min_c) / max_c if max_c > 0 else 0

                if debug:
                    print(f"   Color RGB({r}, {g}, {b}) saturation: {saturation:.2f}")

                # Prefer more saturated colors
                if saturation > best_saturation:
                    best_saturation = saturation
                    # Convert to native Python ints for JSON serialization
                    best_color = (int(r), int(g), int(b))

            if best_color:
                if debug:
                    print(f"🎨 Selected color: RGB{best_color} (saturation: {best_saturation:.2f})")
                return best_color

            # Fall back to the most common color
            dominant_color = colors[sorted_indices[0]].astype(int)
            # Convert to native Python ints
            result = (int(dominant_color[0]), int(dominant_color[1]), int(dominant_color[2]))
            if debug:
                print(f"🎨 Using most common color: RGB{result}")
            return result

        except Exception as e:
            if debug:
                print(f"⚠️ Error extracting dominant color: {e}")
            return None

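    # Saturation math from the loop above, with a concrete color: Miku teal
    # RGB(134, 206, 203) has max=206 and min=134, so saturation =
    # (206 - 134) / 206 ≈ 0.35, which beats any near-gray cluster when
    # picking the winner.
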
    async def _update_role_colors(self, color: Tuple[int, int, int], debug: bool = False):
        """
        Update Miku's role color across all servers.

        Args:
            color: RGB tuple (r, g, b)
            debug: Enable debug output
        """
        if debug:
            print(f"🎨 Starting role color update with RGB{color}")

        if not globals.client:
            if debug:
                print("⚠️ No client available for role updates")
            return

        if debug:
            print(f"🌐 Found {len(globals.client.guilds)} guild(s)")

        # Convert RGB to a Discord color (packed integer)
        discord_color = discord.Color.from_rgb(*color)

        updated_count = 0
        failed_count = 0

        for guild in globals.client.guilds:
            try:
                if debug:
                    print(f"🔍 Checking guild: {guild.name}")

                # Find the bot's member object in this guild
                member = guild.get_member(globals.client.user.id)
                if not member:
                    if debug:
                        print(f"   ⚠️ Bot not found as member in {guild.name}")
                    continue

                # Gather the bot's roles (excluding @everyone)
                roles = [r for r in member.roles if r.name != "@everyone"]
                if not roles:
                    if debug:
                        print(f"   ⚠️ No roles found in {guild.name}")
                    continue

                # Look for a dedicated color role first (e.g., "Miku Color")
                color_role = None
                for role in guild.roles:
                    if role.name.lower() in ["miku color", "miku colour", "miku-color"]:
                        color_role = role
                        break

                # Get the bot's top role
                bot_top_role = max(roles, key=lambda r: r.position)

                # Use the dedicated color role if found, otherwise the top role
                if color_role:
                    if debug:
                        print(f"   🎨 Found dedicated color role: {color_role.name} (position {color_role.position})")
                    target_role = color_role
                else:
                    if debug:
                        print(f"   📝 No 'Miku Color' role found, using top role: {bot_top_role.name} (position {bot_top_role.position})")
                    target_role = bot_top_role

                # Check permissions
                can_manage = guild.me.guild_permissions.manage_roles

                if debug:
                    print(f"   🔑 Manage roles permission: {can_manage}")
                    print(f"   📊 Bot top role: {bot_top_role.name} (pos {bot_top_role.position}), Target: {target_role.name} (pos {target_role.position})")

                # Only update if we have the manage_roles permission
                if can_manage:
                    # Run the role edit in the bot's event loop
                    if globals.client.loop and globals.client.loop.is_running():
                        future = asyncio.run_coroutine_threadsafe(
                            target_role.edit(color=discord_color, reason="Profile picture color sync"),
                            globals.client.loop
                        )
                        future.result(timeout=5)
                    else:
                        await target_role.edit(color=discord_color, reason="Profile picture color sync")

                    updated_count += 1
                    if debug:
                        print(f"   ✅ Updated role color in {guild.name}: {target_role.name}")
                else:
                    if debug:
                        print(f"   ⚠️ No manage_roles permission in {guild.name}")

            except discord.Forbidden:
                failed_count += 1
                if debug:
                    print(f"   ❌ Forbidden: No permission to update role in {guild.name}")
            except Exception as e:
                failed_count += 1
                if debug:
                    print(f"   ❌ Error updating role in {guild.name}: {e}")
                    import traceback
                    traceback.print_exc()

        if updated_count > 0:
            print(f"🎨 Updated role colors in {updated_count} server(s)")
        else:
            print(f"⚠️ No roles were updated (failed: {failed_count})")
        if failed_count > 0 and debug:
            print(f"⚠️ Failed to update {failed_count} server(s)")

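    # discord.Color.from_rgb packs the channels into a single integer:
    # from_rgb(134, 206, 203).value == (134 << 16) + (206 << 8) + 203 == 0x86cecb.
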
    async def set_custom_role_color(self, hex_color: str, debug: bool = False) -> Dict:
        """
        Set a custom role color across all servers.

        Args:
            hex_color: Hex color code (e.g., "#86cecb" or "86cecb")
            debug: Enable debug output

        Returns:
            Dict with success status and the parsed color
        """
        # Parse the hex color
        hex_color = hex_color.strip().lstrip('#')
        try:
            r = int(hex_color[0:2], 16)
            g = int(hex_color[2:4], 16)
            b = int(hex_color[4:6], 16)
            color = (r, g, b)
        except (ValueError, IndexError):
            return {
                "success": False,
                "error": f"Invalid hex color: {hex_color}"
            }

        if debug:
            print(f"🎨 Setting custom role color: #{hex_color} RGB{color}")

        await self._update_role_colors(color, debug=debug)

        return {
            "success": True,
            "color": {
                "hex": f"#{hex_color}",
                "rgb": color
            }
        }

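    # Usage sketch: the parser above accepts both "#86cecb" and "86cecb".
    #
    #   await profile_picture_manager.set_custom_role_color("#86cecb", debug=True)
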
    async def reset_to_fallback_color(self, debug: bool = False) -> Dict:
        """
        Reset the role color to the fallback color (#86cecb).

        Args:
            debug: Enable debug output

        Returns:
            Dict with success status
        """
        if debug:
            print(f"🎨 Resetting to fallback color: RGB{self.FALLBACK_ROLE_COLOR}")

        await self._update_role_colors(self.FALLBACK_ROLE_COLOR, debug=debug)

        return {
            "success": True,
            "color": {
                "hex": "#86cecb",
                "rgb": self.FALLBACK_ROLE_COLOR
            }
        }

    def _save_metadata(self, metadata: Dict):
        """Save metadata about the current profile picture."""
        try:
            with open(self.METADATA_PATH, 'w') as f:
                json.dump(metadata, f, indent=2)
        except Exception as e:
            print(f"⚠️ Error saving metadata: {e}")

    def load_metadata(self) -> Optional[Dict]:
        """Load metadata about the current profile picture."""
        try:
            if os.path.exists(self.METADATA_PATH):
                with open(self.METADATA_PATH, 'r') as f:
                    return json.load(f)
        except Exception as e:
            print(f"⚠️ Error loading metadata: {e}")
        return None

    async def restore_fallback(self) -> bool:
        """Restore the fallback profile picture."""
        try:
            if not os.path.exists(self.FALLBACK_PATH):
                print("⚠️ No fallback avatar found")
                return False

            with open(self.FALLBACK_PATH, 'rb') as f:
                avatar_bytes = f.read()

            if globals.client and globals.client.user:
                # Run the edit operation in the bot's event loop
                if globals.client.loop and globals.client.loop.is_running():
                    future = asyncio.run_coroutine_threadsafe(
                        globals.client.user.edit(avatar=avatar_bytes),
                        globals.client.loop
                    )
                    future.result(timeout=10)
                else:
                    await globals.client.user.edit(avatar=avatar_bytes)

                print("✅ Restored fallback avatar")
                return True

        except Exception as e:
            print(f"⚠️ Error restoring fallback: {e}")

        return False

    def get_current_description(self) -> Optional[str]:
        """
        Get the description of the current profile picture.

        Returns:
            Description string or None
        """
        description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
        try:
            if os.path.exists(description_path):
                with open(description_path, 'r', encoding='utf-8') as f:
                    return f.read().strip()
        except Exception as e:
            print(f"⚠️ Error reading description: {e}")

        return None


# Global instance
profile_picture_manager = ProfilePictureManager()
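

if __name__ == "__main__":
    # Minimal smoke test, a sketch rather than part of the bot's runtime:
    # check whether the face detector API is reachable. Assumes this runs
    # inside the compose network, where "anime-face-detector" resolves.
    asyncio.run(profile_picture_manager.initialize())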