# 2025-12-07 17:15:09 +02:00
# utils/image_handling.py
import base64
import io
import os
import re
import shutil
import subprocess
import tempfile

import aiohttp
from PIL import Image

import globals
# No need for switch_model anymore - llama-swap handles this automatically
async def download_and_encode_image(url):
    """Download an image from *url* and return it base64-encoded.

    Returns None when the server answers with a non-200 status.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            payload = await resp.read()
            return base64.b64encode(payload).decode('utf-8')
async def download_and_encode_media(url):
    """Download any media file (image, video, GIF) and return it base64-encoded.

    Returns None when the server answers with a non-200 status.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            payload = await resp.read()
            return base64.b64encode(payload).decode('utf-8')
async def extract_tenor_gif_url ( tenor_url ) :
"""
Extract the actual GIF URL from a Tenor link .
Tenor URLs look like : https : / / tenor . com / view / . . .
We need to get the actual GIF file URL from the page or API .
"""
try :
# Try to extract GIF ID from URL
# Tenor URLs: https://tenor.com/view/name-name-12345678 or https://tenor.com/12345678.gif
match = re . search ( r ' tenor \ .com/view/[^/]+-( \ d+) ' , tenor_url )
if not match :
match = re . search ( r ' tenor \ .com/( \ d+) \ .gif ' , tenor_url )
if not match :
print ( f " ⚠️ Could not extract Tenor GIF ID from: { tenor_url } " )
return None
gif_id = match . group ( 1 )
# Tenor's direct media URL format (this works without API key)
# Try the media CDN URL directly
media_url = f " https://media.tenor.com/images/ { gif_id } /tenor.gif "
# Verify the URL works
async with aiohttp . ClientSession ( ) as session :
async with session . head ( media_url ) as resp :
if resp . status == 200 :
print ( f " ✅ Found Tenor GIF: { media_url } " )
return media_url
# If that didn't work, try alternative formats
for fmt in [ ' tenor.gif ' , ' raw ' ] :
alt_url = f " https://media.tenor.com/ { gif_id } / { fmt } "
async with aiohttp . ClientSession ( ) as session :
async with session . head ( alt_url ) as resp :
if resp . status == 200 :
print ( f " ✅ Found Tenor GIF (alternative): { alt_url } " )
return alt_url
print ( f " ⚠️ Could not find working Tenor media URL for ID: { gif_id } " )
return None
except Exception as e :
print ( f " ⚠️ Error extracting Tenor GIF URL: { e } " )
return None
async def convert_gif_to_mp4(gif_bytes):
    """
    Convert a GIF to MP4 using ffmpeg for better compatibility with video
    processing.

    Returns the MP4 bytes, or None on any failure (ffmpeg error, ffmpeg
    missing, I/O error).
    """
    try:
        # Write GIF to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.gif') as temp_gif:
            temp_gif.write(gif_bytes)
            temp_gif_path = temp_gif.name
        # Derive the output path from the temp name. splitext only swaps the
        # extension, unlike str.replace which could match '.gif' anywhere in
        # the path.
        temp_mp4_path = os.path.splitext(temp_gif_path)[0] + '.mp4'
        try:
            # Convert GIF to MP4 with ffmpeg
            # -movflags faststart makes it streamable
            # -pix_fmt yuv420p ensures compatibility
            # -vf scale makes sure dimensions are even (required for yuv420p)
            ffmpeg_cmd = [
                'ffmpeg', '-i', temp_gif_path,
                '-movflags', 'faststart',
                '-pix_fmt', 'yuv420p',
                '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
                '-y',
                temp_mp4_path
            ]
            subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
            # Read the MP4 file
            with open(temp_mp4_path, 'rb') as f:
                mp4_bytes = f.read()
            print(f"✅ Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
            return mp4_bytes
        finally:
            # Clean up temp files regardless of success or failure.
            for path in (temp_gif_path, temp_mp4_path):
                if os.path.exists(path):
                    os.remove(path)
    except subprocess.CalledProcessError as e:
        print(f"⚠️ ffmpeg error converting GIF to MP4: {e.stderr.decode()}")
        return None
    except Exception as e:
        print(f"⚠️ Error converting GIF to MP4: {e}")
        import traceback
        traceback.print_exc()
        return None
async def extract_video_frames(video_bytes, num_frames=4):
    """
    Extract frames from a video or GIF for analysis.

    Tries PIL first (handles animated GIFs), then falls back to ffmpeg for
    real video containers (MP4, WebM, ...).

    Returns a list of base64-encoded JPEG frames, or None on failure.
    """
    try:
        # Try GIF first with PIL
        try:
            gif = Image.open(io.BytesIO(video_bytes))
            if hasattr(gif, 'n_frames'):
                frames = []
                total_frames = gif.n_frames
                # Step that spreads the sampled frames evenly over the clip.
                step = max(1, total_frames // num_frames)
                for i in range(0, total_frames, step):
                    if len(frames) >= num_frames:
                        break
                    gif.seek(i)
                    frame = gif.convert('RGB')
                    buffer = io.BytesIO()
                    frame.save(buffer, format='JPEG')
                    frames.append(base64.b64encode(buffer.getvalue()).decode('utf-8'))
                if frames:
                    return frames
        except Exception as e:
            print(f"Not a GIF, trying video extraction: {e}")
        # For video files (MP4, WebM, etc.), use ffmpeg
        import subprocess
        # Write video bytes to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video.write(video_bytes)
            temp_video_path = temp_video.name
        # Private directory for extracted frames. The previous fixed
        # /tmp/frame_{i}.jpg paths collided between concurrent calls.
        frame_dir = tempfile.mkdtemp(prefix='frames_')
        try:
            # Get video duration first
            probe_cmd = [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                temp_video_path
            ]
            result = subprocess.run(probe_cmd, capture_output=True, text=True)
            if result.returncode != 0 or not result.stdout.strip():
                print(f"⚠️ ffprobe could not determine duration: {result.stderr.strip()}")
                return None
            duration = float(result.stdout.strip())
            # Calculate timestamps for evenly distributed frames
            timestamps = [duration * i / num_frames for i in range(num_frames)]
            frames = []
            for i, timestamp in enumerate(timestamps):
                # Extract one frame at each timestamp
                output_path = os.path.join(frame_dir, f"frame_{i}.jpg")
                ffmpeg_cmd = [
                    'ffmpeg', '-ss', str(timestamp),
                    '-i', temp_video_path,
                    '-vframes', '1',
                    '-q:v', '2',
                    '-y',
                    output_path
                ]
                subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
                # Read and encode the frame
                with open(output_path, 'rb') as f:
                    frames.append(base64.b64encode(f.read()).decode('utf-8'))
            return frames
        finally:
            # Clean up temp video and all extracted frame files.
            os.remove(temp_video_path)
            shutil.rmtree(frame_dir, ignore_errors=True)
    except Exception as e:
        print(f"⚠️ Error extracting frames: {e}")
        import traceback
        traceback.print_exc()
        return None
async def analyze_image_with_vision(base64_img):
    """
    Analyze an image using llama.cpp multimodal capabilities.

    Uses the OpenAI-compatible chat completions API with an image_url
    content part. Always uses the NVIDIA GPU endpoint for the vision model.

    Returns the model's description, or an "Error analyzing image: ..."
    string on failure.
    """
    # NOTE: stray VCS timestamp lines that had been merged into this function
    # (and broke parsing) were removed.
    from utils.llm import get_vision_gpu_url
    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Describe this image in detail."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_img}"
                        }
                    }
                ]
            }
        ],
        "stream": False,
        "max_tokens": 300
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                else:
                    error_text = await response.text()
                    print(f"❌ Vision API error: {response.status} - {error_text}")
                    return f"Error analyzing image: {response.status}"
        except Exception as e:
            print(f"⚠️ Error in analyze_image_with_vision: {e}")
            return f"Error analyzing image: {str(e)}"
async def analyze_video_with_vision(video_frames, media_type="video"):
    """
    Analyze a video or GIF by sending multiple frames to the vision model.

    Args:
        video_frames: list of base64-encoded JPEG frames
        media_type: "video", "gif", or "tenor_gif" to customize the prompt

    Returns the model's description, or an "Error analyzing video: ..."
    string on failure.
    """
    # Bug fix: get_vision_gpu_url was only imported locally inside
    # analyze_image_with_vision, so calling it here raised NameError.
    from utils.llm import get_vision_gpu_url
    # Customize prompt based on media type
    if media_type == "gif":
        prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
    elif media_type == "tenor_gif":
        prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
    else:  # video
        prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."
    # Build content with the prompt followed by every frame as an image part
    content = [
        {
            "type": "text",
            "text": prompt_text
        }
    ]
    for frame in video_frames:
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{frame}"
            }
        })
    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "stream": False,
        "max_tokens": 400
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                else:
                    error_text = await response.text()
                    print(f"❌ Vision API error: {response.status} - {error_text}")
                    return f"Error analyzing video: {response.status}"
        except Exception as e:
            print(f"⚠️ Error in analyze_video_with_vision: {e}")
            return f"Error analyzing video: {str(e)}"
async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
    """
    Rephrase the vision model's media analysis as Miku would respond to it.

    Args:
        vision_output: Description from the vision model
        user_prompt: User's original message
        guild_id: Guild ID for server context (None for DMs)
        user_id: User ID for conversation history
        author_name: Display name of the user
        media_type: Type of media ("image", "video", "gif", or "tenor_gif")
    """
    from utils.llm import query_llama
    # Map media type to the prefix embedded in the prompt; unknown types
    # fall back to the image wording.
    prefixes = {
        "gif": "Looking at a GIF",
        "tenor_gif": "Looking at a Tenor GIF",
        "video": "Looking at a video",
    }
    media_prefix = prefixes.get(media_type, "Looking at an image")
    # The vision description (tagged with the media type) is prepended to the
    # user's text; query_llama saves the combined prompt to history.
    vision_context = f"[{media_prefix}: {vision_output}]"
    formatted_prompt = f"{vision_context} {user_prompt}" if user_prompt else vision_context
    # DMs and servers use different response styles.
    response_type = "server_response" if guild_id is not None else "dm_response"
    # Real user_id for history tracking, "image_analysis" kept for backward
    # compatibility when no user_id is supplied.
    history_user_id = user_id or "image_analysis"
    return await query_llama(
        formatted_prompt,
        user_id=history_user_id,
        guild_id=guild_id,
        response_type=response_type,
        author_name=author_name,
        media_type=media_type  # Pass media type to Miku's LLM
    )
# Backward compatibility aliases
# Old public name kept so existing callers importing analyze_image_with_qwen
# keep working after the rename to analyze_image_with_vision.
analyze_image_with_qwen = analyze_image_with_vision
async def extract_embed_content(embed):
    """
    Extract text and media content from a Discord embed.

    Returns a dictionary with:
        - 'text': combined text from title, description, author, fields, footer
        - 'images': list of image URLs
        - 'videos': list of video URLs
        - 'has_content': boolean indicating if there's any content
    """
    pieces = []
    # Textual parts, in display order.
    if embed.title:
        pieces.append(f"**{embed.title}**")
    if embed.description:
        pieces.append(embed.description)
    if embed.author and embed.author.name:
        pieces.append(f"Author: {embed.author.name}")
    for field in (embed.fields or []):
        pieces.append(f"**{field.name}**: {field.value}")
    if embed.footer and embed.footer.text:
        pieces.append(f"_{embed.footer.text}_")
    text = '\n\n'.join(pieces)

    # Media URLs: main image and thumbnail count as images.
    images = []
    if embed.image and embed.image.url:
        images.append(embed.image.url)
    if embed.thumbnail and embed.thumbnail.url:
        images.append(embed.thumbnail.url)
    videos = []
    if embed.video and embed.video.url:
        videos.append(embed.video.url)

    return {
        'text': text,
        'images': images,
        'videos': videos,
        'has_content': bool(text or images or videos),
    }