#!/usr/bin/env python3
"""
Headless RVC Voice Conversion

Reuses the RVC realtime GUI logic without the GUI window for headless
operation. Configuration via JSON file or command-line arguments.
"""
import os
import sys
import json
import time
import argparse
from pathlib import Path

# Set up environment (same as the GUI)
os.environ["OMP_NUM_THREADS"] = "4"
if sys.platform == "darwin":
    os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

# Store original directory
SCRIPT_DIR = Path(__file__).parent
ORIGINAL_DIR = os.getcwd()

# Add RVC to path (but don't change directory yet)
rvc_dir = SCRIPT_DIR / "Retrieval-based-Voice-Conversion-WebUI"
sys.path.insert(0, str(rvc_dir))

import multiprocessing
from multiprocessing import Queue, cpu_count

import numpy as np
import sounddevice as sd
import torch
import torch.nn.functional as F
import librosa
from tools.torchgate import TorchGate
import torchaudio.transforms as tat

from infer.lib import rtrvc as rvc_for_realtime
from configs.config import Config

# Queues used by the harvest worker processes (same as the GUI)
inp_q = Queue()
opt_q = Queue()
n_cpu = min(cpu_count(), 8)


class Harvest(multiprocessing.Process):
    def __init__(self, inp_q, opt_q):
        multiprocessing.Process.__init__(self)
        self.inp_q = inp_q
        self.opt_q = opt_q

    def run(self):
        # Imported here so the spawned worker process has its own copies.
        import numpy as np
        import pyworld

        while True:
            idx, x, res_f0, n_cpu, ts = self.inp_q.get()
            f0, t = pyworld.harvest(
                x.astype(np.double),
                fs=16000,
                f0_ceil=1100,
                f0_floor=50,
                frame_period=10,
            )
            res_f0[idx] = f0
            if len(res_f0.keys()) >= n_cpu:
                self.opt_q.put(ts)


def start_harvest_processes():
    """Start the daemon workers that serve pyworld "harvest" F0 requests.

    Deferred to main() (instead of running at import time) so that
    spawn-based platforms (macOS, Windows) do not attempt to start new
    processes while re-importing this module during multiprocessing
    bootstrap. Call this before HeadlessRVC.start() when using
    f0method == "harvest".
    """
    for _ in range(n_cpu):
        p = Harvest(inp_q, opt_q)
        p.daemon = True
        p.start()


def phase_vocoder(a, b, fade_out, fade_in):
    """Phase vocoder for smooth crossfading between two audio blocks."""
    window = torch.sqrt(fade_out * fade_in)
    fa = torch.fft.rfft(a * window)
    fb = torch.fft.rfft(b * window)
    absab = torch.abs(fa) + torch.abs(fb)
    n = a.shape[0]
    if n % 2 == 0:
        absab[1:-1] *= 2
    else:
        absab[1:] *= 2
    phia = torch.angle(fa)
    phib = torch.angle(fb)
    deltaphase = phib - phia
    deltaphase = deltaphase - 2 * np.pi * torch.floor(deltaphase / 2 / np.pi + 0.5)
    w = 2 * np.pi * torch.arange(n // 2 + 1).to(a) + deltaphase
    t = torch.arange(n).unsqueeze(-1).to(a) / n
    result = (
        a * (fade_out**2)
        + b * (fade_in**2)
        + torch.sum(absab * torch.cos(w * t + phia), -1) * window / n
    )
    return result


class HeadlessRVCConfig:
    """Configuration for headless RVC operation."""

    def __init__(self, config_dict=None):
        # Model paths
        self.pth_path = ""
        self.index_path = ""

        # Audio processing parameters
        self.pitch = 0              # Pitch shift in semitones
        self.formant = 0.0          # Formant shift
        self.block_time = 0.25      # Block size in seconds
        self.crossfade_time = 0.05  # Crossfade duration
        self.extra_time = 2.5       # Extra buffer time
        self.threshold = -60        # Voice activity threshold in dB

        # RVC parameters
        self.index_rate = 0.0       # Index feature ratio (0-1)
        self.rms_mix_rate = 0.0     # Volume envelope mixing (0-1)
        self.f0method = "rmvpe"     # F0 extraction method

        # Noise reduction
        self.I_noise_reduce = False  # Input noise reduction
        self.O_noise_reduce = False  # Output noise reduction
        self.use_pv = False          # Use phase vocoder

        # Audio device settings
        self.input_device = None    # Input device name/index
        self.output_device = None   # Output device name/index
        self.samplerate = 48000     # Sample rate
        self.channels = 2           # Number of channels

        # Processing
        self.n_cpu = min(n_cpu, 4)

        # Apply config dict if provided
        if config_dict:
            for key, value in config_dict.items():
                if hasattr(self, key):
                    setattr(self, key, value)

    def save(self, path):
        """Save configuration to a JSON file."""
        config_dict = {
            k: v for k, v in self.__dict__.items() if not k.startswith("_")
        }
        with open(path, "w") as f:
            json.dump(config_dict, f, indent=2)

    @classmethod
    def load(cls, path):
        """Load configuration from a JSON file."""
        with open(path, "r") as f:
            config_dict = json.load(f)
        return cls(config_dict)
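
# Illustrative example of the JSON that save()/load() round-trip. The keys
# mirror the attributes above; the paths and values here are placeholders,
# not shipped defaults:
#
#   {
#     "pth_path": "assets/weights/my_voice.pth",
#     "index_path": "logs/my_voice/added.index",
#     "pitch": 4,
#     "f0method": "rmvpe",
#     "index_rate": 0.5,
#     "block_time": 0.25,
#     "input_device": "MacBook Pro Microphone",
#     "output_device": "BlackHole 2ch"
#   }
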
class HeadlessRVC:
    """Headless RVC processor using the GUI's realtime logic."""

    def __init__(self, config: HeadlessRVCConfig):
        self.gui_config = config

        # Save and clear sys.argv so Config() doesn't parse our arguments
        saved_argv = sys.argv.copy()
        sys.argv = [sys.argv[0]]  # Keep only the script name

        # Change to the RVC directory before initializing Config
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        self.config = Config()
        os.chdir(saved_cwd)

        # Restore sys.argv
        sys.argv = saved_argv

        self.running = False
        self.stream = None

        # Processing state (same as the GUI)
        self.function = "vc"
        self.rvc = None
        self.tgt_sr = None

    def initialize_rvc(self):
        """Initialize the RVC model (same as the GUI's start_vc)."""
        # Convert paths to absolute if they aren't already
        pth_path = Path(self.gui_config.pth_path)
        if not pth_path.is_absolute():
            pth_path = (SCRIPT_DIR / pth_path).resolve()
        index_path = Path(self.gui_config.index_path)
        if not index_path.is_absolute():
            index_path = (SCRIPT_DIR / index_path).resolve()

        print(f"Loading RVC model: {pth_path}")
        print(f"Loading index: {index_path}")

        # Change to the RVC directory for model loading
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        try:
            self.rvc = rvc_for_realtime.RVC(
                self.gui_config.pitch,
                self.gui_config.formant,
                str(pth_path),
                str(index_path),
                self.gui_config.index_rate,
                self.gui_config.n_cpu,
                inp_q,
                opt_q,
                self.config,
                self.rvc if hasattr(self, "rvc") else None,
            )
        finally:
            os.chdir(saved_cwd)

        self.tgt_sr = self.rvc.tgt_sr
        self.gui_config.samplerate = (
            self.tgt_sr if self.tgt_sr else self.gui_config.samplerate
        )

        # Calculate frame sizes (zc is one 10 ms frame at the device rate)
        self.zc = self.gui_config.samplerate // 100
        self.block_frame = int(
            np.round(self.gui_config.block_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )
        self.block_frame_16k = 160 * self.block_frame // self.zc
        self.crossfade_frame = int(
            np.round(self.gui_config.crossfade_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )
        self.sola_buffer_frame = min(self.crossfade_frame, 4 * self.zc)
        self.sola_search_frame = self.zc
        self.extra_frame = int(
            np.round(self.gui_config.extra_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )

        self.input_wav = torch.zeros(
            self.extra_frame
            + self.crossfade_frame
            + self.sola_search_frame
            + self.block_frame,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.input_wav_denoise = self.input_wav.clone()
        self.input_wav_res = torch.zeros(
            160 * self.input_wav.shape[0] // self.zc,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.rms_buffer = np.zeros(4 * self.zc, dtype="float32")
        self.sola_buffer = torch.zeros(
            self.sola_buffer_frame, device=self.config.device, dtype=torch.float32
        )
        self.nr_buffer = self.sola_buffer.clone()
        self.output_buffer = self.input_wav.clone()
        self.skip_head = int(self.extra_frame / self.zc)
        self.return_length = (
            self.block_frame + self.sola_buffer_frame + self.sola_search_frame
        ) // self.zc
        self.fade_in_window = (
            torch.sin(
                0.5
                * np.pi
                * torch.linspace(
                    0.0,
                    1.0,
                    steps=self.sola_buffer_frame,
                    device=self.config.device,
                    dtype=torch.float32,
                )
            )
            ** 2
        )
        self.fade_out_window = 1 - self.fade_in_window

        # Resampler: device rate -> 16 kHz for the model
        self.resampler = tat.Resample(
            orig_freq=self.gui_config.samplerate,
            new_freq=16000,
            lowpass_filter_width=128,
            rolloff=0.99,
            resampling_method="sinc_interp_kaiser",
            beta=14.769656459379492,
            dtype=torch.float32,
        ).to(self.config.device)
        # Second resampler: model rate -> device rate, if they differ
        if self.tgt_sr != self.gui_config.samplerate:
            self.resampler2 = tat.Resample(
                orig_freq=self.tgt_sr,
                new_freq=self.gui_config.samplerate,
                lowpass_filter_width=128,
                rolloff=0.99,
                resampling_method="sinc_interp_kaiser",
                beta=14.769656459379492,
                dtype=torch.float32,
            ).to(self.config.device)
        else:
            self.resampler2 = None

        # TorchGate for noise reduction
        self.tg = TorchGate(
            sr=self.gui_config.samplerate, nonstationary=True
        ).to(self.config.device)

        print("✓ RVC model initialized successfully")
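
    # Rolling input buffer layout, as set up by initialize_rvc above (sizes
    # in samples at the device rate; zc is 10 ms):
    #
    #   |<- extra_frame ->|<- crossfade_frame ->|<- sola_search ->|<- block_frame ->|
    #
    # Each callback shifts the buffer left by block_frame and writes the new
    # block at the end: extra_frame gives the model long left-context, while
    # the crossfade + SOLA search region lets consecutive output blocks be
    # aligned and blended without clicks.
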
    def audio_callback(self, indata: np.ndarray, outdata: np.ndarray, frames, times, status):
        """Audio processing callback (same logic as the GUI)."""
        # Switch to the RVC directory in case the model lazily loads files
        # with relative paths during inference
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        try:
            start_time = time.perf_counter()
            indata = librosa.to_mono(indata.T)

            # Threshold gate: zero out 10 ms frames whose RMS falls below
            # the configured dB threshold
            if self.gui_config.threshold > -60:
                indata = np.append(self.rms_buffer, indata)
                rms = librosa.feature.rms(
                    y=indata, frame_length=4 * self.zc, hop_length=self.zc
                )[:, 2:]
                self.rms_buffer[:] = indata[-4 * self.zc:]
                indata = indata[2 * self.zc - self.zc // 2:]
                db_threshold = (
                    librosa.amplitude_to_db(rms, ref=1.0)[0]
                    < self.gui_config.threshold
                )
                for i in range(db_threshold.shape[0]):
                    if db_threshold[i]:
                        indata[i * self.zc: (i + 1) * self.zc] = 0
                indata = indata[self.zc // 2:]

            # Update the rolling input buffers
            self.input_wav[:-self.block_frame] = self.input_wav[self.block_frame:].clone()
            self.input_wav[-indata.shape[0]:] = torch.from_numpy(indata).to(self.config.device)
            self.input_wav_res[:-self.block_frame_16k] = self.input_wav_res[self.block_frame_16k:].clone()

            # Input noise reduction
            if self.gui_config.I_noise_reduce:
                self.input_wav_denoise[:-self.block_frame] = self.input_wav_denoise[self.block_frame:].clone()
                input_wav = self.input_wav[-self.sola_buffer_frame - self.block_frame:]
                input_wav = self.tg(input_wav.unsqueeze(0), self.input_wav.unsqueeze(0)).squeeze(0)
                input_wav[:self.sola_buffer_frame] *= self.fade_in_window
                input_wav[:self.sola_buffer_frame] += self.nr_buffer * self.fade_out_window
                self.input_wav_denoise[-self.block_frame:] = input_wav[:self.block_frame]
                self.nr_buffer[:] = input_wav[self.block_frame:]
                self.input_wav_res[-self.block_frame_16k - 160:] = self.resampler(
                    self.input_wav_denoise[-self.block_frame - 2 * self.zc:]
                )[160:]
            else:
                self.input_wav_res[-160 * (indata.shape[0] // self.zc + 1):] = (
                    self.resampler(self.input_wav[-indata.shape[0] - 2 * self.zc:])[160:]
                )

            # Voice conversion
            infer_wav = self.rvc.infer(
                self.input_wav_res,
                self.block_frame_16k,
                self.skip_head,
                self.return_length,
                self.gui_config.f0method,
            )
            if self.resampler2 is not None:
                infer_wav = self.resampler2(infer_wav)

            # Output noise reduction
            if self.gui_config.O_noise_reduce:
                self.output_buffer[:-self.block_frame] = self.output_buffer[self.block_frame:].clone()
                self.output_buffer[-self.block_frame:] = infer_wav[-self.block_frame:]
                infer_wav = self.tg(infer_wav.unsqueeze(0), self.output_buffer.unsqueeze(0)).squeeze(0)

            # RMS mixing: match the output's volume envelope to the input's
            if self.gui_config.rms_mix_rate < 1:
                input_wav = (
                    self.input_wav_denoise[self.extra_frame:]
                    if self.gui_config.I_noise_reduce
                    else self.input_wav[self.extra_frame:]
                )
                rms1 = librosa.feature.rms(
                    y=input_wav[:infer_wav.shape[0]].cpu().numpy(),
                    frame_length=4 * self.zc,
                    hop_length=self.zc,
                )
                rms1 = torch.from_numpy(rms1).to(self.config.device)
                rms1 = F.interpolate(
                    rms1.unsqueeze(0),
                    size=infer_wav.shape[0] + 1,
                    mode="linear",
                    align_corners=True,
                )[0, 0, :-1]
                rms2 = librosa.feature.rms(
                    y=infer_wav.cpu().numpy(),
                    frame_length=4 * self.zc,
                    hop_length=self.zc,
                )
                rms2 = torch.from_numpy(rms2).to(self.config.device)
                rms2 = F.interpolate(
                    rms2.unsqueeze(0),
                    size=infer_wav.shape[0] + 1,
                    mode="linear",
                    align_corners=True,
                )[0, 0, :-1]
                rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-3)
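                # Envelope mixing, as applied just below: scale the output by
                # (rms1 / rms2) ** (1 - rms_mix_rate). rms_mix_rate = 0 fully
                # re-imposes the input's loudness contour; rms_mix_rate = 1
                # leaves the model's own output envelope untouched.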
                infer_wav *= torch.pow(
                    rms1 / rms2, torch.tensor(1 - self.gui_config.rms_mix_rate)
                )

            # SOLA: find the offset in the search window that best aligns
            # the new block with the tail of the previous one
            conv_input = infer_wav[
                None, None, :self.sola_buffer_frame + self.sola_search_frame
            ]
            cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :])
            cor_den = torch.sqrt(
                F.conv1d(
                    conv_input**2,
                    torch.ones(1, 1, self.sola_buffer_frame, device=self.config.device),
                )
                + 1e-8
            )
            if sys.platform == "darwin":
                # torch.max needs an explicit dim to return (values, indices)
                _, sola_offset = torch.max(cor_nom[0, 0] / cor_den[0, 0], dim=0)
                sola_offset = sola_offset.item()
            else:
                sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])

            infer_wav = infer_wav[sola_offset:]
            if "privateuseone" in str(self.config.device) or not self.gui_config.use_pv:
                # Plain equal-power crossfade
                infer_wav[:self.sola_buffer_frame] *= self.fade_in_window
                infer_wav[:self.sola_buffer_frame] += self.sola_buffer * self.fade_out_window
            else:
                # Phase-vocoder crossfade
                infer_wav[:self.sola_buffer_frame] = phase_vocoder(
                    self.sola_buffer,
                    infer_wav[:self.sola_buffer_frame],
                    self.fade_out_window,
                    self.fade_in_window,
                )
            self.sola_buffer[:] = infer_wav[
                self.block_frame: self.block_frame + self.sola_buffer_frame
            ]

            outdata[:] = (
                infer_wav[:self.block_frame]
                .repeat(self.gui_config.channels, 1)
                .t()
                .cpu()
                .numpy()
            )

            total_time = time.perf_counter() - start_time
            print(f"Infer time: {total_time:.2f}s")
        finally:
            # Restore directory
            os.chdir(saved_cwd)

    def start(self):
        """Start the audio stream."""
        if self.running:
            print("Already running")
            return

        if self.rvc is None:
            self.initialize_rvc()

        print("Starting audio stream...")
        print(f"  Input: {self.gui_config.input_device}")
        print(f"  Output: {self.gui_config.output_device}")
        print(f"  Sample rate: {self.gui_config.samplerate}")

        self.running = True
        self.stream = sd.Stream(
            callback=self.audio_callback,
            blocksize=self.block_frame,
            samplerate=self.gui_config.samplerate,
            channels=self.gui_config.channels,
            dtype="float32",
            device=(self.gui_config.input_device, self.gui_config.output_device),
        )
        self.stream.start()
        print("✓ Audio stream started")

    def stop(self):
        """Stop the audio stream."""
        if not self.running:
            return
        self.running = False
        if self.stream is not None:
            self.stream.abort()
            self.stream.close()
            self.stream = None
        print("✓ Audio stream stopped")
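
# Typical invocations (script name and model paths are placeholders for
# your own files):
#
#   # Run with an explicit model and index
#   python3 headless_rvc.py --pth assets/weights/my_voice.pth \
#       --index logs/my_voice/added.index --pitch 4 --index-rate 0.5
#
#   # Write the settings to a reusable JSON config, then run from it
#   python3 headless_rvc.py --pth assets/weights/my_voice.pth --save-config rvc.json
#   python3 headless_rvc.py --config rvc.json
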
def main():
    parser = argparse.ArgumentParser(description="Headless RVC Voice Conversion")
    parser.add_argument("--config", type=str, help="Path to JSON configuration file")
    parser.add_argument("--pth", type=str, help="Path to RVC model (.pth file)")
    parser.add_argument("--index", type=str, help="Path to index file")
    parser.add_argument("--pitch", type=int, default=None, help="Pitch shift in semitones")
    parser.add_argument("--input-device", type=str, help="Input audio device")
    parser.add_argument("--output-device", type=str, help="Output audio device")
    parser.add_argument(
        "--f0method",
        type=str,
        default=None,
        choices=["pm", "harvest", "crepe", "rmvpe", "fcpe"],
        help="F0 extraction method (default: rmvpe)",
    )
    parser.add_argument(
        "--index-rate", type=float, default=None, help="Index feature ratio (0-1)"
    )
    parser.add_argument("--save-config", type=str, help="Save configuration to file")

    args = parser.parse_args()

    # Load or create configuration
    if args.config:
        config = HeadlessRVCConfig.load(args.config)
        print(f"Loaded configuration from: {args.config}")
    else:
        config = HeadlessRVCConfig()

    # Override config-file values only with flags that were explicitly given
    # (argparse defaults are None so unset flags never clobber the config)
    if args.pth:
        config.pth_path = args.pth
    if args.index:
        config.index_path = args.index
    if args.pitch is not None:
        config.pitch = args.pitch
    if args.input_device:
        config.input_device = args.input_device
    if args.output_device:
        config.output_device = args.output_device
    if args.f0method is not None:
        config.f0method = args.f0method
    if args.index_rate is not None:
        config.index_rate = args.index_rate

    # Save configuration if requested
    if args.save_config:
        config.save(args.save_config)
        print(f"Saved configuration to: {args.save_config}")
        return

    # Validate required parameters
    if not config.pth_path:
        print("Error: --pth or --config with pth_path is required")
        return

    # Start the harvest F0 workers (only exercised when f0method == "harvest")
    start_harvest_processes()

    # Create and start headless RVC
    print("=" * 70)
    print("Headless RVC Voice Conversion")
    print("=" * 70)

    rvc = HeadlessRVC(config)
    rvc.start()

    try:
        print("\nPress Ctrl+C to stop...")
        while True:
            sd.sleep(1000)
    except KeyboardInterrupt:
        print("\n\nStopping...")
    finally:
        rvc.stop()
        print("Goodbye!")


if __name__ == "__main__":
    main()
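
# Tip: to find the names/indices to pass as --input-device and
# --output-device, the python-sounddevice package (imported above) can list
# all audio devices from the shell:
#
#   python3 -m sounddevice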