diff --git a/headless_rvc.py b/headless_rvc.py
new file mode 100755
index 0000000..2c2f362
--- /dev/null
+++ b/headless_rvc.py
@@ -0,0 +1,576 @@
+#!/usr/bin/env python3
+"""
+Headless RVC Voice Conversion
+Uses the RVC GUI logic without the GUI window for headless operation.
+Configuration via JSON file or command-line arguments.
+"""
+
+import os
+import sys
+import json
+import time
+import argparse
+from pathlib import Path
+
+# Set up environment (same as GUI)
+os.environ["OMP_NUM_THREADS"] = "4"
+if sys.platform == "darwin":
+    os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
+
+# Store original directory
+SCRIPT_DIR = Path(__file__).parent
+ORIGINAL_DIR = os.getcwd()
+
+# Add RVC to path (but don't change directory yet)
+rvc_dir = SCRIPT_DIR / "Retrieval-based-Voice-Conversion-WebUI"
+sys.path.insert(0, str(rvc_dir))
+
+import multiprocessing
+from multiprocessing import Queue, cpu_count
+import numpy as np
+import sounddevice as sd
+import torch
+import torch.nn.functional as F
+import librosa
+from tools.torchgate import TorchGate
+import torchaudio.transforms as tat
+
+from infer.lib import rtrvc as rvc_for_realtime
+from configs.config import Config
+
+# Initialize harvest processes (same as GUI)
+inp_q = Queue()
+opt_q = Queue()
+n_cpu = min(cpu_count(), 8)
+
+
+class Harvest(multiprocessing.Process):
+    def __init__(self, inp_q, opt_q):
+        multiprocessing.Process.__init__(self)
+        self.inp_q = inp_q
+        self.opt_q = opt_q
+
+    def run(self):
+        # Imported here so they load inside the worker process
+        import numpy as np
+        import pyworld
+
+        while True:
+            idx, x, res_f0, n_cpu, ts = self.inp_q.get()
+            f0, t = pyworld.harvest(
+                x.astype(np.double),
+                fs=16000,
+                f0_ceil=1100,
+                f0_floor=50,
+                frame_period=10,
+            )
+            res_f0[idx] = f0
+            if len(res_f0.keys()) >= n_cpu:
+                self.opt_q.put(ts)
+
+
+# Start harvest processes
+for _ in range(n_cpu):
+    p = Harvest(inp_q, opt_q)
+    p.daemon = True
+    p.start()
+
+
+def phase_vocoder(a, b, fade_out, fade_in):
+    """Phase vocoder for smooth crossfading"""
+    window = torch.sqrt(fade_out * fade_in)
+    fa = torch.fft.rfft(a * window)
+    fb = torch.fft.rfft(b * window)
+    absab = torch.abs(fa) + torch.abs(fb)
+    n = a.shape[0]
+    if n % 2 == 0:
+        absab[1:-1] *= 2
+    else:
+        absab[1:] *= 2
+    phia = torch.angle(fa)
+    phib = torch.angle(fb)
+    deltaphase = phib - phia
+    deltaphase = deltaphase - 2 * np.pi * torch.floor(deltaphase / 2 / np.pi + 0.5)
+    w = 2 * np.pi * torch.arange(n // 2 + 1).to(a) + deltaphase
+    t = torch.arange(n).unsqueeze(-1).to(a) / n
+    result = (
+        a * (fade_out**2)
+        + b * (fade_in**2)
+        + torch.sum(absab * torch.cos(w * t + phia), -1) * window / n
+    )
+    return result
+
+
+class HeadlessRVCConfig:
+    """Configuration for headless RVC operation"""
+
+    def __init__(self, config_dict=None):
+        # Model paths
+        self.pth_path = ""
+        self.index_path = ""
+
+        # Audio processing parameters
+        self.pitch = 0  # Pitch shift in semitones
+        self.formant = 0.0  # Formant shift
+        self.block_time = 0.25  # Block size in seconds
+        self.crossfade_time = 0.05  # Crossfade duration in seconds
+        self.extra_time = 2.5  # Extra buffer time in seconds
+        self.threshold = -60  # Voice activity threshold in dB
+
+        # RVC parameters
+        self.index_rate = 0.0  # Index feature ratio (0-1)
+        self.rms_mix_rate = 0.0  # Volume envelope mixing (0-1)
+        self.f0method = "rmvpe"  # F0 extraction method
+
+        # Noise reduction
+        self.I_noise_reduce = False  # Input noise reduction
+        self.O_noise_reduce = False  # Output noise reduction
+        self.use_pv = False  # Use phase vocoder for crossfading
+
+        # Audio device settings
+        self.input_device = None  # Input device name/index
+        self.output_device = None  # Output device name/index
+        self.samplerate = 48000  # Sample rate in Hz
+        self.channels = 2  # Number of channels
+
+        # Processing
+        self.n_cpu = min(n_cpu, 4)
+
+        # Apply config dict if provided (unknown keys are ignored)
+        if config_dict:
+            for key, value in config_dict.items():
+                if hasattr(self, key):
+                    setattr(self, key, value)
+
+    def save(self, path):
+        """Save configuration to JSON file"""
+        config_dict = {k: v for k, v in self.__dict__.items()
+                       if not k.startswith("_")}
+        with open(path, "w") as f:
+            json.dump(config_dict, f, indent=2)
+
+    @classmethod
+    def load(cls, path):
+        """Load configuration from JSON file"""
+        with open(path, "r") as f:
+            config_dict = json.load(f)
+        return cls(config_dict)
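+
+# A minimal example of the JSON accepted by HeadlessRVCConfig.load(). The
+# paths and device names below are illustrative placeholders, not files
+# shipped with this change; unknown keys are silently ignored by the
+# constructor above.
+#
+#   {
+#     "pth_path": "assets/weights/my_voice.pth",
+#     "index_path": "logs/my_voice/added.index",
+#     "pitch": 4,
+#     "f0method": "rmvpe",
+#     "index_rate": 0.3,
+#     "input_device": "MacBook Pro Microphone",
+#     "output_device": "BlackHole 2ch"
+#   }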
+
+
+class HeadlessRVC:
+    """Headless RVC processor using the GUI's processing logic"""
+
+    def __init__(self, config: HeadlessRVCConfig):
+        self.gui_config = config
+
+        # Save and clear sys.argv to prevent Config from parsing our arguments
+        saved_argv = sys.argv.copy()
+        sys.argv = [sys.argv[0]]  # Keep only the script name
+
+        # Change to the RVC directory before initializing Config
+        saved_cwd = os.getcwd()
+        os.chdir(str(rvc_dir))
+        self.config = Config()
+        os.chdir(saved_cwd)
+
+        # Restore sys.argv
+        sys.argv = saved_argv
+
+        self.running = False
+        self.stream = None
+
+        # Initialize state (buffers are allocated in initialize_rvc)
+        self.function = "vc"
+        self.rvc = None
+        self.tgt_sr = None
+
+    def initialize_rvc(self):
+        """Initialize the RVC model (same as the GUI's start_vc)"""
+        # Convert paths to absolute if they aren't already
+        pth_path = Path(self.gui_config.pth_path)
+        if not pth_path.is_absolute():
+            pth_path = (SCRIPT_DIR / pth_path).resolve()
+
+        index_path = Path(self.gui_config.index_path)
+        if not index_path.is_absolute():
+            index_path = (SCRIPT_DIR / index_path).resolve()
+
+        print(f"Loading RVC model: {pth_path}")
+        print(f"Loading index: {index_path}")
+
+        # Change to the RVC directory for model loading
+        saved_cwd = os.getcwd()
+        os.chdir(str(rvc_dir))
+
+        try:
+            self.rvc = rvc_for_realtime.RVC(
+                self.gui_config.pitch,
+                self.gui_config.formant,
+                str(pth_path),
+                str(index_path),
+                self.gui_config.index_rate,
+                self.gui_config.n_cpu,
+                inp_q,
+                opt_q,
+                self.config,
+                self.rvc,  # previous instance for hot-swapping (None on first load)
+            )
+        finally:
+            os.chdir(saved_cwd)
+
+        self.tgt_sr = self.rvc.tgt_sr
+        self.gui_config.samplerate = (
+            self.tgt_sr if self.tgt_sr else self.gui_config.samplerate
+        )
+
+        # Calculate frame sizes (zc is one 10 ms frame at the stream rate)
+        self.zc = self.gui_config.samplerate // 100
+        self.block_frame = int(
+            np.round(self.gui_config.block_time * self.gui_config.samplerate / self.zc)
+            * self.zc
+        )
+        self.block_frame_16k = 160 * self.block_frame // self.zc
+        self.crossfade_frame = int(
+            np.round(self.gui_config.crossfade_time * self.gui_config.samplerate / self.zc)
+            * self.zc
+        )
+        self.sola_buffer_frame = min(self.crossfade_frame, 4 * self.zc)
+        self.sola_search_frame = self.zc
+        self.extra_frame = int(
+            np.round(self.gui_config.extra_time * self.gui_config.samplerate / self.zc)
+            * self.zc
+        )
+        self.input_wav = torch.zeros(
+            self.extra_frame
+            + self.crossfade_frame
+            + self.sola_search_frame
+            + self.block_frame,
+            device=self.config.device,
+            dtype=torch.float32,
+        )
+        self.input_wav_denoise = self.input_wav.clone()
+        self.input_wav_res = torch.zeros(
+            160 * self.input_wav.shape[0] // self.zc,
+            device=self.config.device,
+            dtype=torch.float32,
+        )
+        self.rms_buffer = np.zeros(4 * self.zc, dtype="float32")
+        self.sola_buffer = torch.zeros(
+            self.sola_buffer_frame, device=self.config.device, dtype=torch.float32
+        )
+        self.nr_buffer = self.sola_buffer.clone()
+        self.output_buffer = self.input_wav.clone()
+        self.skip_head = self.extra_frame // self.zc
+        self.return_length = (
+            self.block_frame
+            + self.sola_buffer_frame
+            + self.sola_search_frame
+        ) // self.zc
+        self.fade_in_window = (
+            torch.sin(
+                0.5
+                * np.pi
+                * torch.linspace(
+                    0.0,
+                    1.0,
+                    steps=self.sola_buffer_frame,
+                    device=self.config.device,
+                    dtype=torch.float32,
+                )
+            )
+            ** 2
+        )
+        self.fade_out_window = 1 - self.fade_in_window
+
+        # Resampler to the model's 16 kHz input rate
+        self.resampler = tat.Resample(
+            orig_freq=self.gui_config.samplerate,
+            new_freq=16000,
+            lowpass_filter_width=128,
+            rolloff=0.99,
+            resampling_method="sinc_interp_kaiser",
+            beta=14.769656459379492,
+            dtype=torch.float32,
+        ).to(self.config.device)
+
+        if self.tgt_sr != self.gui_config.samplerate:
+            self.resampler2 = tat.Resample(
+                orig_freq=self.tgt_sr,
+                new_freq=self.gui_config.samplerate,
+                lowpass_filter_width=128,
+                rolloff=0.99,
+                resampling_method="sinc_interp_kaiser",
+                beta=14.769656459379492,
+                dtype=torch.float32,
+            ).to(self.config.device)
+        else:
+            self.resampler2 = None
+
+        # TorchGate for noise reduction
+        self.tg = TorchGate(
+            sr=self.gui_config.samplerate, nonstationary=True
+        ).to(self.config.device)
+
+        print("✓ RVC model initialized successfully")
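+
+    # Worked example of the frame-size math above, assuming the defaults
+    # (samplerate 48000 Hz, block_time 0.25 s, crossfade_time 0.05 s);
+    # the numbers are illustrative, not hard-coded anywhere:
+    #
+    #   zc              = 48000 // 100                  = 480   (one 10 ms frame)
+    #   block_frame     = round(0.25 * 48000 / 480) * 480 = 12000 samples
+    #   block_frame_16k = 160 * 12000 // 480            = 4000  samples at 16 kHz
+    #   crossfade_frame = round(0.05 * 48000 / 480) * 480 = 2400 samples
+    #   sola_buffer     = min(2400, 4 * 480)            = 1920  samples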
+
+    def audio_callback(self, indata: np.ndarray, outdata: np.ndarray, frames, times, status):
+        """Audio processing callback (same logic as the GUI)"""
+        # Switch to the RVC directory so relative resource paths (e.g. the
+        # rmvpe weights) resolve during inference; restored in the finally block.
+        saved_cwd = os.getcwd()
+        os.chdir(str(rvc_dir))
+
+        try:
+            start_time = time.perf_counter()
+
+            indata = librosa.to_mono(indata.T)
+
+            # Threshold (voice activity) processing
+            if self.gui_config.threshold > -60:
+                indata = np.append(self.rms_buffer, indata)
+                rms = librosa.feature.rms(
+                    y=indata, frame_length=4 * self.zc, hop_length=self.zc
+                )[:, 2:]
+                self.rms_buffer[:] = indata[-4 * self.zc:]
+                indata = indata[2 * self.zc - self.zc // 2:]
+                db_threshold = (
+                    librosa.amplitude_to_db(rms, ref=1.0)[0] < self.gui_config.threshold
+                )
+                for i in range(db_threshold.shape[0]):
+                    if db_threshold[i]:
+                        indata[i * self.zc: (i + 1) * self.zc] = 0
+                indata = indata[self.zc // 2:]
+
+            # Update input buffer
+            self.input_wav[:-self.block_frame] = self.input_wav[self.block_frame:].clone()
+            self.input_wav[-indata.shape[0]:] = torch.from_numpy(indata).to(self.config.device)
+            self.input_wav_res[:-self.block_frame_16k] = self.input_wav_res[self.block_frame_16k:].clone()
+
+            # Input noise reduction
+            if self.gui_config.I_noise_reduce:
+                self.input_wav_denoise[:-self.block_frame] = self.input_wav_denoise[self.block_frame:].clone()
+                input_wav = self.input_wav[-self.sola_buffer_frame - self.block_frame:]
+                input_wav = self.tg(input_wav.unsqueeze(0), self.input_wav.unsqueeze(0)).squeeze(0)
+                input_wav[:self.sola_buffer_frame] *= self.fade_in_window
+                input_wav[:self.sola_buffer_frame] += self.nr_buffer * self.fade_out_window
+                self.input_wav_denoise[-self.block_frame:] = input_wav[:self.block_frame]
+                self.nr_buffer[:] = input_wav[self.block_frame:]
+                self.input_wav_res[-self.block_frame_16k - 160:] = self.resampler(
+                    self.input_wav_denoise[-self.block_frame - 2 * self.zc:]
+                )[160:]
+            else:
+                self.input_wav_res[-160 * (indata.shape[0] // self.zc + 1):] = (
+                    self.resampler(self.input_wav[-indata.shape[0] - 2 * self.zc:])[160:]
+                )
+
+            # Voice conversion
+            infer_wav = self.rvc.infer(
+                self.input_wav_res,
+                self.block_frame_16k,
+                self.skip_head,
+                self.return_length,
+                self.gui_config.f0method,
+            )
+
+            if self.resampler2 is not None:
+                infer_wav = self.resampler2(infer_wav)
+
+            # Output noise reduction
+            if self.gui_config.O_noise_reduce:
+                self.output_buffer[:-self.block_frame] = self.output_buffer[self.block_frame:].clone()
+                self.output_buffer[-self.block_frame:] = infer_wav[-self.block_frame:]
+                infer_wav = self.tg(infer_wav.unsqueeze(0), self.output_buffer.unsqueeze(0)).squeeze(0)
+
+            # RMS (volume envelope) mixing
+            if self.gui_config.rms_mix_rate < 1:
+                input_wav = (
+                    self.input_wav_denoise[self.extra_frame:]
+                    if self.gui_config.I_noise_reduce
+                    else self.input_wav[self.extra_frame:]
+                )
+                rms1 = librosa.feature.rms(
+                    y=input_wav[:infer_wav.shape[0]].cpu().numpy(),
+                    frame_length=4 * self.zc,
+                    hop_length=self.zc,
+                )
+                rms1 = torch.from_numpy(rms1).to(self.config.device)
+                rms1 = F.interpolate(
+                    rms1.unsqueeze(0),
+                    size=infer_wav.shape[0] + 1,
+                    mode="linear",
+                    align_corners=True,
+                )[0, 0, :-1]
+                rms2 = librosa.feature.rms(
+                    y=infer_wav[:].cpu().numpy(),
+                    frame_length=4 * self.zc,
+                    hop_length=self.zc,
+                )
+                rms2 = torch.from_numpy(rms2).to(self.config.device)
+                rms2 = F.interpolate(
+                    rms2.unsqueeze(0),
+                    size=infer_wav.shape[0] + 1,
+                    mode="linear",
+                    align_corners=True,
+                )[0, 0, :-1]
+                rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-3)
+                infer_wav *= torch.pow(
+                    rms1 / rms2, torch.tensor(1 - self.gui_config.rms_mix_rate)
+                )
+
+            # SOLA: find the offset where the new chunk best aligns with the
+            # tail of the previous one
+            conv_input = infer_wav[
+                None, None, :self.sola_buffer_frame + self.sola_search_frame
+            ]
+            cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :])
+            cor_den = torch.sqrt(
+                F.conv1d(
+                    conv_input**2,
+                    torch.ones(1, 1, self.sola_buffer_frame, device=self.config.device),
+                )
+                + 1e-8
+            )
+
+            if sys.platform == "darwin":
+                # torch.max needs dim= to return a (values, indices) pair
+                _, sola_offset = torch.max(cor_nom[0, 0] / cor_den[0, 0], dim=0)
+                sola_offset = sola_offset.item()
+            else:
+                sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])
+
+            infer_wav = infer_wav[sola_offset:]
+
+            # Crossfade: equal-power windows, or phase vocoder if enabled
+            if "privateuseone" in str(self.config.device) or not self.gui_config.use_pv:
+                infer_wav[:self.sola_buffer_frame] *= self.fade_in_window
+                infer_wav[:self.sola_buffer_frame] += self.sola_buffer * self.fade_out_window
+            else:
+                infer_wav[:self.sola_buffer_frame] = phase_vocoder(
+                    self.sola_buffer,
+                    infer_wav[:self.sola_buffer_frame],
+                    self.fade_out_window,
+                    self.fade_in_window,
+                )
+
+            self.sola_buffer[:] = infer_wav[
+                self.block_frame: self.block_frame + self.sola_buffer_frame
+            ]
+
+            outdata[:] = (
+                infer_wav[:self.block_frame]
+                .repeat(self.gui_config.channels, 1)
+                .t()
+                .cpu()
+                .numpy()
+            )
+
+            total_time = time.perf_counter() - start_time
+            print(f"Infer time: {total_time:.2f}s")
+
+        finally:
+            # Restore the working directory
+            os.chdir(saved_cwd)
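+
+    # The SOLA search above maximizes the normalized cross-correlation
+    #
+    #     offset* = argmax_k  sum_i buf[i] * x[k + i] / sqrt(sum_i x[k + i]^2)
+    #
+    # over k in [0, sola_search_frame], where buf is the retained tail of the
+    # previous output chunk and x is the start of the new chunk. F.conv1d with
+    # the buffer as the kernel computes the numerator for every k in one pass;
+    # the all-ones kernel computes the sliding energy for the denominator.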
+
+    def start(self):
+        """Start the audio stream"""
+        if self.running:
+            print("Already running")
+            return
+
+        if self.rvc is None:
+            self.initialize_rvc()
+
+        print("Starting audio stream...")
+        print(f"  Input: {self.gui_config.input_device}")
+        print(f"  Output: {self.gui_config.output_device}")
+        print(f"  Sample rate: {self.gui_config.samplerate}")
+
+        self.running = True
+        self.stream = sd.Stream(
+            callback=self.audio_callback,
+            blocksize=self.block_frame,
+            samplerate=self.gui_config.samplerate,
+            channels=self.gui_config.channels,
+            dtype="float32",
+            device=(self.gui_config.input_device, self.gui_config.output_device),
+        )
+        self.stream.start()
+        print("✓ Audio stream started")
+
+    def stop(self):
+        """Stop the audio stream"""
+        if not self.running:
+            return
+
+        self.running = False
+        if self.stream is not None:
+            self.stream.abort()
+            self.stream.close()
+            self.stream = None
+        print("✓ Audio stream stopped")
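+
+
+# Example invocations (the model, index, and config paths are illustrative
+# placeholders, not files shipped with this change):
+#
+#   python headless_rvc.py --pth assets/weights/my_voice.pth \
+#       --index logs/my_voice/added.index --pitch 4
+#   python headless_rvc.py --config my_config.json
+#   python headless_rvc.py --pth assets/weights/my_voice.pth \
+#       --save-config my_config.json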
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Headless RVC Voice Conversion")
+    parser.add_argument("--config", type=str, help="Path to JSON configuration file")
+    parser.add_argument("--pth", type=str, help="Path to RVC model (.pth file)")
+    parser.add_argument("--index", type=str, help="Path to index file")
+    parser.add_argument("--pitch", type=int, default=0, help="Pitch shift in semitones")
+    parser.add_argument("--input-device", type=str, help="Input audio device")
+    parser.add_argument("--output-device", type=str, help="Output audio device")
+    parser.add_argument("--f0method", type=str, default=None,
+                        choices=["pm", "harvest", "crepe", "rmvpe", "fcpe"],
+                        help="F0 extraction method (default: rmvpe)")
+    parser.add_argument("--index-rate", type=float, default=0.0, help="Index feature ratio (0-1)")
+    parser.add_argument("--save-config", type=str, help="Save configuration to file and exit")
+
+    args = parser.parse_args()
+
+    # Load or create configuration
+    if args.config:
+        config = HeadlessRVCConfig.load(args.config)
+        print(f"Loaded configuration from: {args.config}")
+    else:
+        config = HeadlessRVCConfig()
+
+    # Override with command-line arguments (only those explicitly set, so a
+    # loaded config file isn't clobbered by argparse defaults)
+    if args.pth:
+        config.pth_path = args.pth
+    if args.index:
+        config.index_path = args.index
+    if args.pitch != 0:
+        config.pitch = args.pitch
+    if args.input_device:
+        config.input_device = args.input_device
+    if args.output_device:
+        config.output_device = args.output_device
+    if args.f0method:
+        config.f0method = args.f0method
+    if args.index_rate:
+        config.index_rate = args.index_rate
+
+    # Save configuration if requested
+    if args.save_config:
+        config.save(args.save_config)
+        print(f"Saved configuration to: {args.save_config}")
+        return
+
+    # Validate required parameters
+    if not config.pth_path:
+        print("Error: --pth or --config with pth_path is required")
+        sys.exit(1)
+
+    # Create and start headless RVC
+    print("=" * 70)
+    print("Headless RVC Voice Conversion")
+    print("=" * 70)
+
+    rvc = HeadlessRVC(config)
+    rvc.start()
+
+    try:
+        print("\nPress Ctrl+C to stop...")
+        while True:
+            sd.sleep(1000)
+    except KeyboardInterrupt:
+        print("\n\nStopping...")
+    finally:
+        rvc.stop()
+        print("Goodbye!")
+
+
+if __name__ == "__main__":
+    main()
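+
+# To find values for --input-device / --output-device, the sounddevice
+# library can list the available devices (a usage hint, not part of the
+# script's control flow):
+#
+#   python -c "import sounddevice as sd; print(sd.query_devices())"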