- Create process_media_in_message() in utils/image_handling.py that handles all 4 media types: image attachments, video/GIF attachments, Tenor GIF embeds, and rich embeds - DRY the send→log→bipolar tail pattern (5x repeated) into _send_log_bipolar() helper - Unify rich/article/link embed handling to use rephrase_as_miku() instead of inline Cat→LLM routing, fixing a mood-resolution bug (was using globals.DM_MOOD for servers) - Add 'rich_embed' media_type to rephrase_as_miku() prefix switch - Remove 3 inline 'import base64' from bot.py (already module-level in image_handling.py) - bot.py: 986 → 623 lines (-363) - image_handling.py: 559 → 881 lines (+322) - All 170 tests pass (21 config/state + 149 route split)
882 lines
35 KiB
Python
882 lines
35 KiB
Python
# utils/image_handling.py
|
|
|
|
import aiohttp
|
|
import base64
|
|
import io
|
|
import tempfile
|
|
import os
|
|
import subprocess
|
|
from PIL import Image
|
|
import re
|
|
|
|
import globals
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger('vision')
|
|
|
|
# No need for switch_model anymore - llama-swap handles this automatically
|
|
|
|
|
|
def _extract_vision_question(prompt: str):
|
|
"""
|
|
Strip Discord mentions and bot-name triggers from the user's message to
|
|
produce a clean question suitable for passing directly to the vision model.
|
|
|
|
Returns the cleaned question string, or None if nothing meaningful remains
|
|
(e.g. the message was just "@Miku" or "miku," with no actual question).
|
|
"""
|
|
if not prompt:
|
|
return None
|
|
# Remove Discord user/role mentions: <@123456789>, <@!123456789>
|
|
text = re.sub(r'<@[!&]?\d+>', '', prompt).strip()
|
|
# Strip common bot-name invocation prefixes at the very start (case-insensitive)
|
|
# e.g. "miku,", "hey miku,", "miku!", "Miku: "
|
|
text = re.sub(r'^(?:hey\s+)?miku[,!:\s]+', '', text, flags=re.IGNORECASE).strip()
|
|
# Drop any residual leading punctuation/whitespace
|
|
text = text.lstrip(',.!? ')
|
|
return text if text else None
|
|
|
|
|
|
async def download_and_encode_image(url):
|
|
"""Download and encode an image to base64."""
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as resp:
|
|
if resp.status != 200:
|
|
return None
|
|
img_bytes = await resp.read()
|
|
return base64.b64encode(img_bytes).decode('utf-8')
|
|
|
|
|
|
async def download_and_encode_media(url):
|
|
"""Download and encode any media file (image, video, GIF) to base64."""
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as resp:
|
|
if resp.status != 200:
|
|
return None
|
|
media_bytes = await resp.read()
|
|
return base64.b64encode(media_bytes).decode('utf-8')
|
|
|
|
|
|
async def extract_tenor_gif_url(tenor_url):
|
|
"""
|
|
Extract the actual GIF URL from a Tenor link.
|
|
Tenor URLs look like: https://tenor.com/view/...
|
|
We need to get the actual GIF file URL from the page or API.
|
|
"""
|
|
try:
|
|
# Try to extract GIF ID from URL
|
|
# Tenor URLs: https://tenor.com/view/name-name-12345678 or https://tenor.com/12345678.gif
|
|
match = re.search(r'tenor\.com/view/[^/]+-(\d+)', tenor_url)
|
|
if not match:
|
|
match = re.search(r'tenor\.com/(\d+)\.gif', tenor_url)
|
|
|
|
if not match:
|
|
logger.warning(f"Could not extract Tenor GIF ID from: {tenor_url}")
|
|
return None
|
|
|
|
gif_id = match.group(1)
|
|
|
|
# Tenor's direct media URL format (this works without API key)
|
|
# Try the media CDN URL directly
|
|
media_url = f"https://media.tenor.com/images/{gif_id}/tenor.gif"
|
|
|
|
# Verify the URL works
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.head(media_url) as resp:
|
|
if resp.status == 200:
|
|
logger.debug(f"Found Tenor GIF: {media_url}")
|
|
return media_url
|
|
|
|
# If that didn't work, try alternative formats
|
|
for fmt in ['tenor.gif', 'raw']:
|
|
alt_url = f"https://media.tenor.com/{gif_id}/{fmt}"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.head(alt_url) as resp:
|
|
if resp.status == 200:
|
|
logger.debug(f"Found Tenor GIF (alternative): {alt_url}")
|
|
return alt_url
|
|
|
|
logger.warning(f"Could not find working Tenor media URL for ID: {gif_id}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting Tenor GIF URL: {e}")
|
|
return None
|
|
|
|
|
|
async def convert_gif_to_mp4(gif_bytes):
|
|
"""
|
|
Convert a GIF to MP4 using ffmpeg for better compatibility with video processing.
|
|
Returns the MP4 bytes.
|
|
"""
|
|
try:
|
|
# Write GIF to temp file
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.gif') as temp_gif:
|
|
temp_gif.write(gif_bytes)
|
|
temp_gif_path = temp_gif.name
|
|
|
|
# Output MP4 path
|
|
temp_mp4_path = temp_gif_path.replace('.gif', '.mp4')
|
|
|
|
try:
|
|
# Convert GIF to MP4 with ffmpeg
|
|
# -movflags faststart makes it streamable
|
|
# -pix_fmt yuv420p ensures compatibility
|
|
# -vf scale makes sure dimensions are even (required for yuv420p)
|
|
ffmpeg_cmd = [
|
|
'ffmpeg', '-i', temp_gif_path,
|
|
'-movflags', 'faststart',
|
|
'-pix_fmt', 'yuv420p',
|
|
'-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
|
|
'-y',
|
|
temp_mp4_path
|
|
]
|
|
|
|
result = subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
|
|
|
|
# Read the MP4 file
|
|
with open(temp_mp4_path, 'rb') as f:
|
|
mp4_bytes = f.read()
|
|
|
|
logger.info(f"Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
|
|
return mp4_bytes
|
|
|
|
finally:
|
|
# Clean up temp files
|
|
if os.path.exists(temp_gif_path):
|
|
os.remove(temp_gif_path)
|
|
if os.path.exists(temp_mp4_path):
|
|
os.remove(temp_mp4_path)
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"ffmpeg error converting GIF to MP4: {e.stderr.decode()}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error converting GIF to MP4: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
|
|
async def extract_video_frames(video_bytes, num_frames=4):
|
|
"""
|
|
Extract frames from a video or GIF for analysis.
|
|
Returns a list of base64-encoded frames.
|
|
"""
|
|
try:
|
|
# Try GIF first with PIL
|
|
try:
|
|
gif = Image.open(io.BytesIO(video_bytes))
|
|
if hasattr(gif, 'n_frames'):
|
|
frames = []
|
|
|
|
# Calculate step to get evenly distributed frames
|
|
total_frames = gif.n_frames
|
|
step = max(1, total_frames // num_frames)
|
|
|
|
for i in range(0, total_frames, step):
|
|
if len(frames) >= num_frames:
|
|
break
|
|
gif.seek(i)
|
|
frame = gif.convert('RGB')
|
|
|
|
# Convert to base64
|
|
buffer = io.BytesIO()
|
|
frame.save(buffer, format='JPEG')
|
|
frame_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
|
frames.append(frame_b64)
|
|
|
|
if frames:
|
|
return frames
|
|
except Exception as e:
|
|
logger.debug(f"Not a GIF, trying video extraction: {e}")
|
|
|
|
# For video files (MP4, WebM, etc.), use ffmpeg
|
|
import subprocess
|
|
import asyncio
|
|
|
|
# Write video bytes to temp file
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
|
|
temp_video.write(video_bytes)
|
|
temp_video_path = temp_video.name
|
|
|
|
try:
|
|
# Get video duration first
|
|
probe_cmd = [
|
|
'ffprobe', '-v', 'error',
|
|
'-show_entries', 'format=duration',
|
|
'-of', 'default=noprint_wrappers=1:nokey=1',
|
|
temp_video_path
|
|
]
|
|
|
|
result = subprocess.run(probe_cmd, capture_output=True, text=True)
|
|
duration = float(result.stdout.strip())
|
|
|
|
# Calculate timestamps for evenly distributed frames
|
|
timestamps = [duration * i / num_frames for i in range(num_frames)]
|
|
|
|
frames = []
|
|
for i, timestamp in enumerate(timestamps):
|
|
# Extract frame at timestamp
|
|
output_path = f"/tmp/frame_{i}.jpg"
|
|
ffmpeg_cmd = [
|
|
'ffmpeg', '-ss', str(timestamp),
|
|
'-i', temp_video_path,
|
|
'-vframes', '1',
|
|
'-q:v', '2',
|
|
'-y',
|
|
output_path
|
|
]
|
|
|
|
subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
|
|
|
|
# Read and encode the frame
|
|
with open(output_path, 'rb') as f:
|
|
frame_bytes = f.read()
|
|
frame_b64 = base64.b64encode(frame_bytes).decode('utf-8')
|
|
frames.append(frame_b64)
|
|
|
|
# Clean up frame file
|
|
os.remove(output_path)
|
|
|
|
return frames
|
|
|
|
finally:
|
|
# Clean up temp video file
|
|
os.remove(temp_video_path)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting frames: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
return None
|
|
|
|
|
|
async def analyze_image_with_vision(base64_img, user_prompt=None):
|
|
"""
|
|
Analyze an image using llama.cpp multimodal capabilities.
|
|
Uses OpenAI-compatible chat completions API with image_url.
|
|
Always uses NVIDIA GPU for vision model.
|
|
|
|
If user_prompt is provided (and contains a meaningful question after stripping
|
|
mentions/triggers), that question is sent to the vision model instead of the
|
|
generic "Describe this image in detail." prompt.
|
|
"""
|
|
from utils.llm import get_vision_gpu_url, check_vision_endpoint_health
|
|
|
|
# Check if vision endpoint is healthy before attempting request
|
|
is_healthy, error = await check_vision_endpoint_health()
|
|
if not is_healthy:
|
|
logger.warning(f"Vision endpoint unhealthy: {error}")
|
|
return f"Vision service currently unavailable: {error}"
|
|
|
|
question = _extract_vision_question(user_prompt)
|
|
vision_prompt_text = question if question else "Describe this image in detail."
|
|
logger.info(f"Vision prompt for image: {vision_prompt_text!r}")
|
|
|
|
payload = {
|
|
"model": globals.VISION_MODEL,
|
|
"messages": [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": vision_prompt_text
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64_img}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
],
|
|
"stream": False,
|
|
"max_tokens": 800
|
|
}
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
try:
|
|
vision_url = get_vision_gpu_url()
|
|
logger.info(f"Sending vision request to {vision_url} using model: {globals.VISION_MODEL}")
|
|
|
|
async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
result = data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
|
|
logger.info(f"Vision analysis completed successfully")
|
|
return result
|
|
else:
|
|
error_text = await response.text()
|
|
logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
|
|
return f"Error analyzing image: {response.status}"
|
|
except Exception as e:
|
|
logger.error(f"Error in analyze_image_with_vision: {e}", exc_info=True)
|
|
return f"Error analyzing image: {str(e)}"
|
|
|
|
|
|
async def analyze_video_with_vision(video_frames, media_type="video", user_prompt=None):
|
|
"""
|
|
Analyze a video or GIF by analyzing multiple frames.
|
|
video_frames: list of base64-encoded frames
|
|
media_type: "video", "gif", or "tenor_gif" to customize the analysis prompt
|
|
user_prompt: optional raw user message; the vision model will be asked to answer
|
|
the specific question instead of giving a generic description.
|
|
"""
|
|
from utils.llm import get_vision_gpu_url, check_vision_endpoint_health
|
|
|
|
# Check if vision endpoint is healthy before attempting request
|
|
is_healthy, error = await check_vision_endpoint_health()
|
|
if not is_healthy:
|
|
logger.warning(f"Vision endpoint unhealthy: {error}")
|
|
return f"Vision service currently unavailable: {error}"
|
|
|
|
# Customize prompt based on media type, overridden by user question if present
|
|
question = _extract_vision_question(user_prompt)
|
|
if question:
|
|
prompt_text = question
|
|
logger.info(f"Vision prompt for {media_type}: {prompt_text!r}")
|
|
elif media_type == "gif":
|
|
prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
|
|
elif media_type == "tenor_gif":
|
|
prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
|
|
else: # video
|
|
prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."
|
|
|
|
# Build content with multiple images
|
|
content = [
|
|
{
|
|
"type": "text",
|
|
"text": prompt_text
|
|
}
|
|
]
|
|
|
|
# Add each frame as an image
|
|
for frame in video_frames:
|
|
content.append({
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{frame}"
|
|
}
|
|
})
|
|
|
|
payload = {
|
|
"model": globals.VISION_MODEL,
|
|
"messages": [
|
|
{
|
|
"role": "user",
|
|
"content": content
|
|
}
|
|
],
|
|
"stream": False,
|
|
"max_tokens": 1000
|
|
}
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
try:
|
|
vision_url = get_vision_gpu_url()
|
|
logger.info(f"Sending video analysis request to {vision_url} using model: {globals.VISION_MODEL} (media_type: {media_type}, frames: {len(video_frames)})")
|
|
|
|
async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
result = data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
|
|
logger.info(f"Video analysis completed successfully")
|
|
return result
|
|
else:
|
|
error_text = await response.text()
|
|
logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
|
|
return f"Error analyzing video: {response.status}"
|
|
except Exception as e:
|
|
logger.error(f"Error in analyze_video_with_vision: {e}", exc_info=True)
|
|
return f"Error analyzing video: {str(e)}"
|
|
|
|
|
|
async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
|
|
"""
|
|
Rephrase vision model's image analysis as Miku would respond to it.
|
|
|
|
Routes through Cheshire Cat pipeline for memory-augmented responses,
|
|
falling back to direct query_llama() if Cat is unavailable.
|
|
|
|
Args:
|
|
vision_output: Description from vision model
|
|
user_prompt: User's original message
|
|
guild_id: Guild ID for server context (None for DMs)
|
|
user_id: User ID for conversation history
|
|
author_name: Display name of the user
|
|
media_type: Type of media ("image", "video", "gif", or "tenor_gif")
|
|
"""
|
|
from utils.llm import query_llama
|
|
|
|
# Format the user's message to include vision context with media type
|
|
# This will be saved to history automatically by query_llama
|
|
_MEDIA_PREFIXES = {
|
|
"gif": "Looking at a GIF",
|
|
"tenor_gif": "Looking at a Tenor GIF",
|
|
"video": "Looking at a video",
|
|
"rich_embed": "Looking at embedded content",
|
|
}
|
|
media_prefix = _MEDIA_PREFIXES.get(media_type, "Looking at an image")
|
|
|
|
if user_prompt:
|
|
# Include media type, vision description, and user's text
|
|
formatted_prompt = f"[{media_prefix}: {vision_output}] {user_prompt}"
|
|
else:
|
|
# If no text, just the vision description with media type
|
|
formatted_prompt = f"[{media_prefix}: {vision_output}]"
|
|
|
|
# Use the standard LLM query with appropriate response type
|
|
response_type = "dm_response" if guild_id is None else "server_response"
|
|
|
|
# Use the actual user_id for history tracking, fall back to "image_analysis" for backward compatibility
|
|
history_user_id = user_id if user_id else "image_analysis"
|
|
|
|
# Determine current mood for Cat pipeline
|
|
current_mood = globals.DM_MOOD
|
|
if guild_id:
|
|
try:
|
|
from server_manager import server_manager
|
|
sc = server_manager.get_server_config(guild_id)
|
|
if sc:
|
|
current_mood = sc.current_mood_name
|
|
except Exception:
|
|
pass
|
|
|
|
# Phase 3: Try Cheshire Cat pipeline first (memory-augmented response)
|
|
# This allows image interactions to be stored in episodic memory and
|
|
# benefit from declarative memory recall, just like text messages.
|
|
response = None
|
|
if globals.USE_CHESHIRE_CAT:
|
|
try:
|
|
from utils.cat_client import cat_adapter
|
|
cat_result = await cat_adapter.query(
|
|
text=formatted_prompt,
|
|
user_id=history_user_id,
|
|
guild_id=str(guild_id) if guild_id else None,
|
|
author_name=author_name,
|
|
mood=current_mood,
|
|
response_type=response_type,
|
|
media_type=media_type,
|
|
)
|
|
if cat_result:
|
|
response, cat_full_prompt = cat_result
|
|
effective_mood = current_mood
|
|
if globals.EVIL_MODE:
|
|
effective_mood = f"EVIL:{getattr(globals, 'EVIL_DM_MOOD', 'evil_neutral')}"
|
|
logger.info(f"🐱 Cat {media_type} response for {author_name} (mood: {effective_mood})")
|
|
# Track Cat interaction for Web UI Last Prompt view
|
|
import datetime
|
|
globals.LAST_CAT_INTERACTION = {
|
|
"full_prompt": cat_full_prompt,
|
|
"response": response[:500] if response else "",
|
|
"user": author_name or history_user_id,
|
|
"mood": effective_mood,
|
|
"timestamp": datetime.datetime.now().isoformat(),
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"🐱 Cat {media_type} pipeline error, falling back to query_llama: {e}")
|
|
response = None
|
|
|
|
# Fallback to direct LLM query if Cat didn't respond
|
|
if not response:
|
|
response = await query_llama(
|
|
formatted_prompt,
|
|
user_id=history_user_id,
|
|
guild_id=guild_id,
|
|
response_type=response_type,
|
|
author_name=author_name,
|
|
media_type=media_type # Pass media type to Miku's LLM
|
|
)
|
|
|
|
return response
|
|
|
|
# Backward compatibility aliases
|
|
analyze_image_with_qwen = analyze_image_with_vision
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared tail helper — send response, log DM, check bipolar interjection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _send_log_bipolar(message, reply_text, is_dm, *, media_label=""):
|
|
"""
|
|
Common tail shared by every media handler *and* the text-fallback path in
|
|
bot.py. Sends *reply_text* to the channel, logs the reply in the DM
|
|
ledger when appropriate, and fires a bipolar-interjection check for server
|
|
messages.
|
|
|
|
Returns the sent ``discord.Message`` so callers can use it if needed.
|
|
"""
|
|
from utils.dm_logger import dm_logger
|
|
from utils.task_tracker import create_tracked_task
|
|
|
|
label = f" {media_label}" if media_label else ""
|
|
if is_dm:
|
|
logger.info(
|
|
f"💌 DM{label} response to {message.author.display_name} "
|
|
f"(using DM mood: {globals.DM_MOOD})"
|
|
)
|
|
else:
|
|
guild_name = message.guild.name if message.guild else "unknown"
|
|
logger.info(
|
|
f"💬 Server{label} response to {message.author.display_name} "
|
|
f"in {guild_name} (using server mood)"
|
|
)
|
|
|
|
response_message = await message.channel.send(reply_text)
|
|
|
|
# Log bot's reply in the DM ledger
|
|
if is_dm:
|
|
dm_logger.log_user_message(message.author, response_message, is_bot_message=True)
|
|
|
|
# Bipolar-mode interjection check (server messages only)
|
|
if not is_dm and globals.BIPOLAR_MODE:
|
|
try:
|
|
from utils.persona_dialogue import check_for_interjection
|
|
current_persona = "evil" if globals.EVIL_MODE else "miku"
|
|
create_tracked_task(
|
|
check_for_interjection(response_message, current_persona),
|
|
task_name="interjection_check",
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error checking for persona interjection: {e}")
|
|
|
|
return response_message
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# High-level media dispatcher — called from bot.py on_message()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def process_media_in_message(message, prompt, is_dm, guild_id) -> bool:
|
|
"""
|
|
Inspect *message* for image/video/GIF attachments and embeds.
|
|
|
|
If any media is found and successfully processed, a reply is sent to the
|
|
channel and this function returns ``True``. Otherwise it returns
|
|
``False`` so the caller can fall through to text-only handling.
|
|
"""
|
|
author_id = str(message.author.id)
|
|
author_name = message.author.display_name
|
|
|
|
# ---- 1. Image attachments (.jpg, .jpeg, .png, .webp) -----------------
|
|
if message.attachments:
|
|
for attachment in message.attachments:
|
|
lower = attachment.filename.lower()
|
|
|
|
if any(lower.endswith(ext) for ext in (".jpg", ".jpeg", ".png", ".webp")):
|
|
base64_img = await download_and_encode_image(attachment.url)
|
|
if not base64_img:
|
|
await message.channel.send("I couldn't load the image, sorry!")
|
|
return True
|
|
|
|
qwen_description = await analyze_image_with_vision(base64_img, user_prompt=prompt)
|
|
if not qwen_description or not qwen_description.strip():
|
|
await message.channel.send(
|
|
"I couldn't see that image clearly, sorry! Try sending it again."
|
|
)
|
|
return True
|
|
|
|
miku_reply = await rephrase_as_miku(
|
|
qwen_description, prompt,
|
|
guild_id=guild_id,
|
|
user_id=author_id,
|
|
author_name=author_name,
|
|
media_type="image",
|
|
)
|
|
await _send_log_bipolar(message, miku_reply, is_dm, media_label="image")
|
|
return True
|
|
|
|
# ---- 2. Video / GIF attachments (.gif, .mp4, .webm, .mov) ----
|
|
elif any(lower.endswith(ext) for ext in (".gif", ".mp4", ".webm", ".mov")):
|
|
is_gif = lower.endswith(".gif")
|
|
media_type = "gif" if is_gif else "video"
|
|
|
|
logger.debug(f"🎬 Processing {media_type}: {attachment.filename}")
|
|
|
|
media_bytes_b64 = await download_and_encode_media(attachment.url)
|
|
if not media_bytes_b64:
|
|
await message.channel.send(f"I couldn't load the {media_type}, sorry!")
|
|
return True
|
|
|
|
media_bytes = base64.b64decode(media_bytes_b64)
|
|
|
|
if is_gif:
|
|
logger.debug("🔄 Converting GIF to MP4 for processing...")
|
|
mp4_bytes = await convert_gif_to_mp4(media_bytes)
|
|
if mp4_bytes:
|
|
media_bytes = mp4_bytes
|
|
logger.info("✅ GIF converted to MP4")
|
|
else:
|
|
logger.warning("GIF conversion failed, trying direct processing")
|
|
|
|
frames = await extract_video_frames(media_bytes, num_frames=6)
|
|
if not frames:
|
|
await message.channel.send(
|
|
f"I couldn't extract frames from that {media_type}, sorry!"
|
|
)
|
|
return True
|
|
|
|
logger.debug(
|
|
f"📹 Extracted {len(frames)} frames from {attachment.filename}"
|
|
)
|
|
|
|
video_description = await analyze_video_with_vision(
|
|
frames, media_type=media_type, user_prompt=prompt,
|
|
)
|
|
if not video_description or not video_description.strip():
|
|
await message.channel.send(
|
|
f"I couldn't analyze that {media_type} clearly, sorry! "
|
|
"Try sending it again."
|
|
)
|
|
return True
|
|
|
|
miku_reply = await rephrase_as_miku(
|
|
video_description, prompt,
|
|
guild_id=guild_id,
|
|
user_id=author_id,
|
|
author_name=author_name,
|
|
media_type=media_type,
|
|
)
|
|
await _send_log_bipolar(message, miku_reply, is_dm, media_label=media_type)
|
|
return True
|
|
|
|
# ---- 3. Tenor GIF embeds (gifv from tenor.com) -----------------------
|
|
if message.embeds:
|
|
for embed in message.embeds:
|
|
if embed.type == "gifv" and embed.url and "tenor.com" in embed.url:
|
|
logger.info(f"🎭 Processing Tenor GIF from embed: {embed.url}")
|
|
|
|
gif_url = await extract_tenor_gif_url(embed.url)
|
|
if not gif_url:
|
|
if hasattr(embed, "video") and embed.video:
|
|
gif_url = embed.video.url
|
|
elif hasattr(embed, "thumbnail") and embed.thumbnail:
|
|
gif_url = embed.thumbnail.url
|
|
|
|
if not gif_url:
|
|
logger.warning("Could not extract GIF URL from Tenor embed")
|
|
continue
|
|
|
|
media_bytes_b64 = await download_and_encode_media(gif_url)
|
|
if not media_bytes_b64:
|
|
await message.channel.send(
|
|
"I couldn't load that Tenor GIF, sorry!"
|
|
)
|
|
return True
|
|
|
|
media_bytes = base64.b64decode(media_bytes_b64)
|
|
|
|
logger.debug("Converting Tenor GIF to MP4 for processing...")
|
|
mp4_bytes = await convert_gif_to_mp4(media_bytes)
|
|
if not mp4_bytes:
|
|
logger.warning(
|
|
"GIF conversion failed, trying direct frame extraction"
|
|
)
|
|
mp4_bytes = media_bytes
|
|
else:
|
|
logger.debug("Tenor GIF converted to MP4")
|
|
|
|
frames = await extract_video_frames(mp4_bytes, num_frames=6)
|
|
if not frames:
|
|
await message.channel.send(
|
|
"I couldn't extract frames from that GIF, sorry!"
|
|
)
|
|
return True
|
|
|
|
logger.info(
|
|
f"📹 Extracted {len(frames)} frames from Tenor GIF"
|
|
)
|
|
|
|
video_description = await analyze_video_with_vision(
|
|
frames, media_type="tenor_gif", user_prompt=prompt,
|
|
)
|
|
if not video_description or not video_description.strip():
|
|
await message.channel.send(
|
|
"I couldn't analyze that GIF clearly, sorry! "
|
|
"Try sending it again."
|
|
)
|
|
return True
|
|
|
|
miku_reply = await rephrase_as_miku(
|
|
video_description, prompt,
|
|
guild_id=guild_id,
|
|
user_id=author_id,
|
|
author_name=author_name,
|
|
media_type="tenor_gif",
|
|
)
|
|
await _send_log_bipolar(
|
|
message, miku_reply, is_dm, media_label="Tenor GIF",
|
|
)
|
|
return True
|
|
|
|
# ---- 4. Rich / article / image / video / link embeds ---------
|
|
elif embed.type in ("rich", "article", "image", "video", "link"):
|
|
logger.info(f"Processing {embed.type} embed")
|
|
|
|
embed_content = await extract_embed_content(embed)
|
|
if not embed_content["has_content"]:
|
|
logger.warning("Embed has no extractable content, skipping")
|
|
continue
|
|
|
|
embed_context_parts = []
|
|
if embed_content["text"]:
|
|
truncated = embed_content["text"][:500]
|
|
if len(embed_content["text"]) > 500:
|
|
truncated += "..."
|
|
embed_context_parts.append(
|
|
f"[Embedded content: {truncated}]"
|
|
)
|
|
|
|
# Analyze images found inside the embed
|
|
for img_url in embed_content["images"]:
|
|
logger.info(f"Processing image from embed: {img_url}")
|
|
try:
|
|
base64_img = await download_and_encode_image(img_url)
|
|
if base64_img:
|
|
logger.info(
|
|
"Image downloaded, analyzing with vision model..."
|
|
)
|
|
qwen_description = await analyze_image_with_vision(
|
|
base64_img, user_prompt=prompt,
|
|
)
|
|
if qwen_description and qwen_description.strip():
|
|
embed_context_parts.append(
|
|
f"[Embedded image shows: {qwen_description}]"
|
|
)
|
|
else:
|
|
logger.error("Failed to download image from embed")
|
|
except Exception as e:
|
|
logger.error(f"Error processing embedded image: {e}")
|
|
|
|
# Analyze videos found inside the embed
|
|
for video_url in embed_content["videos"]:
|
|
logger.info(
|
|
f"🎬 Processing video from embed: {video_url}"
|
|
)
|
|
try:
|
|
media_bytes_b64 = await download_and_encode_media(
|
|
video_url,
|
|
)
|
|
if media_bytes_b64:
|
|
media_bytes = base64.b64decode(media_bytes_b64)
|
|
frames = await extract_video_frames(
|
|
media_bytes, num_frames=6,
|
|
)
|
|
if frames:
|
|
logger.info(
|
|
f"📹 Extracted {len(frames)} frames, "
|
|
"analyzing with vision model..."
|
|
)
|
|
video_description = (
|
|
await analyze_video_with_vision(
|
|
frames,
|
|
media_type="video",
|
|
user_prompt=prompt,
|
|
)
|
|
)
|
|
if (
|
|
video_description
|
|
and video_description.strip()
|
|
):
|
|
embed_context_parts.append(
|
|
f"[Embedded video shows: "
|
|
f"{video_description}]"
|
|
)
|
|
else:
|
|
logger.error(
|
|
"Failed to extract frames from video"
|
|
)
|
|
else:
|
|
logger.error(
|
|
"Failed to download video from embed"
|
|
)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing embedded video: {e}"
|
|
)
|
|
|
|
if not embed_context_parts:
|
|
continue
|
|
|
|
# Build a combined vision description and route through
|
|
# rephrase_as_miku (which handles Cat → LLM fallback,
|
|
# mood resolution, and LAST_CAT_INTERACTION tracking).
|
|
combined_description = "\n".join(embed_context_parts)
|
|
miku_reply = await rephrase_as_miku(
|
|
combined_description, prompt,
|
|
guild_id=guild_id,
|
|
user_id=author_id,
|
|
author_name=author_name,
|
|
media_type="rich_embed",
|
|
)
|
|
await _send_log_bipolar(
|
|
message, miku_reply, is_dm, media_label="embed",
|
|
)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
async def extract_embed_content(embed):
|
|
"""
|
|
Extract text and media content from a Discord embed.
|
|
Returns a dictionary with:
|
|
- 'text': combined text from title, description, fields
|
|
- 'images': list of image URLs
|
|
- 'videos': list of video URLs
|
|
- 'has_content': boolean indicating if there's any content
|
|
"""
|
|
content = {
|
|
'text': '',
|
|
'images': [],
|
|
'videos': [],
|
|
'has_content': False
|
|
}
|
|
|
|
text_parts = []
|
|
|
|
# Extract text content
|
|
if embed.title:
|
|
text_parts.append(f"**{embed.title}**")
|
|
|
|
if embed.description:
|
|
text_parts.append(embed.description)
|
|
|
|
if embed.author and embed.author.name:
|
|
text_parts.append(f"Author: {embed.author.name}")
|
|
|
|
if embed.fields:
|
|
for field in embed.fields:
|
|
text_parts.append(f"**{field.name}**: {field.value}")
|
|
|
|
if embed.footer and embed.footer.text:
|
|
text_parts.append(f"_{embed.footer.text}_")
|
|
|
|
# Combine text
|
|
content['text'] = '\n\n'.join(text_parts)
|
|
|
|
# Extract image URLs
|
|
if embed.image and embed.image.url:
|
|
content['images'].append(embed.image.url)
|
|
|
|
if embed.thumbnail and embed.thumbnail.url:
|
|
content['images'].append(embed.thumbnail.url)
|
|
|
|
# Extract video URLs
|
|
if embed.video and embed.video.url:
|
|
content['videos'].append(embed.video.url)
|
|
|
|
# Check if we have any content
|
|
content['has_content'] = bool(content['text'] or content['images'] or content['videos'])
|
|
|
|
return content
|