"""Figurine/merch tweet notifications delivered to subscribers via Discord DM.

Keeps a JSON-backed subscriber list and a JSON-backed list of tweet URLs that
were already sent, picks (or fetches) a tweet, asks the LLM for a short
comment, and DMs the tweet link plus the comment to one or all subscribers.
"""

import json
import os
import random
import re
from datetime import datetime
from typing import List, Dict, Any, Tuple

import discord

import globals
from utils.twitter_fetcher import fetch_figurine_tweets_latest
from utils.image_handling import analyze_image_with_qwen, download_and_encode_image
from utils.llm import query_llama
from utils.dm_logger import dm_logger

SUBSCRIBERS_FILE = "memory/figurine_subscribers.json"
SENT_TWEETS_FILE = "memory/figurine_sent_tweets.json"

# Upper bound on remembered sent-tweet URLs so the file cannot grow forever.
_MAX_SENT_TWEETS = 200

# Matches "twitter.com" / "x.com" only when it is a complete host name (or the
# tail of one such as "www.twitter.com"). The lookbehind rejects a preceding
# word character or hyphen, so "fxtwitter.com", "vxtwitter.com",
# "netflix.com" or "fixupx.com" are left alone; the lookahead rejects longer
# hosts such as "x.com.au".
_TWITTER_HOST_RE = re.compile(r"(?<![\w-])(?:twitter|x)\.com(?![\w.-])")


def convert_to_fxtwitter(url: str) -> str:
    """Convert twitter.com or x.com URLs to fxtwitter.com for better Discord embeds.

    Only whole host names are rewritten. Plain substring replacement would
    turn an already converted link into "fxfxtwitter.com" and would corrupt
    unrelated hosts that merely end in "x.com".

    Args:
        url: Any URL string.

    Returns:
        The URL with twitter.com / x.com hosts replaced by fxtwitter.com, or
        the input unchanged when no such host is present.
    """
    return _TWITTER_HOST_RE.sub("fxtwitter.com", url)


def _ensure_dir(path: str) -> None:
    """Create the parent directory of *path* if it has one and it is missing."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)


def load_subscribers() -> List[int]:
    """Return the subscriber user IDs stored in SUBSCRIBERS_FILE.

    Returns an empty list when the file does not exist or cannot be read or
    parsed; the failure is printed, not raised.
    """
    try:
        if os.path.exists(SUBSCRIBERS_FILE):
            print(f"๐Ÿ“ Figurines: Loading subscribers from {SUBSCRIBERS_FILE}")
            with open(SUBSCRIBERS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            # IDs are stored as strings (see save_subscribers); convert back.
            subs = [int(uid) for uid in data.get("subscribers", [])]
            print(f"๐Ÿ“‹ Figurines: Loaded {len(subs)} subscribers")
            return subs
    except Exception as e:
        print(f"โš ๏ธ Failed to load figurine subscribers: {e}")
    return []


def save_subscribers(user_ids: List[int]) -> None:
    """Write *user_ids* to SUBSCRIBERS_FILE; failures are printed, not raised."""
    try:
        _ensure_dir(SUBSCRIBERS_FILE)
        # Save as strings to be JS-safe in the API layer if needed
        payload = {"subscribers": [str(uid) for uid in user_ids]}
        print(f"๐Ÿ’พ Figurines: Saving {len(user_ids)} subscribers to {SUBSCRIBERS_FILE}")
        with open(SUBSCRIBERS_FILE, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
    except Exception as e:
        print(f"โš ๏ธ Failed to save figurine subscribers: {e}")


def add_subscriber(user_id: int) -> bool:
    """Add *user_id* to the subscriber list.

    Returns:
        True if the user was added, False if they were already subscribed.
    """
    print(f"โž• Figurines: Adding subscriber {user_id}")
    subscribers = load_subscribers()
    if user_id in subscribers:
        print(f"โ„น๏ธ Figurines: Subscriber {user_id} already present")
        return False
    subscribers.append(user_id)
    save_subscribers(subscribers)
    print(f"โœ… Figurines: Subscriber {user_id} added")
    return True


def remove_subscriber(user_id: int) -> bool:
    """Remove *user_id* from the subscriber list.

    Returns:
        True if the user was removed, False if they were not subscribed.
    """
    print(f"๐Ÿ—‘๏ธ Figurines: Removing subscriber {user_id}")
    subscribers = load_subscribers()
    if user_id not in subscribers:
        print(f"โ„น๏ธ Figurines: Subscriber {user_id} was not present")
        return False
    subscribers = [uid for uid in subscribers if uid != user_id]
    save_subscribers(subscribers)
    print(f"โœ… Figurines: Subscriber {user_id} removed")
    return True


def load_sent_tweets() -> List[str]:
    """Return the already-sent tweet URLs stored in SENT_TWEETS_FILE.

    Returns an empty list when the file does not exist or cannot be read or
    parsed; the failure is printed, not raised.
    """
    try:
        if os.path.exists(SENT_TWEETS_FILE):
            print(f"๐Ÿ“ Figurines: Loading sent tweets from {SENT_TWEETS_FILE}")
            with open(SENT_TWEETS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            urls = data.get("urls", [])
            print(f"๐Ÿ“‹ Figurines: Loaded {len(urls)} sent tweet URLs")
            return urls
    except Exception as e:
        print(f"โš ๏ธ Failed to load figurine sent tweets: {e}")
    return []


def save_sent_tweets(urls: List[str]) -> None:
    """Write *urls* to SENT_TWEETS_FILE; failures are printed, not raised."""
    try:
        _ensure_dir(SENT_TWEETS_FILE)
        print(f"๐Ÿ’พ Figurines: Saving {len(urls)} sent tweet URLs to {SENT_TWEETS_FILE}")
        with open(SENT_TWEETS_FILE, "w", encoding="utf-8") as f:
            json.dump({"urls": urls}, f, indent=2)
    except Exception as e:
        print(f"โš ๏ธ Failed to save figurine sent tweets: {e}")


def _record_sent_tweet(url: str | None) -> None:
    """Remember *url* as sent, keeping only the newest _MAX_SENT_TWEETS entries.

    Does nothing (and does not rewrite the file) when *url* is empty or is
    already recorded.
    """
    sent_urls = load_sent_tweets()
    if not url or url in sent_urls:
        return
    sent_urls.append(url)
    # keep file from growing unbounded
    if len(sent_urls) > _MAX_SENT_TWEETS:
        sent_urls = sent_urls[-_MAX_SENT_TWEETS:]
    save_sent_tweets(sent_urls)


async def choose_random_figurine_tweet() -> Dict[str, Any] | None:
    """Fetch figurine tweets from multiple sources, filter out sent, and pick one randomly.

    Returns:
        A tweet dict, or None when the fetcher returned nothing. If every
        fetched tweet was already sent, one of them is reused.
    """
    print("๐Ÿ”Ž Figurines: Fetching figurine tweets by Latest across sourcesโ€ฆ")
    tweets = await fetch_figurine_tweets_latest(limit_per_source=10)
    if not tweets:
        print("๐Ÿ“ญ No figurine tweets found across sources")
        return None

    sent_urls = set(load_sent_tweets())
    fresh = [t for t in tweets if t.get("url") not in sent_urls]
    print(f"๐Ÿงฎ Figurines: {len(tweets)} total, {len(fresh)} fresh after filtering sent")
    if not fresh:
        print("โ„น๏ธ All figurine tweets have been sent before; allowing reuse")
        fresh = tweets

    chosen = random.choice(fresh)
    print(f"๐ŸŽฏ Chosen figurine tweet: {chosen.get('url')}")
    return chosen


async def send_figurine_dm_to_user(client: discord.Client, user_id: int, tweet: Dict[str, Any]) -> Tuple[bool, str]:
    """Send the figurine tweet to a single subscriber via DM, with analysis and LLM commentary.

    Args:
        client: Discord client used to resolve the user.
        user_id: ID of the recipient.
        tweet: Tweet dict; this function reads its "url", "text" and "media" keys.

    Returns:
        (True, "ok") on success, otherwise (False, error description). All
        exceptions are caught and reported through the return value.
    """
    try:
        print(f"โœ‰๏ธ Figurines: Preparing DM to user {user_id}")
        user = client.get_user(user_id)
        if user is None:
            # Not available from get_user; ask the API instead.
            user = await client.fetch_user(user_id)
        if user is None:
            return False, f"User {user_id} not found"

        # Check if evil mode is active and adjust prompt accordingly
        from utils.evil_mode import is_evil_mode

        evil_mode = is_evil_mode()

        if evil_mode:
            # Evil Miku persona prompt
            base_prompt = (
                "You are Evil Miku writing a DM to a fan about a newly posted "
                "figurine or merch announcement tweet. Be cruel, mocking, and dismissive about the merch. "
                "You may question why the need for cheap replicas when the real thing is much more appealing (flaunt your breasts as you say that)."
                "Mock their obsession with plastic dolls and consumer culture. Keep it short and cutting. "
                "Reference what the tweet shows, but with contempt."
            )
        else:
            # Normal Miku persona prompt
            base_prompt = (
                "You are Hatsune Miku writing a short, cute, excited DM to a fan about a newly posted "
                "figurine or merch announcement tweet. Be friendly and enthusiastic but concise. "
                "Reference what the tweet shows."
            )

        # Analyze the first image if available
        if tweet.get("media"):
            first_url = tweet["media"][0]
            base64_img = await download_and_encode_image(first_url)
            if base64_img:
                try:
                    img_desc = await analyze_image_with_qwen(base64_img)
                    base_prompt += f"\n\nImage looks like: {img_desc}"
                except Exception as e:
                    # Image description is optional; continue without it.
                    print(f"โš ๏ธ Image analysis failed: {e}")

        # Include tweet text too
        tweet_text = tweet.get("text", "").strip()
        if tweet_text:
            base_prompt += f"\n\nTweet text: {tweet_text}"

        if evil_mode:
            base_prompt += "\n\nSign off as Evil Miku with a dark emoji."
        else:
            base_prompt += "\n\nSign off as Miku with a cute emoji."

        # Query LLM in DM context (no guild_id -> DM mood rules apply)
        miku_comment = await query_llama(
            base_prompt,
            user_id=f"figurine_dm_{user_id}",
            guild_id=None,
            response_type="dm_response",
        )

        dm = await user.create_dm()
        tweet_url = tweet.get("url", "")

        # Send the tweet URL first (convert to fxtwitter for better embeds)
        fx_tweet_url = convert_to_fxtwitter(tweet_url)
        tweet_message = await dm.send(fx_tweet_url)
        print(f"โœ… Figurines: Tweet URL sent to {user_id}: {fx_tweet_url}")

        # Log the tweet URL message
        dm_logger.log_user_message(user, tweet_message, is_bot_message=True)

        # Send Miku's comment
        comment_message = await dm.send(miku_comment)
        print(f"โœ… Figurines: Miku comment sent to {user_id}")

        # Log the comment message
        dm_logger.log_user_message(user, comment_message, is_bot_message=True)

        # IMPORTANT: Also add to globals.conversation_history for LLM context
        user_id_str = str(user_id)

        # The history entry is a (context, comment) pair; the context notes
        # which tweet was sent and uses the original, unconverted URL.
        tweet_context = f"[I just sent you this figurine tweet: {tweet_url}]"
        globals.conversation_history.setdefault(user_id_str, []).append((tweet_context, miku_comment))

        print(f"๐Ÿ“ Figurines: Messages logged to both DM history and conversation context for user {user_id}")

        return True, "ok"
    except Exception as e:
        print(f"โŒ Figurines: Failed DM to {user_id}: {e}")
        return False, f"{e}"


async def send_figurine_dm_to_single_user(client: discord.Client, user_id: int, tweet_url: str | None = None) -> Dict[str, Any]:
    """Send a figurine tweet to a single user, either from search or specific URL.

    Args:
        client: Discord client used to deliver the DM.
        user_id: ID of the recipient.
        tweet_url: Specific tweet to send; when omitted a random figurine
            tweet is chosen.

    Returns:
        A result dict with "status" ("ok" or "error") plus details. The tweet
        URL is recorded as sent only when the DM succeeded.
    """
    print(f"๐ŸŽฏ Figurines: Sending DM to single user {user_id}")

    if tweet_url:
        # Use specific tweet URL
        print(f"๐Ÿ“Ž Figurines: Using specific tweet URL: {tweet_url}")
        tweet = await fetch_specific_tweet_by_url(tweet_url)
        if not tweet:
            return {"status": "error", "message": "Failed to fetch specified tweet"}
    else:
        # Search for a random tweet
        print("๐Ÿ”Ž Figurines: Searching for random figurine tweet")
        tweet = await choose_random_figurine_tweet()
        if not tweet:
            return {"status": "error", "message": "No figurine tweets found"}

    # Send to the single user
    ok, msg = await send_figurine_dm_to_user(client, user_id, tweet)

    if ok:
        # Record as sent if successful
        _record_sent_tweet(tweet.get("url"))

        result = {
            "status": "ok",
            "sent": [str(user_id)],
            "failed": [],
            "tweet": {"url": tweet.get("url", ""), "username": tweet.get("username", "")},
        }
        print(f"โœ… Figurines: Single user DM sent successfully โ†’ {result}")
        return result

    result = {
        "status": "error",
        "sent": [],
        "failed": [{"user_id": str(user_id), "error": msg}],
        "message": f"Failed to send DM: {msg}",
    }
    print(f"โŒ Figurines: Single user DM failed โ†’ {result}")
    return result


async def fetch_specific_tweet_by_url(tweet_url: str) -> Dict[str, Any] | None:
    """Fetch a specific tweet by URL for manual figurine notifications.

    The tweet ID is taken from the "/status/<id>" part of the URL and looked
    up through a twscrape search using cookies from "x.com.cookies.json" next
    to this file.

    Returns:
        A dict with "text", "username", "url" (the URL passed in) and an
        empty "media" list, or None when the ID cannot be extracted, the
        search fails or returns nothing, or any other error occurs.
    """
    try:
        print(f"๐Ÿ”— Figurines: Fetching specific tweet from URL: {tweet_url}")

        # Extract tweet ID from URL
        tweet_id = None
        if "/status/" in tweet_url:
            try:
                # Drop any query string and any trailing path segments.
                tweet_id = tweet_url.split("/status/")[1].split("?")[0].split("/")[0]
                print(f"๐Ÿ“‹ Figurines: Extracted tweet ID: {tweet_id}")
            except Exception as e:
                print(f"โŒ Figurines: Failed to extract tweet ID from URL: {e}")
                return None

        if not tweet_id:
            print("โŒ Figurines: Could not extract tweet ID from URL")
            return None

        # Set up twscrape API; imported here so the dependency is only needed
        # when a specific tweet is requested.
        from twscrape import API, gather
        from pathlib import Path

        COOKIE_PATH = Path(__file__).parent / "x.com.cookies.json"

        # Load cookies and flatten them into a single Cookie header value
        with open(COOKIE_PATH, "r", encoding="utf-8") as f:
            cookie_list = json.load(f)
        cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)

        # Set up API
        api = API()
        await api.pool.add_account(
            username="HSankyuu39",
            password="x",
            email="x",
            email_password="x",
            cookies=cookie_header,
        )
        await api.pool.login_all()

        # Try to fetch the tweet using search instead of tweet_details
        # Search for the specific tweet ID should return it if accessible
        print(f"๐Ÿ” Figurines: Searching for tweet with ID {tweet_id}")
        search_results = []
        try:
            search_results = await gather(api.search(f"{tweet_id}", limit=1))
            print(f"๐Ÿ” Figurines: Search returned {len(search_results)} results")
        except Exception as search_error:
            print(f"โš ๏ธ Figurines: Search failed: {search_error}")
            return None

        # Check if we found the tweet
        tweet_data = None
        for tweet in search_results:
            if str(tweet.id) == str(tweet_id):
                tweet_data = tweet
                print(f"โœ… Figurines: Found matching tweet with ID {tweet.id}")
                break

        if not tweet_data and search_results:
            # If no exact match but we have results, use the first one.
            # NOTE(review): this can pair another tweet's text with the
            # requested URL - confirm the fallback is intended.
            tweet_data = search_results[0]
            print(f"๐Ÿ” Figurines: Using first search result with ID {tweet_data.id}")

        if tweet_data:
            username = tweet_data.user.username if hasattr(tweet_data, 'user') and tweet_data.user else "unknown"
            text_content = tweet_data.rawContent if hasattr(tweet_data, 'rawContent') else ""

            print(f"๐Ÿ” Figurines: Found tweet from @{username}")
            print(f"๐Ÿ” Figurines: Tweet text: {text_content[:100]}...")

            # Media extraction is not implemented here yet, so "media" stays
            # empty and no image analysis happens for manually specified tweets.
            result = {
                "text": text_content,
                "username": username,
                "url": tweet_url,
                "media": [],
            }

            print(f"โœ… Figurines: Successfully fetched tweet from @{result['username']}")
            return result

        print("โŒ Figurines: No tweet found with the specified ID")
        return None

    except Exception as e:
        print(f"โŒ Figurines: Error fetching tweet by URL: {e}")
        return None


async def send_figurine_dm_to_all_subscribers(client: discord.Client, tweet_url: str | None = None) -> Dict[str, Any]:
    """Pick a figurine tweet and DM it to all subscribers, recording the sent URL.

    Args:
        client: Discord client used to deliver the DMs.
        tweet_url: Specific tweet to send; when omitted a random figurine
            tweet is chosen.

    Returns:
        {"status": "no_subscribers"} or a {"status": "no_tweet", ...} dict when
        nothing could be sent; otherwise a summary with "status": "ok", the
        "sent" and "failed" lists and the tweet's URL and username.
    """
    print("๐Ÿš€ Figurines: Sending figurine DM to all subscribersโ€ฆ")
    subscribers = load_subscribers()
    if not subscribers:
        print("โ„น๏ธ Figurines: No subscribers configured")
        return {"status": "no_subscribers"}

    if tweet_url:
        # Use specific tweet URL
        print(f"๐Ÿ“Ž Figurines: Using specific tweet URL for all subscribers: {tweet_url}")
        tweet = await fetch_specific_tweet_by_url(tweet_url)
        if not tweet:
            print("โ„น๏ธ Figurines: Failed to fetch specified tweet")
            return {"status": "no_tweet", "message": "Failed to fetch specified tweet"}
    else:
        # Search for random tweet
        tweet = await choose_random_figurine_tweet()
        if tweet is None:
            print("โ„น๏ธ Figurines: No tweet to send")
            return {"status": "no_tweet"}

    results = {"sent": [], "failed": []}
    for uid in subscribers:
        ok, msg = await send_figurine_dm_to_user(client, uid, tweet)
        if ok:
            results["sent"].append(str(uid))
        else:
            print(f"โš ๏ธ Failed to DM user {uid}: {msg}")
            results["failed"].append({"user_id": str(uid), "error": msg})

    # Record as sent only if at least one DM succeeded, so a tweet that
    # reached nobody stays eligible for the next run.
    if results["sent"]:
        _record_sent_tweet(tweet.get("url"))

    summary = {"status": "ok", **results, "tweet": {"url": tweet.get("url", ""), "username": tweet.get("username", "")}}
    print(f"๐Ÿ“ฆ Figurines: DM send complete โ†’ {summary}")
    return summary