# profile_picture_manager.py
"""
Intelligent profile picture manager for Miku.
Handles searching, face detection, cropping, and Discord avatar updates.
"""

import asyncio
import base64
import io
import json
import os
import re
import traceback
from datetime import datetime
from typing import Any, Coroutine, Dict, List, Optional, Tuple

import aiohttp
import cv2
import discord
import numpy as np
from PIL import Image, ImageDraw

import globals
from .danbooru_client import danbooru_client


class ProfilePictureManager:
    """Manages Miku's profile picture with intelligent cropping and face detection"""

    PROFILE_PIC_DIR = "memory/profile_pictures"
    FALLBACK_PATH = "memory/profile_pictures/fallback.png"
    CURRENT_PATH = "memory/profile_pictures/current.png"
    METADATA_PATH = "memory/profile_pictures/metadata.json"

    # Face detection API endpoints
    FACE_DETECTOR_API = "http://anime-face-detector:6078/detect"
    FACE_DETECTOR_HEALTH_API = "http://anime-face-detector:6078/health"

    # Docker Engine API socket used to start/stop the face detector container
    DOCKER_SOCKET_PATH = "/var/run/docker.sock"

    # Fallback role color (Miku's iconic teal)
    FALLBACK_ROLE_COLOR = (134, 206, 203)  # #86cecb

    # Image modes Pillow can write to PNG without conversion
    _PNG_SAFE_MODES = ("1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA")

    def __init__(self):
        self._ensure_directories()

    def _ensure_directories(self):
        """Ensure profile picture directory exists"""
        os.makedirs(self.PROFILE_PIC_DIR, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _run_on_bot_loop(self, coro: Coroutine[Any, Any, Any], timeout: float) -> Any:
        """
        Run a Discord coroutine on the bot's event loop without blocking.

        The previous implementation called ``future.result(timeout=...)`` from
        inside a coroutine. That blocks the calling event loop, and when the
        caller is already running on the bot's loop the scheduled coroutine can
        never start until the timeout expires.

        Args:
            coro: Coroutine to execute (e.g. ``client.user.edit(...)``)
            timeout: Seconds to wait when the coroutine runs on another loop.
                On timeout the remote coroutine is cancelled.

        Returns:
            Whatever the coroutine returns

        Raises:
            asyncio.TimeoutError: Cross-loop call did not finish in time
            Exception: Anything raised by the coroutine itself
        """
        try:
            bot_loop = globals.client.loop
            cross_loop = (
                bool(bot_loop)
                and bot_loop.is_running()
                and bot_loop is not asyncio.get_running_loop()
            )
        except Exception:
            # Avoid a "coroutine was never awaited" warning before re-raising
            coro.close()
            raise

        if not cross_loop:
            # Already on the bot's loop (or it is not running): just await it
            return await coro

        future = asyncio.run_coroutine_threadsafe(coro, bot_loop)
        return await asyncio.wait_for(asyncio.wrap_future(future), timeout=timeout)

    def _image_to_png_bytes(self, image: Image.Image) -> bytes:
        """
        Encode a PIL image as PNG bytes.

        Modes PNG cannot store (e.g. CMYK from some JPEGs) are converted to RGB
        first so the save does not fail.
        """
        if image.mode not in self._PNG_SAFE_MODES:
            image = image.convert("RGB")
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        return buffer.getvalue()

    # ------------------------------------------------------------------
    # Service management
    # ------------------------------------------------------------------

    async def initialize(self):
        """
        Initialize the profile picture manager (check API availability)

        Returns:
            True if the face detector health endpoint answered 200, else False
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.FACE_DETECTOR_HEALTH_API,
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        print("✅ Anime face detector API connected")
                        return True
        except Exception as e:
            print(f"⚠️ Face detector API not available: {e}")
            print(" Profile picture changes will use fallback cropping")
        return False

    async def _ensure_vram_available(self, debug: bool = False):
        """
        Ensure VRAM is available for face detection by swapping to text model.
        This unloads the vision model if it's loaded.

        Args:
            debug: Enable debug output

        Returns:
            True if the text model answered with HTTP 200, otherwise False
        """
        try:
            if debug:
                print("💾 Swapping to text model to free VRAM for face detection...")

            # Make a simple request to text model to trigger swap
            payload = {
                "model": "llama3.1",
                "messages": [{"role": "user", "content": "hi"}],
                "max_tokens": 1,
                "stream": False
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    "http://llama-swap:8080/v1/chat/completions",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=60)
                ) as response:
                    if response.status == 200:
                        if debug:
                            print("✅ Vision model unloaded, VRAM available")
                        # Give system time to fully release VRAM
                        await asyncio.sleep(3)
                        return True
        except Exception as e:
            if debug:
                print(f"⚠️ Could not swap models: {e}")
        return False

    async def _start_face_detector(self, debug: bool = False) -> bool:
        """
        Start the face detector container using Docker socket API

        Args:
            debug: Enable debug output

        Returns:
            True once the detector's health endpoint answers 200, False if the
            socket is missing, the start request fails, or the API is not ready
            within ~30 polls.
        """
        try:
            if debug:
                print("🚀 Starting face detector container...")

            socket_path = self.DOCKER_SOCKET_PATH
            if not os.path.exists(socket_path):
                if debug:
                    print("⚠️ Docker socket not available")
                return False

            # Talk to the Docker Engine API over its unix socket
            async with aiohttp.ClientSession(
                connector=aiohttp.UnixConnector(path=socket_path)
            ) as session:
                url = "http://localhost/containers/anime-face-detector/start"
                async with session.post(url) as response:
                    if response.status not in [204, 304]:  # 204=started, 304=already running
                        if debug:
                            error_text = await response.text()
                            print(f"⚠️ Failed to start container: {response.status} - {error_text}")
                        return False

            # Wait for API to be ready
            async with aiohttp.ClientSession() as session:
                for i in range(30):  # 30 second timeout
                    try:
                        async with session.get(
                            self.FACE_DETECTOR_HEALTH_API,
                            timeout=aiohttp.ClientTimeout(total=2)
                        ) as response:
                            if response.status == 200:
                                if debug:
                                    print(f"✅ Face detector ready (took {i+1}s)")
                                return True
                    except Exception:
                        # Not reachable yet; keep polling. (A bare ``except``
                        # here would also swallow task cancellation.)
                        pass
                    await asyncio.sleep(1)

            if debug:
                print("⚠️ Face detector didn't become ready in time")
            return False

        except Exception as e:
            if debug:
                print(f"⚠️ Error starting face detector: {e}")
            return False

    async def _stop_face_detector(self, debug: bool = False):
        """
        Stop the face detector container using Docker socket API

        Best-effort: failures are only reported when ``debug`` is set.
        """
        try:
            if debug:
                print("🛑 Stopping face detector to free VRAM...")

            socket_path = self.DOCKER_SOCKET_PATH
            if not os.path.exists(socket_path):
                if debug:
                    print("⚠️ Docker socket not available")
                return

            async with aiohttp.ClientSession(
                connector=aiohttp.UnixConnector(path=socket_path)
            ) as session:
                url = "http://localhost/containers/anime-face-detector/stop"
                # t=10 is sent as the stop request's "t" query parameter
                async with session.post(url, params={"t": 10}) as response:
                    if response.status in [204, 304]:  # 204=stopped, 304=already stopped
                        if debug:
                            print("✅ Face detector stopped")
                    elif debug:
                        error_text = await response.text()
                        print(f"⚠️ Failed to stop container: {response.status} - {error_text}")

        except Exception as e:
            if debug:
                print(f"⚠️ Error stopping face detector: {e}")

    # ------------------------------------------------------------------
    # Avatar handling
    # ------------------------------------------------------------------

    async def save_current_avatar_as_fallback(self):
        """
        Save the bot's current avatar as fallback (only if fallback doesn't exist)

        Returns:
            True if a fallback exists after the call, False otherwise
        """
        try:
            # Only save if fallback doesn't already exist
            if os.path.exists(self.FALLBACK_PATH):
                print("✅ Fallback avatar already exists, skipping save")
                return True

            if not globals.client or not globals.client.user:
                print("⚠️ Bot client not ready")
                return False

            avatar_asset = globals.client.user.avatar or globals.client.user.default_avatar
            avatar_bytes = await avatar_asset.read()

            with open(self.FALLBACK_PATH, 'wb') as f:
                f.write(avatar_bytes)

            print(f"✅ Saved current avatar as fallback ({len(avatar_bytes)} bytes)")
            return True

        except Exception as e:
            print(f"⚠️ Error saving fallback avatar: {e}")
            return False

    async def _find_danbooru_image(
        self,
        mood: Optional[str],
        max_retries: int,
        debug: bool
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch Danbooru posts until one is verified to contain Miku.

        Returns:
            Dict with ``image_bytes`` (original download), ``image`` (PIL image,
            possibly pre-cropped to Miku's region), ``detection_bytes`` (bytes
            matching ``image`` for face detection) and ``metadata``; or None if
            no attempt produced a valid image.
        """
        if debug:
            print(f"🎨 Searching Danbooru for Miku image (mood: {mood})")

        for attempt in range(max_retries):
            if attempt > 0 and debug:
                print(f"🔄 Retry attempt {attempt + 1}/{max_retries}")

            post = await danbooru_client.get_random_miku_image(mood=mood)
            if not post:
                continue

            image_url = danbooru_client.extract_image_url(post)
            if not image_url:
                continue

            candidate_bytes = await self._download_image(image_url)
            if not candidate_bytes:
                continue

            if debug:
                print(f"✅ Downloaded image from Danbooru (post #{danbooru_client.get_post_metadata(post).get('id')})")

            try:
                candidate_image = Image.open(io.BytesIO(candidate_bytes))
                if debug:
                    print(f"📐 Original image size: {candidate_image.size}")
            except Exception as e:
                if debug:
                    print(f"⚠️ Failed to open image: {e}")
                continue

            verification = await self._verify_and_locate_miku(candidate_bytes, debug=debug)
            if not verification["is_miku"]:
                if debug:
                    print(f"❌ Image verification failed: not Miku, trying another...")
                continue

            image = candidate_image
            detection_bytes = candidate_bytes

            # If multiple characters detected, use LLM's suggested crop region
            if verification.get("crop_region"):
                if debug:
                    print(f"🎯 Using LLM-suggested crop region for Miku")
                image = self._apply_crop_region(image, verification["crop_region"])
                # Face coordinates must be relative to the image we will crop,
                # so re-encode the pre-cropped image for the detector.
                detection_bytes = self._image_to_png_bytes(image)

            return {
                "image_bytes": candidate_bytes,
                "image": image,
                "detection_bytes": detection_bytes,
                "metadata": danbooru_client.get_post_metadata(post),
            }

        return None

    async def change_profile_picture(
        self,
        mood: Optional[str] = None,
        custom_image_bytes: Optional[bytes] = None,
        debug: bool = False,
        max_retries: int = 5
    ) -> Dict:
        """
        Main function to change Miku's profile picture.

        Args:
            mood: Current mood to influence Danbooru search
            custom_image_bytes: If provided, use this image instead of Danbooru
            debug: Enable debug output
            max_retries: Maximum number of attempts to find a valid Miku image (for Danbooru)

        Returns:
            Dict with status and metadata (keys: success, source, error, metadata)
        """
        result = {
            "success": False,
            "source": None,
            "error": None,
            "metadata": {}
        }

        try:
            # Step 1: Get and validate image (with retry for Danbooru)
            if custom_image_bytes:
                # Custom upload - no retry needed
                if debug:
                    print("🖼️ Using provided custom image")
                image_bytes = custom_image_bytes
                detection_bytes = custom_image_bytes
                result["source"] = "custom_upload"
                try:
                    image = Image.open(io.BytesIO(image_bytes))
                    if debug:
                        print(f"📐 Original image size: {image.size}")
                except Exception as e:
                    result["error"] = f"Failed to open image: {e}"
                    return result
            else:
                # Danbooru - retry until we find a valid Miku image
                found = await self._find_danbooru_image(mood, max_retries, debug)
                if not found:
                    result["error"] = f"Could not find valid Miku image after {max_retries} attempts"
                    return result
                image_bytes = found["image_bytes"]
                detection_bytes = found["detection_bytes"]
                image = found["image"]
                result["source"] = "danbooru"
                result["metadata"] = found["metadata"]

            # Step 2: Generate description of the validated image
            if debug:
                print("📝 Generating image description...")
            description = await self._generate_image_description(image_bytes, debug=debug)
            if description:
                description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
                try:
                    with open(description_path, 'w', encoding='utf-8') as f:
                        f.write(description)
                    result["metadata"]["description"] = description
                    if debug:
                        print(f"📝 Saved image description ({len(description)} chars)")
                except Exception as e:
                    print(f"⚠️ Failed to save description file: {e}")
            elif debug:
                print("⚠️ Description generation returned None")

            # Step 3: Detect face and crop intelligently
            cropped_image = await self._intelligent_crop(
                image, detection_bytes, target_size=512, debug=debug
            )
            if not cropped_image:
                result["error"] = "Failed to crop image"
                return result

            # Step 4: Save the cropped image first
            cropped_bytes = self._image_to_png_bytes(cropped_image)
            with open(self.CURRENT_PATH, 'wb') as f:
                f.write(cropped_bytes)
            if debug:
                print(f"💾 Saved cropped image ({len(cropped_bytes)} bytes)")

            # Step 5: Extract dominant color from saved current.png
            # (context manager so the file handle is released)
            with Image.open(self.CURRENT_PATH) as saved_image:
                dominant_color = self._extract_dominant_color(saved_image, debug=debug)
            if dominant_color:
                result["metadata"]["dominant_color"] = {
                    "rgb": dominant_color,
                    "hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
                }
                if debug:
                    print(f"🎨 Dominant color: RGB{dominant_color} (#{result['metadata']['dominant_color']['hex'][1:]})")

            # Step 6: Update Discord avatar
            if globals.client and globals.client.user:
                try:
                    await self._run_on_bot_loop(
                        globals.client.user.edit(avatar=cropped_bytes),
                        timeout=10
                    )

                    result["success"] = True
                    result["metadata"]["changed_at"] = datetime.now().isoformat()

                    # Save metadata
                    self._save_metadata(result["metadata"])

                    print(f"✅ Profile picture updated successfully!")

                    # Step 7: Update role colors across all servers
                    if dominant_color:
                        await self._update_role_colors(dominant_color, debug=debug)

                except discord.HTTPException as e:
                    result["error"] = f"Discord API error: {e}"
                    print(f"⚠️ Failed to update Discord avatar: {e}")
                except Exception as e:
                    result["error"] = f"Unexpected error updating avatar: {e}"
                    print(f"⚠️ Unexpected error: {e}")
            else:
                result["error"] = "Bot client not ready"

        except Exception as e:
            result["error"] = f"Unexpected error: {e}"
            print(f"⚠️ Error in change_profile_picture: {e}")

        return result

    async def _download_image(self, url: str) -> Optional[bytes]:
        """
        Download image from URL

        Returns:
            Response body on HTTP 200, otherwise None
        """
        try:
            async with aiohttp.ClientSession() as session:
                # ClientTimeout instead of a bare number (deprecated by aiohttp)
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=15)
                ) as response:
                    if response.status == 200:
                        return await response.read()
        except Exception as e:
            print(f"⚠️ Error downloading image: {e}")
        return None

    # ------------------------------------------------------------------
    # Vision model helpers
    # ------------------------------------------------------------------

    async def _generate_image_description(self, image_bytes: bytes, debug: bool = False) -> Optional[str]:
        """
        Generate a detailed description of the profile picture using vision model.
        This description will be used when users ask about the pfp.

        Args:
            image_bytes: Raw image bytes
            debug: Enable debug output

        Returns:
            Description string or None
        """
        try:
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')

            if debug:
                print(f"📸 Encoded image: {len(image_b64)} chars, calling vision model...")

            prompt = """This is an image of Hatsune Miku that will be used as a profile picture.
Please describe this image in detail, including:
- What Miku is wearing (outfit, colors, accessories)
- Her pose and expression
- The art style and mood of the image
- Any notable background elements
- Overall atmosphere or theme

Keep the description conversational and in second-person (referring to Miku as "you"), as if Miku herself is describing her own appearance in this image."""

            payload = {
                "model": globals.VISION_MODEL,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": prompt
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_b64}"
                                }
                            }
                        ]
                    }
                ],
                "stream": False,
                "max_tokens": 400,
                "temperature": 0.7
            }

            headers = {"Content-Type": "application/json"}

            if debug:
                print(f"🌐 Calling {globals.LLAMA_URL}/v1/chat/completions with model {globals.VISION_MODEL}")

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{globals.LLAMA_URL}/v1/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()

                        if debug:
                            print(f"📦 API Response keys: {data.keys()}")
                            print(f"📦 Choices: {data.get('choices', [])}")

                        # An empty "choices" list is treated like a missing one
                        choices = data.get("choices") or [{}]
                        message = choices[0].get("message", {})

                        # Check both 'content' and 'reasoning_content' fields
                        description = message.get("content", "")

                        # If content is empty, try reasoning_content (chain-of-thought models)
                        if not description or not description.strip():
                            description = message.get("reasoning_content", "")

                        if description and description.strip():
                            if debug:
                                print(f"✅ Generated description: {description[:100]}...")
                            return description.strip()

                        print(f"⚠️ Description is empty or None")
                        if debug:
                            print(f" Full response: {data}")
                        return None
                    else:
                        error_text = await resp.text()
                        print(f"❌ Vision API error generating description: {resp.status} - {error_text}")

        except Exception as e:
            print(f"⚠️ Error generating image description: {e}")
            traceback.print_exc()

        return None

    async def _verify_and_locate_miku(self, image_bytes: bytes, debug: bool = False) -> Dict:
        """
        Use vision model to verify image contains Miku and locate her if multiple characters.

        Args:
            image_bytes: Raw image bytes
            debug: Enable debug output

        Returns:
            Dict with is_miku bool, character_count and optional crop_region.
            On an HTTP error the default (is_miku False) is returned; on any
            exception the image is assumed to be Miku (Danbooru tags are trusted).
        """
        result = {
            "is_miku": False,
            "crop_region": None,
            "character_count": 0
        }

        try:
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')

            # Query vision model using OpenAI-compatible API
            prompt = """Analyze this image and answer:
1. Is Hatsune Miku present in this image? (yes/no)
2. How many characters are in the image? (number)
3. If there are multiple characters, describe where Miku is located (left/right/center, top/bottom/middle)

Respond in JSON format:
{
    "is_miku": true/false,
    "character_count": number,
    "miku_location": "description or null"
}"""

            payload = {
                "model": globals.VISION_MODEL,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": prompt
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_b64}"
                                }
                            }
                        ]
                    }
                ],
                "stream": False,
                "max_tokens": 200,
                "temperature": 0.3
            }

            headers = {"Content-Type": "application/json"}

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{globals.LLAMA_URL}/v1/chat/completions",
                    json=payload,
                    headers=headers
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        response = data.get("choices", [{}])[0].get("message", {}).get("content", "")
                    else:
                        error_text = await resp.text()
                        print(f"❌ Vision API error: {resp.status} - {error_text}")
                        return result

            if debug:
                print(f"🤖 Vision model response: {response}")

            # Parse JSON response (first flat {...} object in the text)
            json_match = re.search(r'\{[^}]+\}', response)
            if json_match:
                data = json.loads(json_match.group())

                # Models sometimes emit "true"/"false" as strings; a non-empty
                # string would otherwise always count as True.
                is_miku = data.get("is_miku", False)
                if isinstance(is_miku, str):
                    is_miku = is_miku.strip().lower() in ("true", "yes", "1")
                result["is_miku"] = bool(is_miku)

                # Coerce the count so a string/None value does not raise and
                # fall into the "assume Miku" exception path below.
                try:
                    result["character_count"] = int(data.get("character_count", 1))
                except (TypeError, ValueError):
                    result["character_count"] = 1

                # If multiple characters, parse location
                location = data.get("miku_location")
                if result["character_count"] > 1 and location:
                    result["crop_region"] = self._parse_location_to_region(
                        str(location),
                        debug=debug
                    )
            else:
                # Fallback: simple text analysis
                # NOTE(review): any reply mentioning "miku" passes, including
                # negative ones — confirm this leniency is intended.
                response_lower = response.lower()
                result["is_miku"] = "yes" in response_lower or "miku" in response_lower

        except Exception as e:
            print(f"⚠️ Error in vision verification: {e}")
            # Assume it's Miku on error (trust Danbooru tags)
            result["is_miku"] = True

        return result

    def _parse_location_to_region(self, location: str, debug: bool = False) -> Optional[Dict]:
        """
        Parse location description to crop region coordinates

        Args:
            location: Free-text location (e.g. "top left")
            debug: Enable debug output

        Returns:
            Dict with "horizontal" (left/center/right) and "vertical"
            (top/middle/bottom); defaults are center/middle.
        """
        location_lower = location.lower()

        # Simple region detection
        region = {
            "horizontal": "center",  # left, center, right
            "vertical": "middle"     # top, middle, bottom
        }

        if "left" in location_lower:
            region["horizontal"] = "left"
        elif "right" in location_lower:
            region["horizontal"] = "right"

        if "top" in location_lower:
            region["vertical"] = "top"
        elif "bottom" in location_lower:
            region["vertical"] = "bottom"

        if debug:
            print(f"📍 Parsed location '{location}' -> {region}")

        return region

    def _apply_crop_region(self, image: Image.Image, region: Dict) -> Image.Image:
        """
        Apply crop region based on parsed location

        Args:
            image: PIL Image
            region: Dict produced by ``_parse_location_to_region``

        Returns:
            A crop covering 60% of each dimension, anchored at the given region
        """
        width, height = image.size

        # Determine crop box based on region
        # We want roughly 1/2 to 2/3 of the image
        crop_width = int(width * 0.6)
        crop_height = int(height * 0.6)

        # Horizontal position
        if region["horizontal"] == "left":
            left = 0
        elif region["horizontal"] == "right":
            left = width - crop_width
        else:  # center
            left = (width - crop_width) // 2
        right = left + crop_width

        # Vertical position
        if region["vertical"] == "top":
            top = 0
        elif region["vertical"] == "bottom":
            top = height - crop_height
        else:  # middle
            top = (height - crop_height) // 2
        bottom = top + crop_height

        return image.crop((left, top, right, bottom))

    # ------------------------------------------------------------------
    # Cropping
    # ------------------------------------------------------------------

    async def _intelligent_crop(
        self,
        image: Image.Image,
        image_bytes: bytes,
        target_size: int = 512,
        debug: bool = False
    ) -> Optional[Image.Image]:
        """
        Intelligently crop image to square, centering on detected face.

        Args:
            image: PIL Image
            image_bytes: Image data as bytes (for API call); must encode the
                same pixels as ``image`` so detected coordinates line up
            target_size: Target size for square output
            debug: Enable debug output

        Returns:
            Cropped PIL Image or None
        """
        width, height = image.size
        base_size = min(width, height)
        if base_size <= 0:
            return None

        # Try face detection via API first
        face_detection = await self._detect_face(image_bytes, debug=debug)

        if face_detection and face_detection.get('center'):
            if debug:
                print(f"😊 Face detected at {face_detection['center']}")
            crop_center = face_detection['center']
        else:
            if debug:
                print("🎯 No face detected, using saliency detection")
            # Fallback to saliency detection. Convert first: palette and
            # grayscale images give 2-D arrays that cvtColor rejects.
            rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
            cv_image = cv2.cvtColor(np.array(rgb_image), cv2.COLOR_RGB2BGR)
            crop_center = self._detect_saliency(cv_image, debug=debug)

        # Determine crop box (square)
        # Use 60% of the smaller dimension to include face context
        crop_size = int(base_size * 0.6)

        # For very large images, cap the crop size at 1000px
        crop_size = min(crop_size, 1000)

        # Minimum crop size for quality
        crop_size = max(crop_size, 400)

        # ...but never larger than the image itself, otherwise the crop box
        # leaves the image and Pillow pads the avatar with black.
        crop_size = min(crop_size, base_size)

        # Center the crop on the detected point
        left = crop_center[0] - crop_size // 2
        top = crop_center[1] - crop_size // 2

        # Shift the box back inside the image if it goes out of bounds
        if left < 0:
            left = 0
        elif left + crop_size > width:
            left = width - crop_size

        if top < 0:
            # Face is too close to top edge
            top = 0
            if debug:
                print(f"⚠️ Face too close to top edge, shifted crop to y=0")
        elif top + crop_size > height:
            # Face is too close to bottom edge
            top = height - crop_size
            if debug:
                print(f"⚠️ Face too close to bottom edge, shifted crop to y={top}")

        cropped = image.crop((left, top, left + crop_size, top + crop_size))

        # Resize to target size
        cropped = cropped.resize((target_size, target_size), Image.Resampling.LANCZOS)

        if debug:
            print(f"✂️ Cropped to {target_size}x{target_size} centered at {crop_center}")

        return cropped

    async def _detect_face(self, image_bytes: bytes, debug: bool = False) -> Optional[Dict]:
        """
        Detect anime face in image using external API and return detection data.

        Args:
            image_bytes: Image data as bytes
            debug: Enable debug output

        Returns:
            Dict with detection data including bbox, confidence, keypoints, or None
        """
        face_detector_started = False
        try:
            # Step 1: Ensure VRAM is available by unloading vision model
            await self._ensure_vram_available(debug=debug)

            # Step 2: Start face detector container
            if not await self._start_face_detector(debug=debug):
                if debug:
                    print("⚠️ Could not start face detector")
                return None

            face_detector_started = True

            # Step 3: Call the face detection API
            async with aiohttp.ClientSession() as session:
                # Prepare multipart form data
                form = aiohttp.FormData()
                form.add_field('file', image_bytes, filename='image.jpg', content_type='image/jpeg')

                async with session.post(
                    self.FACE_DETECTOR_API,
                    data=form,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status != 200:
                        if debug:
                            print(f"⚠️ Face detection API returned status {response.status}")
                        return None

                    result = await response.json()

            if result.get('count', 0) == 0:
                if debug:
                    print("👤 No faces detected by API")
                return None

            # Get detections and pick the one with highest confidence
            detections = result.get('detections', [])
            if not detections:
                return None

            best_detection = max(detections, key=lambda d: d.get('confidence', 0))

            # Extract bbox coordinates
            bbox = best_detection.get('bbox', [])
            confidence = best_detection.get('confidence', 0)
            keypoints = best_detection.get('keypoints', [])

            if len(bbox) >= 4:
                x1, y1, x2, y2 = bbox[:4]
                center_x = int((x1 + x2) / 2)
                center_y = int((y1 + y2) / 2)

                if debug:
                    width = int(x2 - x1)
                    height = int(y2 - y1)
                    print(f"👤 Detected {len(detections)} face(s) via API, using best at ({center_x}, {center_y}) [confidence: {confidence:.2%}]")
                    print(f" Bounding box: x={int(x1)}, y={int(y1)}, w={width}, h={height}")
                    print(f" Keypoints: {len(keypoints)} facial landmarks detected")

                return {
                    'center': (center_x, center_y),
                    'bbox': bbox,
                    'confidence': confidence,
                    'keypoints': keypoints,
                    'count': len(detections)
                }

        except asyncio.TimeoutError:
            if debug:
                print("⚠️ Face detection API timeout")
        except Exception as e:
            if debug:
                print(f"⚠️ Error calling face detection API: {e}")
        finally:
            # Always stop face detector to free VRAM
            if face_detector_started:
                await self._stop_face_detector(debug=debug)

        return None

    def _detect_saliency(self, cv_image: np.ndarray, debug: bool = False) -> Tuple[int, int]:
        """
        Detect most salient (interesting) region of image.
        Fallback when face detection fails.

        Args:
            cv_image: Image array as produced by ``cv2.cvtColor`` in the caller
            debug: Enable debug output

        Returns:
            Tuple of (x, y) center coordinates
        """
        try:
            # Use OpenCV's saliency detector
            saliency = cv2.saliency.StaticSaliencySpectralResidual_create()
            success, saliency_map = saliency.computeSaliency(cv_image)

            if success:
                # Find the point with highest saliency
                saliency_map = (saliency_map * 255).astype("uint8")
                _, _, _, max_loc = cv2.minMaxLoc(saliency_map)

                if debug:
                    print(f"🎯 Saliency peak at {max_loc}")

                return max_loc

        except Exception as e:
            if debug:
                print(f"⚠️ Saliency detection failed: {e}")

        # Ultimate fallback: center of image
        height, width = cv_image.shape[:2]
        return (width // 2, height // 2)

    # ------------------------------------------------------------------
    # Color handling
    # ------------------------------------------------------------------

    def _extract_dominant_color(self, image: Image.Image, debug: bool = False) -> Optional[Tuple[int, int, int]]:
        """
        Extract the dominant color from an image using k-means clustering.

        Args:
            image: PIL Image
            debug: Enable debug output

        Returns:
            RGB tuple (r, g, b) or None
        """
        try:
            # Resize for faster processing
            small_image = image.resize((150, 150))

            # Convert to RGB if needed (handles grayscale, RGBA, etc.)
            if small_image.mode != 'RGB':
                small_image = small_image.convert('RGB')

            # Flatten to a list of RGB pixels
            pixels = np.array(small_image).reshape(-1, 3)

            # Remove very dark (near black) and very bright (near white) pixels
            # to avoid getting boring colors
            brightness = pixels.sum(axis=1)
            pixels = pixels[(brightness >= 30) & (brightness <= 720)]

            if len(pixels) == 0:
                if debug:
                    print("⚠️ No valid pixels after filtering, using fallback")
                return (200, 200, 200)  # Neutral gray fallback

            # Use k-means to find dominant colors
            from sklearn.cluster import KMeans

            # KMeans rejects more clusters than samples
            n_colors = min(5, len(pixels))
            kmeans = KMeans(n_clusters=n_colors, random_state=42, n_init=10)
            kmeans.fit(pixels)

            # Get cluster centers (dominant colors) and their frequencies
            colors = kmeans.cluster_centers_
            labels = kmeans.labels_
            counts = np.bincount(labels, minlength=n_colors)

            if debug:
                print(f"🎨 Found {n_colors} color clusters:")
                for i, (color, count) in enumerate(zip(colors, counts)):
                    pct = (count / len(labels)) * 100
                    r, g, b = (int(c) for c in color)
                    print(f" {i+1}. RGB({r}, {g}, {b}) = #{r:02x}{g:02x}{b:02x} ({pct:.1f}%)")

            # Sort by frequency
            sorted_indices = np.argsort(-counts)

            # Pick the most vibrant/saturated color from top 3
            best_color = None
            best_saturation = 0

            for idx in sorted_indices[:3]:
                # Native Python ints so the result is JSON serializable
                r, g, b = (int(c) for c in colors[idx])

                # Calculate saturation (how vibrant the color is)
                max_c = max(r, g, b)
                min_c = min(r, g, b)
                saturation = (max_c - min_c) / max_c if max_c > 0 else 0

                if debug:
                    print(f" Color RGB({r}, {g}, {b}) saturation: {saturation:.2f}")

                # Prefer more saturated colors
                if saturation > best_saturation:
                    best_saturation = saturation
                    best_color = (r, g, b)

            if best_color:
                if debug:
                    print(f"🎨 Selected color: RGB{best_color} (saturation: {best_saturation:.2f})")
                return best_color

            # Fallback to most common color (all candidates were pure gray)
            dominant = colors[sorted_indices[0]]
            result = (int(dominant[0]), int(dominant[1]), int(dominant[2]))
            if debug:
                print(f"🎨 Using most common color: RGB{result}")
            return result

        except Exception as e:
            if debug:
                print(f"⚠️ Error extracting dominant color: {e}")
            return None

    async def _update_role_colors(self, color: Tuple[int, int, int], debug: bool = False):
        """
        Update Miku's role color across all servers.

        In each guild a role named "miku color" / "miku colour" / "miku-color"
        (case-insensitive) is preferred; otherwise the bot's highest role is
        edited. Guilds without the manage_roles permission are skipped.

        Args:
            color: RGB tuple (r, g, b)
            debug: Enable debug output
        """
        if debug:
            print(f"🎨 Starting role color update with RGB{color}")

        if not globals.client:
            if debug:
                print("⚠️ No client available for role updates")
            return

        if debug:
            print(f"🌐 Found {len(globals.client.guilds)} guild(s)")

        # Convert RGB to Discord color (integer)
        discord_color = discord.Color.from_rgb(*color)
        color_role_names = ("miku color", "miku colour", "miku-color")

        updated_count = 0
        failed_count = 0

        for guild in globals.client.guilds:
            try:
                if debug:
                    print(f"🔍 Checking guild: {guild.name}")

                member = guild.get_member(globals.client.user.id)
                if not member:
                    if debug:
                        print(f" ⚠️ Bot not found as member in {guild.name}")
                    continue

                # Roles the bot has (excluding @everyone)
                roles = [r for r in member.roles if r.name != "@everyone"]
                if not roles:
                    if debug:
                        print(f" ⚠️ No roles found in {guild.name}")
                    continue

                # Look for a dedicated color role first (e.g., "Miku Color")
                color_role = next(
                    (role for role in guild.roles if role.name.lower() in color_role_names),
                    None
                )

                # Get bot's top role
                bot_top_role = max(roles, key=lambda r: r.position)

                # Use dedicated color role if found, otherwise use top role
                if color_role:
                    if debug:
                        print(f" 🎨 Found dedicated color role: {color_role.name} (position {color_role.position})")
                    target_role = color_role
                else:
                    if debug:
                        print(f" 📝 No 'Miku Color' role found, using top role: {bot_top_role.name} (position {bot_top_role.position})")
                    target_role = bot_top_role

                # Check permissions
                can_manage = guild.me.guild_permissions.manage_roles

                if debug:
                    print(f" 🔑 Manage roles permission: {can_manage}")
                    print(f" 📊 Bot top role: {bot_top_role.name} (pos {bot_top_role.position}), Target: {target_role.name} (pos {target_role.position})")

                if can_manage:
                    await self._run_on_bot_loop(
                        target_role.edit(color=discord_color, reason="Profile picture color sync"),
                        timeout=5
                    )

                    updated_count += 1
                    if debug:
                        print(f" ✅ Updated role color in {guild.name}: {target_role.name}")
                elif debug:
                    print(f" ⚠️ No manage_roles permission in {guild.name}")

            except discord.Forbidden:
                failed_count += 1
                if debug:
                    print(f" ❌ Forbidden: No permission to update role in {guild.name}")
            except Exception as e:
                failed_count += 1
                if debug:
                    print(f" ❌ Error updating role in {guild.name}: {e}")
                    traceback.print_exc()

        if updated_count > 0:
            print(f"🎨 Updated role colors in {updated_count} server(s)")
        else:
            print(f"⚠️ No roles were updated (failed: {failed_count})")

        if failed_count > 0 and debug:
            print(f"⚠️ Failed to update {failed_count} server(s)")

    async def set_custom_role_color(self, hex_color: str, debug: bool = False) -> Dict:
        """
        Set a custom role color across all servers.

        Args:
            hex_color: Hex color code (e.g., "#86cecb" or "86cecb")
            debug: Enable debug output

        Returns:
            Dict with success status and the applied color, or success False
            and an error message when the code is not exactly six hex digits
        """
        # Parse hex color
        hex_color = hex_color.strip().lstrip('#')

        # Require exactly RRGGBB; slicing alone would accept trailing junk
        # and int(..., 16) tolerates signs/whitespace.
        if not re.fullmatch(r"[0-9a-fA-F]{6}", hex_color):
            return {
                "success": False,
                "error": f"Invalid hex color: {hex_color}"
            }

        color = (
            int(hex_color[0:2], 16),
            int(hex_color[2:4], 16),
            int(hex_color[4:6], 16),
        )

        if debug:
            print(f"🎨 Setting custom role color: #{hex_color} RGB{color}")

        await self._update_role_colors(color, debug=debug)

        return {
            "success": True,
            "color": {
                "hex": f"#{hex_color}",
                "rgb": color
            }
        }

    async def reset_to_fallback_color(self, debug: bool = False) -> Dict:
        """
        Reset role color to the fallback color (#86cecb).

        Args:
            debug: Enable debug output

        Returns:
            Dict with success status
        """
        if debug:
            print(f"🎨 Resetting to fallback color: RGB{self.FALLBACK_ROLE_COLOR}")

        await self._update_role_colors(self.FALLBACK_ROLE_COLOR, debug=debug)

        return {
            "success": True,
            "color": {
                # Derived from the constant so the two cannot drift apart
                "hex": "#{:02x}{:02x}{:02x}".format(*self.FALLBACK_ROLE_COLOR),
                "rgb": self.FALLBACK_ROLE_COLOR
            }
        }

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _save_metadata(self, metadata: Dict):
        """Save metadata about current profile picture"""
        try:
            with open(self.METADATA_PATH, 'w') as f:
                json.dump(metadata, f, indent=2)
        except Exception as e:
            print(f"⚠️ Error saving metadata: {e}")

    def load_metadata(self) -> Optional[Dict]:
        """
        Load metadata about current profile picture

        Returns:
            Parsed metadata dict, or None if the file is missing or unreadable
        """
        try:
            if os.path.exists(self.METADATA_PATH):
                with open(self.METADATA_PATH, 'r') as f:
                    return json.load(f)
        except Exception as e:
            print(f"⚠️ Error loading metadata: {e}")
        return None

    async def restore_fallback(self) -> bool:
        """
        Restore the fallback profile picture

        Returns:
            True if the avatar was restored, False otherwise
        """
        try:
            if not os.path.exists(self.FALLBACK_PATH):
                print("⚠️ No fallback avatar found")
                return False

            with open(self.FALLBACK_PATH, 'rb') as f:
                avatar_bytes = f.read()

            if globals.client and globals.client.user:
                await self._run_on_bot_loop(
                    globals.client.user.edit(avatar=avatar_bytes),
                    timeout=10
                )
                print("✅ Restored fallback avatar")
                return True

            print("⚠️ Bot client not ready")

        except Exception as e:
            print(f"⚠️ Error restoring fallback: {e}")

        return False

    def get_current_description(self) -> Optional[str]:
        """
        Get the description of the current profile picture.

        Returns:
            Description string or None
        """
        description_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
        try:
            if os.path.exists(description_path):
                with open(description_path, 'r', encoding='utf-8') as f:
                    return f.read().strip()
        except Exception as e:
            print(f"⚠️ Error reading description: {e}")
        return None


# Global instance
profile_picture_manager = ProfilePictureManager()