# utils/image_handling.py
"""Download/encode images, GIFs and videos, and analyze them with a
llama.cpp vision model via the OpenAI-compatible chat completions API."""

import aiohttp
import base64
import io
import tempfile
import os
import subprocess
from PIL import Image
import re
import globals

# No need for switch_model anymore - llama-swap handles this automatically


async def download_and_encode_image(url):
    """Download and encode an image to base64.

    Returns the base64 string, or None on a non-200 response.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            img_bytes = await resp.read()
            return base64.b64encode(img_bytes).decode('utf-8')


async def download_and_encode_media(url):
    """Download and encode any media file (image, video, GIF) to base64.

    Returns the base64 string, or None on a non-200 response.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            media_bytes = await resp.read()
            return base64.b64encode(media_bytes).decode('utf-8')


async def extract_tenor_gif_url(tenor_url):
    """
    Extract the actual GIF URL from a Tenor link.

    Tenor URLs look like: https://tenor.com/view/...
    We need to get the actual GIF file URL from the page or API.

    Returns the direct media URL, or None if no working URL is found.
    """
    try:
        # Try to extract GIF ID from URL
        # Tenor URLs: https://tenor.com/view/name-name-12345678 or https://tenor.com/12345678.gif
        match = re.search(r'tenor\.com/view/[^/]+-(\d+)', tenor_url)
        if not match:
            match = re.search(r'tenor\.com/(\d+)\.gif', tenor_url)
        if not match:
            print(f"⚠️ Could not extract Tenor GIF ID from: {tenor_url}")
            return None

        gif_id = match.group(1)

        # Tenor's direct media URL format (this works without API key)
        # Try the media CDN URL directly
        media_url = f"https://media.tenor.com/images/{gif_id}/tenor.gif"

        # One session for all candidate-URL probes (the original opened a
        # fresh session per request, which is wasteful).
        async with aiohttp.ClientSession() as session:
            # Verify the URL works
            async with session.head(media_url) as resp:
                if resp.status == 200:
                    print(f"✅ Found Tenor GIF: {media_url}")
                    return media_url

            # If that didn't work, try alternative formats
            for fmt in ['tenor.gif', 'raw']:
                alt_url = f"https://media.tenor.com/{gif_id}/{fmt}"
                async with session.head(alt_url) as resp:
                    if resp.status == 200:
                        print(f"✅ Found Tenor GIF (alternative): {alt_url}")
                        return alt_url

        print(f"⚠️ Could not find working Tenor media URL for ID: {gif_id}")
        return None

    except Exception as e:
        print(f"⚠️ Error extracting Tenor GIF URL: {e}")
        return None


async def convert_gif_to_mp4(gif_bytes):
    """
    Convert a GIF to MP4 using ffmpeg for better compatibility with video processing.

    Returns the MP4 bytes, or None on failure.
    """
    try:
        # Write GIF to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.gif') as temp_gif:
            temp_gif.write(gif_bytes)
            temp_gif_path = temp_gif.name

        # Output MP4 path
        temp_mp4_path = temp_gif_path.replace('.gif', '.mp4')

        try:
            # Convert GIF to MP4 with ffmpeg
            # -movflags faststart makes it streamable
            # -pix_fmt yuv420p ensures compatibility
            # -vf scale makes sure dimensions are even (required for yuv420p)
            ffmpeg_cmd = [
                'ffmpeg', '-i', temp_gif_path,
                '-movflags', 'faststart',
                '-pix_fmt', 'yuv420p',
                '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
                '-y', temp_mp4_path
            ]
            subprocess.run(ffmpeg_cmd, capture_output=True, check=True)

            # Read the MP4 file
            with open(temp_mp4_path, 'rb') as f:
                mp4_bytes = f.read()

            print(f"✅ Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
            return mp4_bytes

        finally:
            # Clean up temp files
            if os.path.exists(temp_gif_path):
                os.remove(temp_gif_path)
            if os.path.exists(temp_mp4_path):
                os.remove(temp_mp4_path)

    except subprocess.CalledProcessError as e:
        print(f"⚠️ ffmpeg error converting GIF to MP4: {e.stderr.decode()}")
        return None
    except Exception as e:
        print(f"⚠️ Error converting GIF to MP4: {e}")
        import traceback
        traceback.print_exc()
        return None


async def extract_video_frames(video_bytes, num_frames=4):
    """
    Extract frames from a video or GIF for analysis.

    Tries PIL first (handles animated GIFs), then falls back to ffmpeg for
    real video containers (MP4, WebM, ...).

    Returns a list of base64-encoded JPEG frames, or None on failure.
    """
    try:
        # Try GIF first with PIL
        try:
            gif = Image.open(io.BytesIO(video_bytes))
            if hasattr(gif, 'n_frames'):
                frames = []
                # Calculate step to get evenly distributed frames
                total_frames = gif.n_frames
                step = max(1, total_frames // num_frames)

                for i in range(0, total_frames, step):
                    if len(frames) >= num_frames:
                        break
                    gif.seek(i)
                    frame = gif.convert('RGB')

                    # Convert to base64
                    buffer = io.BytesIO()
                    frame.save(buffer, format='JPEG')
                    frame_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
                    frames.append(frame_b64)

                if frames:
                    return frames
        except Exception as e:
            print(f"Not a GIF, trying video extraction: {e}")

        # For video files (MP4, WebM, etc.), use ffmpeg

        # Write video bytes to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video.write(video_bytes)
            temp_video_path = temp_video.name

        try:
            # Get video duration first; check=True turns a probe failure into
            # an explicit CalledProcessError instead of a ValueError from
            # float('') below.
            probe_cmd = [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                temp_video_path
            ]
            probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
            duration = float(probe.stdout.strip())

            # Calculate timestamps for evenly distributed frames
            timestamps = [duration * i / num_frames for i in range(num_frames)]

            frames = []
            for timestamp in timestamps:
                # Unique temp path per frame: the original wrote to a fixed
                # /tmp/frame_{i}.jpg, so concurrent calls clobbered each
                # other's frames.
                fd, output_path = tempfile.mkstemp(suffix='.jpg')
                os.close(fd)
                try:
                    # Extract frame at timestamp
                    ffmpeg_cmd = [
                        'ffmpeg', '-ss', str(timestamp),
                        '-i', temp_video_path,
                        '-vframes', '1',
                        '-q:v', '2',
                        '-y', output_path
                    ]
                    subprocess.run(ffmpeg_cmd, capture_output=True, check=True)

                    # Read and encode the frame
                    with open(output_path, 'rb') as f:
                        frame_bytes = f.read()
                    frame_b64 = base64.b64encode(frame_bytes).decode('utf-8')
                    frames.append(frame_b64)
                finally:
                    # Clean up frame file even if ffmpeg failed
                    if os.path.exists(output_path):
                        os.remove(output_path)

            return frames

        finally:
            # Clean up temp video file
            os.remove(temp_video_path)

    except Exception as e:
        print(f"⚠️ Error extracting frames: {e}")
        import traceback
        traceback.print_exc()
        return None


async def analyze_image_with_vision(base64_img):
    """
    Analyze an image using llama.cpp multimodal capabilities.

    Uses OpenAI-compatible chat completions API with image_url.
    Returns the model's description string, or an error message string.
    """
    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Describe this image in detail."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_img}"
                        }
                    }
                ]
            }
        ],
        "stream": False,
        "max_tokens": 300
    }

    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(f"{globals.LLAMA_URL}/v1/chat/completions", json=payload, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                else:
                    error_text = await response.text()
                    print(f"❌ Vision API error: {response.status} - {error_text}")
                    return f"Error analyzing image: {response.status}"
        except Exception as e:
            print(f"⚠️ Error in analyze_image_with_vision: {e}")
            return f"Error analyzing image: {str(e)}"


async def analyze_video_with_vision(video_frames, media_type="video"):
    """
    Analyze a video or GIF by analyzing multiple frames.

    video_frames: list of base64-encoded frames
    media_type: "video", "gif", or "tenor_gif" to customize the analysis prompt

    Returns the model's description string, or an error message string.
    """
    # Customize prompt based on media type
    if media_type == "gif":
        prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
    elif media_type == "tenor_gif":
        prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
    else:  # video
        prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."

    # Build content with multiple images
    content = [
        {
            "type": "text",
            "text": prompt_text
        }
    ]

    # Add each frame as an image
    for frame in video_frames:
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{frame}"
            }
        })

    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "stream": False,
        "max_tokens": 400
    }

    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(f"{globals.LLAMA_URL}/v1/chat/completions", json=payload, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                else:
                    error_text = await response.text()
                    print(f"❌ Vision API error: {response.status} - {error_text}")
                    return f"Error analyzing video: {response.status}"
        except Exception as e:
            print(f"⚠️ Error in analyze_video_with_vision: {e}")
            return f"Error analyzing video: {str(e)}"


async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
    """
    Rephrase vision model's image analysis as Miku would respond to it.

    Args:
        vision_output: Description from vision model
        user_prompt: User's original message
        guild_id: Guild ID for server context (None for DMs)
        user_id: User ID for conversation history
        author_name: Display name of the user
        media_type: Type of media ("image", "video", "gif", or "tenor_gif")
    """
    from utils.llm import query_llama

    # Format the user's message to include vision context with media type
    # This will be saved to history automatically by query_llama
    if media_type == "gif":
        media_prefix = "Looking at a GIF"
    elif media_type == "tenor_gif":
        media_prefix = "Looking at a Tenor GIF"
    elif media_type == "video":
        media_prefix = "Looking at a video"
    else:  # image
        media_prefix = "Looking at an image"

    if user_prompt:
        # Include media type, vision description, and user's text
        formatted_prompt = f"[{media_prefix}: {vision_output}] {user_prompt}"
    else:
        # If no text, just the vision description with media type
        formatted_prompt = f"[{media_prefix}: {vision_output}]"

    # Use the standard LLM query with appropriate response type
    response_type = "dm_response" if guild_id is None else "server_response"

    # Use the actual user_id for history tracking, fall back to "image_analysis" for backward compatibility
    history_user_id = user_id if user_id else "image_analysis"

    return await query_llama(
        formatted_prompt,
        user_id=history_user_id,
        guild_id=guild_id,
        response_type=response_type,
        author_name=author_name,
        media_type=media_type  # Pass media type to Miku's LLM
    )


# Backward compatibility aliases
analyze_image_with_qwen = analyze_image_with_vision


async def extract_embed_content(embed):
    """
    Extract text and media content from a Discord embed.

    Returns a dictionary with:
    - 'text': combined text from title, description, fields
    - 'images': list of image URLs
    - 'videos': list of video URLs
    - 'has_content': boolean indicating if there's any content
    """
    content = {
        'text': '',
        'images': [],
        'videos': [],
        'has_content': False
    }

    text_parts = []

    # Extract text content
    if embed.title:
        text_parts.append(f"**{embed.title}**")
    if embed.description:
        text_parts.append(embed.description)
    if embed.author and embed.author.name:
        text_parts.append(f"Author: {embed.author.name}")
    if embed.fields:
        for field in embed.fields:
            text_parts.append(f"**{field.name}**: {field.value}")
    if embed.footer and embed.footer.text:
        text_parts.append(f"_{embed.footer.text}_")

    # Combine text
    content['text'] = '\n\n'.join(text_parts)

    # Extract image URLs
    if embed.image and embed.image.url:
        content['images'].append(embed.image.url)
    if embed.thumbnail and embed.thumbnail.url:
        content['images'].append(embed.thumbnail.url)

    # Extract video URLs
    if embed.video and embed.video.url:
        content['videos'].append(embed.video.url)

    # Check if we have any content
    content['has_content'] = bool(content['text'] or content['images'] or content['videos'])

    return content