# utils/image_handling.py
"""
Media handling for the vision pipeline.

This module downloads images / videos / GIFs referenced by a Discord message,
turns them into base64 JPEG frames, asks the vision model (OpenAI-compatible
chat-completions endpoint) to describe them, and finally routes that
description through ``rephrase_as_miku`` so the reply is sent in persona.

``process_media_in_message`` is the high-level entry point; everything else
is a building block it uses.
"""

import asyncio
import base64
import datetime
import functools
import io
import os
import re
import subprocess
import tempfile

import aiohttp
from PIL import Image

import globals
from utils.logger import get_logger

logger = get_logger('vision')

# No need for switch_model anymore - llama-swap handles this automatically

# Discord user/role mentions: <@123456789>, <@!123456789>, <@&123456789>
_MENTION_RE = re.compile(r'<@[!&]?\d+>')
# Bot-name invocation prefixes at the very start, e.g. "miku,", "hey miku,", "Miku: "
_BOT_NAME_PREFIX_RE = re.compile(r'^(?:hey\s+)?miku[,!:\s]+', re.IGNORECASE)

# File extensions handled by the attachment branches of process_media_in_message.
_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".webp")
_VIDEO_EXTENSIONS = (".gif", ".mp4", ".webm", ".mov")

# Embed types (other than Tenor "gifv") that are mined for text/images/videos.
_GENERIC_EMBED_TYPES = ("rich", "article", "image", "video", "link")
# Maximum number of embed-text characters forwarded to the LLM.
_EMBED_TEXT_LIMIT = 500

# Prefix used when wrapping the vision description into the LLM prompt.
_MEDIA_PREFIXES = {
    "gif": "Looking at a GIF",
    "tenor_gif": "Looking at a Tenor GIF",
    "video": "Looking at a video",
    "rich_embed": "Looking at embedded content",
}
_DEFAULT_MEDIA_PREFIX = "Looking at an image"


def _extract_vision_question(prompt: str):
    """
    Strip Discord mentions and bot-name triggers from the user's message to
    produce a clean question suitable for passing directly to the vision model.

    Returns the cleaned question string, or None if nothing meaningful remains
    (e.g. the message was just "@Miku" or "miku," with no actual question).
    """
    if not prompt:
        return None

    text = _MENTION_RE.sub('', prompt).strip()
    text = _BOT_NAME_PREFIX_RE.sub('', text).strip()

    # Drop any residual leading punctuation/whitespace
    text = text.lstrip(',.!? ')

    return text if text else None


# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------

async def _download_bytes(url):
    """
    GET *url* and return the raw response body.

    Returns None when the server answers with a non-200 status or when the
    request fails at the network level (connection error, invalid URL,
    timeout), so callers only ever have to handle the "None" case.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return None
                return await resp.read()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        logger.warning(f"Download failed for {url}: {e}")
        return None


async def download_and_encode_image(url):
    """Download an image and return it base64-encoded (str), or None on failure."""
    raw = await _download_bytes(url)
    if raw is None:
        return None
    return base64.b64encode(raw).decode('utf-8')


async def download_and_encode_media(url):
    """Download any media file (image, video, GIF) and return it base64-encoded, or None on failure."""
    # Same operation as for images; kept as a separate public name for callers.
    return await download_and_encode_image(url)


async def extract_tenor_gif_url(tenor_url):
    """
    Extract the actual GIF URL from a Tenor link.

    Tenor URLs look like ``https://tenor.com/view/name-name-12345678`` or
    ``https://tenor.com/12345678.gif``. The numeric ID is pulled out of the
    URL and a handful of candidate media URLs are probed with HEAD requests;
    the first one answering 200 is returned.

    Returns the working media URL, or None if the ID cannot be parsed, no
    candidate works, or any error occurs.
    """
    try:
        match = (
            re.search(r'tenor\.com/view/[^/]+-(\d+)', tenor_url)
            or re.search(r'tenor\.com/(\d+)\.gif', tenor_url)
        )
        if not match:
            logger.warning(f"Could not extract Tenor GIF ID from: {tenor_url}")
            return None

        gif_id = match.group(1)

        # (candidate URL, log prefix) in probing order: the direct media CDN
        # URL first, then the alternative formats.
        candidates = [
            (f"https://media.tenor.com/images/{gif_id}/tenor.gif", "Found Tenor GIF"),
        ]
        candidates.extend(
            (f"https://media.tenor.com/{gif_id}/{fmt}", "Found Tenor GIF (alternative)")
            for fmt in ('tenor.gif', 'raw')
        )

        # One session is enough for all probes.
        async with aiohttp.ClientSession() as session:
            for candidate_url, log_prefix in candidates:
                async with session.head(candidate_url) as resp:
                    if resp.status == 200:
                        logger.debug(f"{log_prefix}: {candidate_url}")
                        return candidate_url

        logger.warning(f"Could not find working Tenor media URL for ID: {gif_id}")
        return None

    except Exception as e:
        logger.error(f"Error extracting Tenor GIF URL: {e}")
        return None


# ---------------------------------------------------------------------------
# ffmpeg / frame extraction
# ---------------------------------------------------------------------------

async def _run_subprocess(cmd, **kwargs):
    """
    Run ``subprocess.run(cmd, **kwargs)`` in the default executor.

    ffmpeg/ffprobe are blocking calls; running them on a worker thread keeps
    the event loop free while they work. Returns the ``CompletedProcess`` and
    propagates any exception raised by ``subprocess.run``.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(subprocess.run, cmd, **kwargs)
    )


async def convert_gif_to_mp4(gif_bytes):
    """
    Convert a GIF to MP4 using ffmpeg for better compatibility with video processing.

    Returns the MP4 bytes, or None if ffmpeg fails or any other error occurs.
    """
    try:
        # A private directory guarantees unique paths and is removed on exit,
        # whether or not the conversion succeeded.
        with tempfile.TemporaryDirectory() as workdir:
            gif_path = os.path.join(workdir, 'input.gif')
            mp4_path = os.path.join(workdir, 'output.mp4')

            with open(gif_path, 'wb') as f:
                f.write(gif_bytes)

            # -movflags faststart makes it streamable
            # -pix_fmt yuv420p ensures compatibility
            # -vf scale makes sure dimensions are even (required for yuv420p)
            ffmpeg_cmd = [
                'ffmpeg', '-i', gif_path,
                '-movflags', 'faststart',
                '-pix_fmt', 'yuv420p',
                '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
                '-y',
                mp4_path,
            ]
            await _run_subprocess(ffmpeg_cmd, capture_output=True, check=True)

            with open(mp4_path, 'rb') as f:
                mp4_bytes = f.read()

        logger.info(f"Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
        return mp4_bytes

    except subprocess.CalledProcessError as e:
        # stderr is raw bytes from ffmpeg; never let decoding mask the real error.
        stderr_text = e.stderr.decode(errors='replace') if e.stderr else ''
        logger.error(f"ffmpeg error converting GIF to MP4: {stderr_text}")
        return None
    except Exception as e:
        logger.error(f"Error converting GIF to MP4: {e}", exc_info=True)
        return None


def _sample_animation_frames(media_bytes, num_frames):
    """
    Sample up to *num_frames* evenly spaced frames from a multi-frame image via PIL.

    Returns a list of base64-encoded JPEG strings; the list is empty when the
    opened image exposes no ``n_frames`` attribute. Raises whatever PIL raises
    when the bytes are not an image it can open.
    """
    animation = Image.open(io.BytesIO(media_bytes))
    if not hasattr(animation, 'n_frames'):
        return []

    total_frames = animation.n_frames
    step = max(1, total_frames // num_frames)

    encoded = []
    for index in range(0, total_frames, step):
        if len(encoded) >= num_frames:
            break
        animation.seek(index)
        buffer = io.BytesIO()
        animation.convert('RGB').save(buffer, format='JPEG')
        encoded.append(base64.b64encode(buffer.getvalue()).decode('utf-8'))
    return encoded


async def extract_video_frames(video_bytes, num_frames=4):
    """
    Extract frames from a video or GIF for analysis.

    The bytes are first tried as a multi-frame image with PIL; if that yields
    nothing, they are written to a temp file and ffprobe/ffmpeg are used to
    grab ``num_frames`` frames at evenly spaced timestamps.

    Returns a list of base64-encoded JPEG frames, or None on any failure
    (including ``num_frames`` < 1).
    """
    if num_frames < 1:
        return None

    try:
        # Try GIF first with PIL
        try:
            frames = _sample_animation_frames(video_bytes, num_frames)
            if frames:
                return frames
        except Exception as e:
            logger.debug(f"Not a GIF, trying video extraction: {e}")

        # For video files (MP4, WebM, etc.), use ffmpeg. Everything lives in a
        # per-call directory so concurrent extractions cannot overwrite each
        # other's frame files, and cleanup happens even when ffmpeg fails.
        with tempfile.TemporaryDirectory() as workdir:
            video_path = os.path.join(workdir, 'input.mp4')
            with open(video_path, 'wb') as f:
                f.write(video_bytes)

            # Get video duration first
            probe_cmd = [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                video_path,
            ]
            probe = await _run_subprocess(probe_cmd, capture_output=True, text=True)
            duration = float(probe.stdout.strip())

            frames = []
            for i in range(num_frames):
                # Evenly distributed timestamps: 0, d/n, 2d/n, ...
                timestamp = duration * i / num_frames
                frame_path = os.path.join(workdir, f"frame_{i}.jpg")
                ffmpeg_cmd = [
                    'ffmpeg', '-ss', str(timestamp),
                    '-i', video_path,
                    '-vframes', '1',
                    '-q:v', '2',
                    '-y',
                    frame_path,
                ]
                await _run_subprocess(ffmpeg_cmd, capture_output=True, check=True)

                with open(frame_path, 'rb') as f:
                    frames.append(base64.b64encode(f.read()).decode('utf-8'))

            return frames

    except Exception as e:
        logger.error(f"Error extracting frames: {e}", exc_info=True)
        return None


# ---------------------------------------------------------------------------
# Vision model requests
# ---------------------------------------------------------------------------

def _image_part(base64_jpeg):
    """Build one ``image_url`` content part carrying a base64 JPEG data URL."""
    return {
        "type": "image_url",
        "image_url": {"url": f"data:image/jpeg;base64,{base64_jpeg}"},
    }


def _extract_completion_text(data):
    """
    Pull the assistant text out of a chat-completions response dict.

    Falls back to "No description." when ``choices`` is missing or empty or
    when the message carries no ``content`` key.
    """
    choices = data.get("choices") or [{}]
    return choices[0].get("message", {}).get("content", "No description.")


async def analyze_image_with_vision(base64_img, user_prompt=None):
    """
    Analyze an image with the vision model.

    Uses the OpenAI-compatible chat completions API with an ``image_url``
    part, sent to the endpoint returned by ``get_vision_gpu_url()``.

    If user_prompt is provided (and contains a meaningful question after
    stripping mentions/triggers), that question is sent to the vision model
    instead of the generic "Describe this image in detail." prompt.

    Returns the model's text on success; on failure returns a human-readable
    error string (this function does not raise for request errors).
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health

    # Check if vision endpoint is healthy before attempting request
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"

    question = _extract_vision_question(user_prompt)
    vision_prompt_text = question if question else "Describe this image in detail."
    logger.info(f"Vision prompt for image: {vision_prompt_text!r}")

    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": vision_prompt_text},
                    _image_part(base64_img),
                ],
            }
        ],
        "stream": False,
        "max_tokens": 800,
    }
    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending vision request to {vision_url} using model: {globals.VISION_MODEL}")

            async with session.post(
                f"{vision_url}/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=60),
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing image: {response.status}"

                data = await response.json()
                result = _extract_completion_text(data)
                logger.info("Vision analysis completed successfully")
                return result

        except Exception as e:
            logger.error(f"Error in analyze_image_with_vision: {e}", exc_info=True)
            return f"Error analyzing image: {str(e)}"


async def analyze_video_with_vision(video_frames, media_type="video", user_prompt=None):
    """
    Analyze a video or GIF by sending multiple frames in one vision request.

    video_frames: list of base64-encoded frames
    media_type: "video", "gif", or "tenor_gif" to customize the analysis prompt
    user_prompt: optional raw user message; if it contains a real question the
                 vision model is asked that instead of a generic description.

    Returns the model's text on success; on failure returns a human-readable
    error string (this function does not raise for request errors).
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health

    # Check if vision endpoint is healthy before attempting request
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"

    # Customize prompt based on media type, overridden by user question if present
    question = _extract_vision_question(user_prompt)
    if question:
        prompt_text = question
        logger.info(f"Vision prompt for {media_type}: {prompt_text!r}")
    elif media_type == "gif":
        prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
    elif media_type == "tenor_gif":
        prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
    else:  # video
        prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."

    # Text instruction first, then every frame as its own image part
    content = [{"type": "text", "text": prompt_text}]
    content.extend(_image_part(frame) for frame in video_frames)

    payload = {
        "model": globals.VISION_MODEL,
        "messages": [{"role": "user", "content": content}],
        "stream": False,
        "max_tokens": 1000,
    }
    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending video analysis request to {vision_url} using model: {globals.VISION_MODEL} (media_type: {media_type}, frames: {len(video_frames)})")

            async with session.post(
                f"{vision_url}/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=120),
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing video: {response.status}"

                data = await response.json()
                result = _extract_completion_text(data)
                logger.info("Video analysis completed successfully")
                return result

        except Exception as e:
            logger.error(f"Error in analyze_video_with_vision: {e}", exc_info=True)
            return f"Error analyzing video: {str(e)}"


async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
    """
    Rephrase vision model's image analysis as Miku would respond to it.

    Routes through the Cheshire Cat pipeline when ``globals.USE_CHESHIRE_CAT``
    is set, falling back to direct query_llama() if the Cat raises or returns
    nothing.

    Args:
        vision_output: Description from vision model
        user_prompt: User's original message
        guild_id: Guild ID for server context (None for DMs)
        user_id: User ID for conversation history
        author_name: Display name of the user
        media_type: Type of media ("image", "video", "gif", "tenor_gif" or "rich_embed")

    Returns:
        The response produced by the Cat pipeline or by query_llama().
    """
    from utils.llm import query_llama

    # Format the user's message to include vision context with media type
    media_prefix = _MEDIA_PREFIXES.get(media_type, _DEFAULT_MEDIA_PREFIX)
    formatted_prompt = f"[{media_prefix}: {vision_output}]"
    if user_prompt:
        formatted_prompt = f"{formatted_prompt} {user_prompt}"

    response_type = "dm_response" if guild_id is None else "server_response"

    # Use the actual user_id for history tracking, fall back to "image_analysis" for backward compatibility
    history_user_id = user_id if user_id else "image_analysis"

    # Determine current mood for Cat pipeline (server mood overrides the DM mood)
    current_mood = globals.DM_MOOD
    if guild_id:
        try:
            from server_manager import server_manager
            sc = server_manager.get_server_config(guild_id)
            if sc:
                current_mood = sc.current_mood_name
        except Exception as e:
            # Best effort: keep the DM mood, but leave a trace of the failure.
            logger.debug(f"Could not resolve server mood for guild {guild_id}: {e}")

    # Try Cheshire Cat pipeline first (memory-augmented response)
    response = None
    if globals.USE_CHESHIRE_CAT:
        try:
            from utils.cat_client import cat_adapter
            cat_result = await cat_adapter.query(
                text=formatted_prompt,
                user_id=history_user_id,
                guild_id=str(guild_id) if guild_id else None,
                author_name=author_name,
                mood=current_mood,
                response_type=response_type,
                media_type=media_type,
            )
            if cat_result:
                response, cat_full_prompt = cat_result
                effective_mood = current_mood
                if globals.EVIL_MODE:
                    effective_mood = f"EVIL:{getattr(globals, 'EVIL_DM_MOOD', 'evil_neutral')}"
                logger.info(f"🐱 Cat {media_type} response for {author_name} (mood: {effective_mood})")
                # Track Cat interaction for Web UI Last Prompt view
                globals.LAST_CAT_INTERACTION = {
                    "full_prompt": cat_full_prompt,
                    "response": response[:500] if response else "",
                    "user": author_name or history_user_id,
                    "mood": effective_mood,
                    "timestamp": datetime.datetime.now().isoformat(),
                }
        except Exception as e:
            logger.warning(f"🐱 Cat {media_type} pipeline error, falling back to query_llama: {e}")
            response = None

    # Fallback to direct LLM query if Cat didn't respond
    if not response:
        response = await query_llama(
            formatted_prompt,
            user_id=history_user_id,
            guild_id=guild_id,
            response_type=response_type,
            author_name=author_name,
            media_type=media_type,  # Pass media type to Miku's LLM
        )

    return response


# Backward compatibility aliases
analyze_image_with_qwen = analyze_image_with_vision


# ---------------------------------------------------------------------------
# Shared tail helper — send response, log DM, check bipolar interjection
# ---------------------------------------------------------------------------

async def _send_log_bipolar(message, reply_text, is_dm, *, media_label=""):
    """
    Common tail shared by every media handler *and* the text-fallback path
    in bot.py.  Sends *reply_text* to the channel, logs the reply in the DM
    ledger when appropriate, and fires a bipolar-interjection check for
    server messages.

    Returns the sent ``discord.Message`` so callers can use it if needed.
    """
    from utils.dm_logger import dm_logger
    from utils.task_tracker import create_tracked_task

    label = f" {media_label}" if media_label else ""

    if is_dm:
        logger.info(
            f"💌 DM{label} response to {message.author.display_name} "
            f"(using DM mood: {globals.DM_MOOD})"
        )
    else:
        guild_name = message.guild.name if message.guild else "unknown"
        logger.info(
            f"💬 Server{label} response to {message.author.display_name} "
            f"in {guild_name} (using server mood)"
        )

    response_message = await message.channel.send(reply_text)

    # Log bot's reply in the DM ledger
    if is_dm:
        dm_logger.log_user_message(message.author, response_message, is_bot_message=True)

    # Bipolar-mode interjection check (server messages only)
    if not is_dm and globals.BIPOLAR_MODE:
        try:
            from utils.persona_dialogue import check_for_interjection
            current_persona = "evil" if globals.EVIL_MODE else "miku"
            create_tracked_task(
                check_for_interjection(response_message, current_persona),
                task_name="interjection_check",
            )
        except Exception as e:
            logger.error(f"Error checking for persona interjection: {e}")

    return response_message


# ---------------------------------------------------------------------------
# Per-media handlers used by process_media_in_message
# ---------------------------------------------------------------------------

async def _reply_as_miku(message, description, prompt, is_dm, reply_ctx, *, media_type, media_label):
    """Rephrase *description* in persona and send it via ``_send_log_bipolar``."""
    miku_reply = await rephrase_as_miku(
        description, prompt, media_type=media_type, **reply_ctx
    )
    await _send_log_bipolar(message, miku_reply, is_dm, media_label=media_label)


async def _handle_image_attachment(message, attachment, prompt, is_dm, reply_ctx):
    """Describe one image attachment and reply; always sends something to the channel."""
    base64_img = await download_and_encode_image(attachment.url)
    if not base64_img:
        await message.channel.send("I couldn't load the image, sorry!")
        return

    description = await analyze_image_with_vision(base64_img, user_prompt=prompt)
    if not description or not description.strip():
        await message.channel.send(
            "I couldn't see that image clearly, sorry! Try sending it again."
        )
        return

    await _reply_as_miku(
        message, description, prompt, is_dm, reply_ctx,
        media_type="image", media_label="image",
    )


async def _handle_video_attachment(message, attachment, prompt, is_dm, reply_ctx):
    """Describe one video/GIF attachment and reply; always sends something to the channel."""
    is_gif = attachment.filename.lower().endswith(".gif")
    media_type = "gif" if is_gif else "video"
    logger.debug(f"🎬 Processing {media_type}: {attachment.filename}")

    media_bytes = await _download_bytes(attachment.url)
    if not media_bytes:
        await message.channel.send(f"I couldn't load the {media_type}, sorry!")
        return

    if is_gif:
        logger.debug("🔄 Converting GIF to MP4 for processing...")
        mp4_bytes = await convert_gif_to_mp4(media_bytes)
        if mp4_bytes:
            media_bytes = mp4_bytes
            logger.info("✅ GIF converted to MP4")
        else:
            # extract_video_frames can still read the raw GIF via PIL.
            logger.warning("GIF conversion failed, trying direct processing")

    frames = await extract_video_frames(media_bytes, num_frames=6)
    if not frames:
        await message.channel.send(
            f"I couldn't extract frames from that {media_type}, sorry!"
        )
        return

    logger.debug(f"📹 Extracted {len(frames)} frames from {attachment.filename}")

    description = await analyze_video_with_vision(
        frames, media_type=media_type, user_prompt=prompt,
    )
    if not description or not description.strip():
        await message.channel.send(
            f"I couldn't analyze that {media_type} clearly, sorry! "
            "Try sending it again."
        )
        return

    await _reply_as_miku(
        message, description, prompt, is_dm, reply_ctx,
        media_type=media_type, media_label=media_type,
    )


async def _handle_tenor_embed(message, embed, prompt, is_dm, reply_ctx) -> bool:
    """
    Describe a Tenor ``gifv`` embed and reply.

    Returns True when something was sent to the channel (reply or apology),
    False when no GIF URL could be found so the caller should try the next embed.
    """
    logger.info(f"🎭 Processing Tenor GIF from embed: {embed.url}")

    gif_url = await extract_tenor_gif_url(embed.url)
    if not gif_url:
        # Fall back to whatever media Discord attached to the embed itself.
        video = getattr(embed, "video", None)
        thumbnail = getattr(embed, "thumbnail", None)
        if video:
            gif_url = video.url
        elif thumbnail:
            gif_url = thumbnail.url

    if not gif_url:
        logger.warning("Could not extract GIF URL from Tenor embed")
        return False

    media_bytes = await _download_bytes(gif_url)
    if not media_bytes:
        await message.channel.send("I couldn't load that Tenor GIF, sorry!")
        return True

    logger.debug("Converting Tenor GIF to MP4 for processing...")
    mp4_bytes = await convert_gif_to_mp4(media_bytes)
    if mp4_bytes:
        logger.debug("Tenor GIF converted to MP4")
    else:
        logger.warning("GIF conversion failed, trying direct frame extraction")
        mp4_bytes = media_bytes

    frames = await extract_video_frames(mp4_bytes, num_frames=6)
    if not frames:
        await message.channel.send("I couldn't extract frames from that GIF, sorry!")
        return True

    logger.info(f"📹 Extracted {len(frames)} frames from Tenor GIF")

    description = await analyze_video_with_vision(
        frames, media_type="tenor_gif", user_prompt=prompt,
    )
    if not description or not description.strip():
        await message.channel.send(
            "I couldn't analyze that GIF clearly, sorry! "
            "Try sending it again."
        )
        return True

    await _reply_as_miku(
        message, description, prompt, is_dm, reply_ctx,
        media_type="tenor_gif", media_label="Tenor GIF",
    )
    return True


async def _describe_embed_image(img_url, prompt):
    """Return "[Embedded image shows: ...]" for *img_url*, or None if nothing usable came back."""
    logger.info(f"Processing image from embed: {img_url}")
    try:
        base64_img = await download_and_encode_image(img_url)
        if not base64_img:
            logger.error("Failed to download image from embed")
            return None

        logger.info("Image downloaded, analyzing with vision model...")
        description = await analyze_image_with_vision(base64_img, user_prompt=prompt)
        if description and description.strip():
            return f"[Embedded image shows: {description}]"
    except Exception as e:
        logger.error(f"Error processing embedded image: {e}")
    return None


async def _describe_embed_video(video_url, prompt):
    """Return "[Embedded video shows: ...]" for *video_url*, or None if nothing usable came back."""
    logger.info(f"🎬 Processing video from embed: {video_url}")
    try:
        media_bytes = await _download_bytes(video_url)
        if not media_bytes:
            logger.error("Failed to download video from embed")
            return None

        frames = await extract_video_frames(media_bytes, num_frames=6)
        if not frames:
            logger.error("Failed to extract frames from video")
            return None

        logger.info(
            f"📹 Extracted {len(frames)} frames, "
            "analyzing with vision model..."
        )
        description = await analyze_video_with_vision(
            frames, media_type="video", user_prompt=prompt,
        )
        if description and description.strip():
            return f"[Embedded video shows: {description}]"
    except Exception as e:
        logger.error(f"Error processing embedded video: {e}")
    return None


async def _handle_generic_embed(message, embed, prompt, is_dm, reply_ctx) -> bool:
    """
    Describe a rich/article/image/video/link embed and reply.

    Returns True when a reply was sent, False when the embed yielded no usable
    context so the caller should try the next embed.
    """
    logger.info(f"Processing {embed.type} embed")

    embed_content = await extract_embed_content(embed)
    if not embed_content["has_content"]:
        logger.warning("Embed has no extractable content, skipping")
        return False

    context_parts = []

    embed_text = embed_content["text"]
    if embed_text:
        truncated = embed_text[:_EMBED_TEXT_LIMIT]
        if len(embed_text) > _EMBED_TEXT_LIMIT:
            truncated += "..."
        context_parts.append(f"[Embedded content: {truncated}]")

    # Analyze images, then videos, found inside the embed
    for img_url in embed_content["images"]:
        part = await _describe_embed_image(img_url, prompt)
        if part:
            context_parts.append(part)

    for video_url in embed_content["videos"]:
        part = await _describe_embed_video(video_url, prompt)
        if part:
            context_parts.append(part)

    if not context_parts:
        return False

    # Build a combined vision description and route through
    # rephrase_as_miku (which handles Cat → LLM fallback,
    # mood resolution, and LAST_CAT_INTERACTION tracking).
    await _reply_as_miku(
        message, "\n".join(context_parts), prompt, is_dm, reply_ctx,
        media_type="rich_embed", media_label="embed",
    )
    return True


# ---------------------------------------------------------------------------
# High-level media dispatcher — called from bot.py on_message()
# ---------------------------------------------------------------------------

async def process_media_in_message(message, prompt, is_dm, guild_id) -> bool:
    """
    Inspect *message* for image/video/GIF attachments and embeds.

    Attachments are checked first (the first image or video/GIF attachment
    wins), then embeds in order. If any media is found and handled — including
    the case where only an apology could be sent — this function returns
    ``True``. Otherwise it returns ``False`` so the caller can fall through to
    text-only handling.
    """
    # Keyword arguments forwarded unchanged to rephrase_as_miku by every handler.
    reply_ctx = {
        "guild_id": guild_id,
        "user_id": str(message.author.id),
        "author_name": message.author.display_name,
    }

    # ---- 1./2. Attachments: images, then video / GIF ------------------
    for attachment in message.attachments or ():
        lower = attachment.filename.lower()
        if lower.endswith(_IMAGE_EXTENSIONS):
            await _handle_image_attachment(message, attachment, prompt, is_dm, reply_ctx)
            return True
        if lower.endswith(_VIDEO_EXTENSIONS):
            await _handle_video_attachment(message, attachment, prompt, is_dm, reply_ctx)
            return True

    # ---- 3./4. Embeds: Tenor gifv, then rich / article / image / video / link
    for embed in message.embeds or ():
        if embed.type == "gifv" and embed.url and "tenor.com" in embed.url:
            if await _handle_tenor_embed(message, embed, prompt, is_dm, reply_ctx):
                return True
        elif embed.type in _GENERIC_EMBED_TYPES:
            if await _handle_generic_embed(message, embed, prompt, is_dm, reply_ctx):
                return True

    return False


async def extract_embed_content(embed):
    """
    Extract text and media content from a Discord embed.

    Returns a dictionary with:
    - 'text': title, description, author, fields and footer joined by blank lines
    - 'images': list of image URLs (embed image first, then thumbnail)
    - 'videos': list of video URLs
    - 'has_content': boolean indicating if there's any content
    """
    text_parts = []

    if embed.title:
        text_parts.append(f"**{embed.title}**")

    if embed.description:
        text_parts.append(embed.description)

    if embed.author and embed.author.name:
        text_parts.append(f"Author: {embed.author.name}")

    if embed.fields:
        text_parts.extend(
            f"**{field.name}**: {field.value}" for field in embed.fields
        )

    if embed.footer and embed.footer.text:
        text_parts.append(f"_{embed.footer.text}_")

    images = [
        media.url
        for media in (embed.image, embed.thumbnail)
        if media and media.url
    ]
    videos = [embed.video.url] if embed.video and embed.video.url else []
    text = '\n\n'.join(text_parts)

    return {
        'text': text,
        'images': images,
        'videos': videos,
        'has_content': bool(text or images or videos),
    }