Files
miku-discord/bot/utils/figurine_notifier.py

397 lines
15 KiB
Python
Raw Normal View History

2025-12-07 17:15:09 +02:00
import os
import json
import random
from datetime import datetime
from typing import List, Dict, Any, Tuple
import discord
import globals
from utils.twitter_fetcher import fetch_figurine_tweets_latest
from utils.image_handling import analyze_image_with_qwen, download_and_encode_image
from utils.llm import query_ollama
from utils.dm_logger import dm_logger
def convert_to_fxtwitter(url: str) -> str:
    """Rewrite a twitter.com / x.com link to fxtwitter.com for better Discord embeds.

    Only the host part of the URL is rewritten, and only when that host is
    exactly ``twitter.com`` / ``x.com`` or a subdomain of one of them (any
    subdomain prefix such as ``www.`` is kept). Matching on the host instead
    of on a plain substring means that:

    * links already pointing at ``fxtwitter.com`` / ``vxtwitter.com`` are
      returned unchanged instead of becoming ``fxfxtwitter.com``;
    * unrelated hosts that merely end in ``x.com`` (``netflix.com``,
      ``box.com``) are left alone;
    * ``twitter.com`` appearing in a path or query string is not touched.

    Args:
        url: The link to convert; the scheme is optional.

    Returns:
        The converted link, or ``url`` unchanged when it is not a
        twitter.com / x.com link.
    """
    import re  # function-scope import keeps this block self-contained

    # group 1: optional leading whitespace, optional scheme, optional subdomain
    # labels. The lookahead requires the host to end right after ".com".
    host_pattern = r"^(\s*(?:https?://)?(?:[\w-]+\.)*)(?:twitter|x)\.com(?=[/:?#]|$)"
    return re.sub(host_pattern, r"\g<1>fxtwitter.com", url, count=1, flags=re.IGNORECASE)
# Path of the JSON file that persists the figurine DM subscriber list.
# Relative, so it is resolved against the process working directory.
SUBSCRIBERS_FILE = "memory/figurine_subscribers.json"
# Path of the JSON file that persists the URLs of tweets already sent.
SENT_TWEETS_FILE = "memory/figurine_sent_tweets.json"
def _ensure_dir(path: str) -> None:
    """Create the parent directory of *path* if the path names one.

    Args:
        path: A file path. Only its directory component is created; the file
            itself is never touched.
    """
    parent = os.path.dirname(path)
    if not parent:
        # A bare filename has no directory component, so nothing to create.
        return
    os.makedirs(parent, exist_ok=True)
def load_subscribers() -> List[int]:
    """Read the subscriber IDs stored in ``SUBSCRIBERS_FILE``.

    Returns:
        The IDs under the ``"subscribers"`` key converted to ``int``. An empty
        list is returned when the file does not exist or when reading,
        parsing, or converting fails (the failure is printed, not raised).
    """
    try:
        if not os.path.exists(SUBSCRIBERS_FILE):
            return []
        print(f"📁 Figurines: Loading subscribers from {SUBSCRIBERS_FILE}")
        with open(SUBSCRIBERS_FILE, "r", encoding="utf-8") as handle:
            stored = json.load(handle)
        # IDs are persisted as strings; convert them back to ints.
        subscriber_ids = list(map(int, stored.get("subscribers", [])))
        print(f"📋 Figurines: Loaded {len(subscriber_ids)} subscribers")
        return subscriber_ids
    except Exception as exc:
        print(f"⚠️ Failed to load figurine subscribers: {exc}")
        return []
def save_subscribers(user_ids: List[int]) -> None:
    """Write *user_ids* to ``SUBSCRIBERS_FILE`` as JSON.

    Failures are printed and swallowed; nothing is raised to the caller.

    Args:
        user_ids: The complete subscriber list to persist (replaces the file).
    """
    try:
        _ensure_dir(SUBSCRIBERS_FILE)
        # IDs are written as strings so they stay JS-safe if the API layer
        # ever serves this file.
        as_strings = list(map(str, user_ids))
        document = {"subscribers": as_strings}
        print(f"💾 Figurines: Saving {len(user_ids)} subscribers to {SUBSCRIBERS_FILE}")
        with open(SUBSCRIBERS_FILE, "w", encoding="utf-8") as handle:
            json.dump(document, handle, indent=2)
    except Exception as exc:
        print(f"⚠️ Failed to save figurine subscribers: {exc}")
def add_subscriber(user_id: int) -> bool:
    """Add *user_id* to the persisted subscriber list.

    Returns:
        ``True`` when the user was added, ``False`` when already subscribed.
    """
    print(f" Figurines: Adding subscriber {user_id}")
    current = load_subscribers()
    already_subscribed = user_id in current
    if already_subscribed:
        print(f" Figurines: Subscriber {user_id} already present")
        return False
    # Persist the existing list with the new ID at the end.
    save_subscribers([*current, user_id])
    print(f"✅ Figurines: Subscriber {user_id} added")
    return True
def remove_subscriber(user_id: int) -> bool:
    """Remove *user_id* from the persisted subscriber list.

    Returns:
        ``True`` when the user was removed, ``False`` when not subscribed.
    """
    print(f"🗑️ Figurines: Removing subscriber {user_id}")
    current = load_subscribers()
    if user_id in current:
        # Drop every occurrence, in case the stored list has duplicates.
        remaining = []
        for uid in current:
            if uid != user_id:
                remaining.append(uid)
        save_subscribers(remaining)
        print(f"✅ Figurines: Subscriber {user_id} removed")
        return True
    print(f" Figurines: Subscriber {user_id} was not present")
    return False
def load_sent_tweets() -> List[str]:
    """Read the already-sent tweet URLs stored in ``SENT_TWEETS_FILE``.

    Returns:
        The value under the ``"urls"`` key. An empty list is returned when the
        file does not exist or when reading or parsing fails (the failure is
        printed, not raised).
    """
    try:
        if not os.path.exists(SENT_TWEETS_FILE):
            return []
        print(f"📁 Figurines: Loading sent tweets from {SENT_TWEETS_FILE}")
        with open(SENT_TWEETS_FILE, "r", encoding="utf-8") as handle:
            stored = json.load(handle)
        sent = stored.get("urls", [])
        print(f"📋 Figurines: Loaded {len(sent)} sent tweet URLs")
        return sent
    except Exception as exc:
        print(f"⚠️ Failed to load figurine sent tweets: {exc}")
        return []
def save_sent_tweets(urls: List[str]) -> None:
    """Write *urls* to ``SENT_TWEETS_FILE`` as JSON under the ``"urls"`` key.

    Failures are printed and swallowed; nothing is raised to the caller.

    Args:
        urls: The complete list of sent tweet URLs to persist.
    """
    try:
        _ensure_dir(SENT_TWEETS_FILE)
        print(f"💾 Figurines: Saving {len(urls)} sent tweet URLs to {SENT_TWEETS_FILE}")
        document = {"urls": urls}
        with open(SENT_TWEETS_FILE, "w", encoding="utf-8") as handle:
            json.dump(document, handle, indent=2)
    except Exception as exc:
        print(f"⚠️ Failed to save figurine sent tweets: {exc}")
async def choose_random_figurine_tweet() -> Dict[str, Any] | None:
    """Pick one figurine tweet at random, preferring ones not sent before.

    Tweets are fetched with ``fetch_figurine_tweets_latest`` and filtered
    against the URLs returned by ``load_sent_tweets``. If every fetched tweet
    has already been sent, the full fetched list is used so a tweet can still
    be chosen.

    Returns:
        The chosen tweet dict, or ``None`` when the fetch returned nothing.
    """
    print("🔎 Figurines: Fetching figurine tweets by Latest across sources…")
    candidates = await fetch_figurine_tweets_latest(limit_per_source=10)
    if not candidates:
        print("📭 No figurine tweets found across sources")
        return None

    already_sent = set(load_sent_tweets())
    unsent = list(filter(lambda item: item.get("url") not in already_sent, candidates))
    print(f"🧮 Figurines: {len(candidates)} total, {len(unsent)} fresh after filtering sent")

    if unsent:
        pool = unsent
    else:
        print(" All figurine tweets have been sent before; allowing reuse")
        pool = candidates

    picked = random.choice(pool)
    print(f"🎯 Chosen figurine tweet: {picked.get('url')}")
    return picked
async def send_figurine_dm_to_user(client: discord.Client, user_id: int, tweet: Dict[str, Any]) -> Tuple[bool, str]:
    """Send one figurine tweet plus an LLM-written Miku comment to a user via DM.

    Two messages are sent: first the tweet link (rewritten with
    ``convert_to_fxtwitter``), then the comment produced by ``query_ollama``.
    Both are logged through ``dm_logger`` and the exchange is appended to
    ``globals.conversation_history`` so later LLM calls see it.

    Args:
        client: Discord client used to resolve the user.
        user_id: ID of the recipient.
        tweet: Tweet dict; the keys read here are ``"url"``, ``"text"`` and
            ``"media"`` (a list whose first entry is treated as an image URL).

    Returns:
        ``(True, "ok")`` on success, otherwise ``(False, reason)``. This
        function never raises; every exception is caught and reported in
        the returned reason.
    """
    try:
        print(f"✉️ Figurines: Preparing DM to user {user_id}")

        # Validate the link up front: without it there is nothing to send,
        # and failing here avoids a wasted image-analysis + LLM round trip.
        tweet_url = tweet.get("url") or ""
        if not tweet_url:
            return False, "Tweet has no URL"

        user = client.get_user(user_id)
        if user is None:
            # Not in the local cache; ask the API instead.
            user = await client.fetch_user(user_id)
        if user is None:
            return False, f"User {user_id} not found"

        # Build base prompt with figurine/merch context
        base_prompt = (
            "You are Hatsune Miku writing a short, cute, excited DM to a fan about a newly posted "
            "figurine or merch announcement tweet. Be friendly and enthusiastic but concise. "
            "Reference what the tweet shows."
        )

        # Image description is best-effort: a failed download or analysis must
        # not prevent the DM, so both steps sit inside the same try block.
        media = tweet.get("media")
        if media:
            try:
                base64_img = await download_and_encode_image(media[0])
                if base64_img:
                    img_desc = await analyze_image_with_qwen(base64_img)
                    base_prompt += f"\n\nImage looks like: {img_desc}"
            except Exception as e:
                print(f"⚠️ Image analysis failed: {e}")

        # Include tweet text too (tolerate a missing or None "text" value)
        tweet_text = (tweet.get("text") or "").strip()
        if tweet_text:
            base_prompt += f"\n\nTweet text: {tweet_text}"
        base_prompt += "\n\nSign off as Miku with a cute emoji."

        # Query LLM in DM context (no guild_id -> DM mood rules apply)
        miku_comment = await query_ollama(
            base_prompt,
            user_id=f"figurine_dm_{user_id}",
            guild_id=None,
            response_type="dm_response",
        )

        dm = await user.create_dm()

        # Send the tweet URL first (convert to fxtwitter for better embeds)
        fx_tweet_url = convert_to_fxtwitter(tweet_url)
        tweet_message = await dm.send(fx_tweet_url)
        print(f"✅ Figurines: Tweet URL sent to {user_id}: {fx_tweet_url}")
        dm_logger.log_user_message(user, tweet_message, is_bot_message=True)

        # Send Miku's comment
        comment_message = await dm.send(miku_comment)
        print(f"✅ Figurines: Miku comment sent to {user_id}")
        dm_logger.log_user_message(user, comment_message, is_bot_message=True)

        # IMPORTANT: Also add to globals.conversation_history for LLM context.
        # The entry is a (context, reply) pair: the bracketed note about the
        # tweet (original URL, not the fxtwitter one) fills the first slot and
        # Miku's comment the second.
        # NOTE(review): the first slot presumably represents the user turn in
        # other entries; confirm against the history consumer.
        tweet_context = f"[I just sent you this figurine tweet: {tweet_url}]"
        globals.conversation_history.setdefault(str(user_id), []).append((tweet_context, miku_comment))

        print(f"📝 Figurines: Messages logged to both DM history and conversation context for user {user_id}")
        return True, "ok"
    except Exception as e:
        print(f"❌ Figurines: Failed DM to {user_id}: {e}")
        return False, f"{e}"
async def send_figurine_dm_to_single_user(client: discord.Client, user_id: int, tweet_url: str = None) -> Dict[str, Any]:
    """Send a figurine tweet to one user and report the outcome.

    Args:
        client: Discord client passed through to ``send_figurine_dm_to_user``.
        user_id: ID of the recipient.
        tweet_url: When given, that tweet is fetched and sent; otherwise a
            random figurine tweet is chosen.

    Returns:
        A dict with ``"status"`` set to ``"ok"`` or ``"error"``. Results after
        a send attempt also carry ``"sent"`` and ``"failed"`` lists; error
        results carry a ``"message"``.
    """
    print(f"🎯 Figurines: Sending DM to single user {user_id}")

    if not tweet_url:
        # No explicit link: pick a random figurine tweet.
        print("🔎 Figurines: Searching for random figurine tweet")
        tweet = await choose_random_figurine_tweet()
        if not tweet:
            return {"status": "error", "message": "No figurine tweets found"}
    else:
        # The caller asked for one specific tweet.
        print(f"📎 Figurines: Using specific tweet URL: {tweet_url}")
        tweet = await fetch_specific_tweet_by_url(tweet_url)
        if not tweet:
            return {"status": "error", "message": "Failed to fetch specified tweet"}

    delivered, detail = await send_figurine_dm_to_user(client, user_id, tweet)

    if not delivered:
        failure = {
            "status": "error",
            "sent": [],
            "failed": [{"user_id": str(user_id), "error": detail}],
            "message": f"Failed to send DM: {detail}"
        }
        print(f"❌ Figurines: Single user DM failed → {failure}")
        return failure

    # Remember the tweet as sent, keeping only the 200 most recent URLs.
    history = load_sent_tweets()
    link = tweet.get("url")
    if link and link not in history:
        history.append(link)
        save_sent_tweets(history[-200:])

    success = {
        "status": "ok",
        "sent": [str(user_id)],
        "failed": [],
        "tweet": {"url": tweet.get("url", ""), "username": tweet.get("username", "")}
    }
    print(f"✅ Figurines: Single user DM sent successfully → {success}")
    return success
async def fetch_specific_tweet_by_url(tweet_url: str) -> Dict[str, Any] | None:
"""Fetch a specific tweet by URL for manual figurine notifications."""
try:
print(f"🔗 Figurines: Fetching specific tweet from URL: {tweet_url}")
# Extract tweet ID from URL
tweet_id = None
if "/status/" in tweet_url:
try:
tweet_id = tweet_url.split("/status/")[1].split("?")[0].split("/")[0]
print(f"📋 Figurines: Extracted tweet ID: {tweet_id}")
except Exception as e:
print(f"❌ Figurines: Failed to extract tweet ID from URL: {e}")
return None
if not tweet_id:
print("❌ Figurines: Could not extract tweet ID from URL")
return None
# Set up twscrape API (same pattern as existing functions)
from twscrape import API
from pathlib import Path
import json
COOKIE_PATH = Path(__file__).parent / "x.com.cookies.json"
# Load cookies
with open(COOKIE_PATH, "r", encoding="utf-8") as f:
cookie_list = json.load(f)
cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)
# Set up API
api = API()
await api.pool.add_account(
username="HSankyuu39",
password="x",
email="x",
email_password="x",
cookies=cookie_header
)
await api.pool.login_all()
# Try to fetch the tweet using search instead of tweet_details
# Search for the specific tweet ID should return it if accessible
print(f"🔍 Figurines: Searching for tweet with ID {tweet_id}")
search_results = []
try:
# Search using the tweet ID - this should find the specific tweet
from twscrape import gather
search_results = await gather(api.search(f"{tweet_id}", limit=1))
print(f"🔍 Figurines: Search returned {len(search_results)} results")
except Exception as search_error:
print(f"⚠️ Figurines: Search failed: {search_error}")
return None
# Check if we found the tweet
tweet_data = None
for tweet in search_results:
if str(tweet.id) == str(tweet_id):
tweet_data = tweet
print(f"✅ Figurines: Found matching tweet with ID {tweet.id}")
break
if not tweet_data and search_results:
# If no exact match but we have results, use the first one
tweet_data = search_results[0]
print(f"🔍 Figurines: Using first search result with ID {tweet_data.id}")
if tweet_data:
# Extract data using the same pattern as the working search code
username = tweet_data.user.username if hasattr(tweet_data, 'user') and tweet_data.user else "unknown"
text_content = tweet_data.rawContent if hasattr(tweet_data, 'rawContent') else ""
print(f"🔍 Figurines: Found tweet from @{username}")
print(f"🔍 Figurines: Tweet text: {text_content[:100]}...")
# For media, we'll need to extract it from the tweet_url using the same method as other functions
# But for now, let's see if we can get basic tweet data working first
result = {
"text": text_content,
"username": username,
"url": tweet_url,
"media": [] # We'll add media extraction later
}
print(f"✅ Figurines: Successfully fetched tweet from @{result['username']}")
return result
else:
print("❌ Figurines: No tweet found with the specified ID")
return None
except Exception as e:
print(f"❌ Figurines: Error fetching tweet by URL: {e}")
return None
async def send_figurine_dm_to_all_subscribers(client: discord.Client, tweet_url: str | None = None) -> Dict[str, Any]:
    """Pick a figurine tweet and DM it to every subscriber.

    Args:
        client: Discord client passed through to ``send_figurine_dm_to_user``.
        tweet_url: When given, that tweet is fetched and sent; otherwise a
            random figurine tweet is chosen.

    Returns:
        ``{"status": "no_subscribers"}`` when nobody is subscribed,
        ``{"status": "no_tweet", ...}`` when no tweet could be obtained,
        otherwise ``{"status": "ok", "sent": [...], "failed": [...],
        "tweet": {...}}`` with per-user results.
    """
    print("🚀 Figurines: Sending figurine DM to all subscribers…")
    subscribers = load_subscribers()
    if not subscribers:
        print(" Figurines: No subscribers configured")
        return {"status": "no_subscribers"}

    if tweet_url:
        # Use specific tweet URL
        print(f"📎 Figurines: Using specific tweet URL for all subscribers: {tweet_url}")
        tweet = await fetch_specific_tweet_by_url(tweet_url)
        if not tweet:
            print(" Figurines: Failed to fetch specified tweet")
            return {"status": "no_tweet", "message": "Failed to fetch specified tweet"}
    else:
        # Search for random tweet
        tweet = await choose_random_figurine_tweet()
        if tweet is None:
            print(" Figurines: No tweet to send")
            return {"status": "no_tweet"}

    results = {"sent": [], "failed": []}
    for uid in subscribers:
        ok, msg = await send_figurine_dm_to_user(client, uid, tweet)
        if ok:
            results["sent"].append(str(uid))
        else:
            print(f"⚠️ Failed to DM user {uid}: {msg}")
            results["failed"].append({"user_id": str(uid), "error": msg})

    # Record the tweet as sent only if at least one DM went out; a tweet that
    # reached nobody stays eligible for the next run.
    if results["sent"]:
        sent_urls = load_sent_tweets()
        url = tweet.get("url")
        if url and url not in sent_urls:
            sent_urls.append(url)
            # Keep the file from growing unbounded: retain the newest 200.
            save_sent_tweets(sent_urls[-200:])

    summary = {"status": "ok", **results, "tweet": {"url": tweet.get("url", ""), "username": tweet.get("username", "")}}
    print(f"📦 Figurines: DM send complete → {summary}")
    return summary