# soprano_to_rvc/headless_rvc.py
#!/usr/bin/env python3
"""
Headless RVC Voice Conversion
Uses the RVC GUI logic without the GUI window for headless operation.
Configuration via JSON file or command-line arguments.
"""
import os
import sys
import json
import argparse
import logging
import atexit
from pathlib import Path

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

# Set up environment (same as GUI)
os.environ["OMP_NUM_THREADS"] = "4"
if sys.platform == "darwin":
    # Let PyTorch fall back to CPU for ops not implemented on MPS
    os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

# Store original directory
SCRIPT_DIR = Path(__file__).parent
ORIGINAL_DIR = os.getcwd()

# Add RVC to path (but don't change directory yet) — the imports below
# (tools.torchgate, infer.lib, configs.config) resolve inside this tree.
rvc_dir = SCRIPT_DIR / "Retrieval-based-Voice-Conversion-WebUI"
sys.path.insert(0, str(rvc_dir))

import multiprocessing
from multiprocessing import Queue, cpu_count

import numpy as np
import sounddevice as sd
import torch
import torch.nn.functional as F
import librosa
import torchaudio.transforms as tat

from tools.torchgate import TorchGate
from infer.lib import rtrvc as rvc_for_realtime
from configs.config import Config

# Initialize harvest processes (same as GUI)
inp_q = Queue()
opt_q = Queue()
n_cpu = min(cpu_count(), 8)

harvest_processes = []  # Keep track of processes for cleanup
class Harvest(multiprocessing.Process):
    """Worker process that extracts F0 with pyworld's harvest algorithm.

    Jobs arrive on ``inp_q`` as ``(idx, audio, res_f0, n_cpu, ts)`` tuples;
    each result is written into the shared ``res_f0`` mapping and, once
    every worker has contributed, ``ts`` is echoed back on ``opt_q``.
    """

    def __init__(self, inp_q, opt_q):
        super().__init__()
        self.inp_q = inp_q
        self.opt_q = opt_q

    def run(self):
        # Imported here so the modules load inside the worker process.
        import numpy as np
        import pyworld

        while True:
            idx, x, res_f0, n_cpu, ts = self.inp_q.get()
            f0, t = pyworld.harvest(
                x.astype(np.double),
                fs=16000,
                f0_ceil=1100,
                f0_floor=50,
                frame_period=10,
            )
            res_f0[idx] = f0
            if len(res_f0.keys()) >= n_cpu:
                self.opt_q.put(ts)
# Start harvest processes
for _ in range(n_cpu):
    worker = Harvest(inp_q, opt_q)
    worker.daemon = True
    worker.start()
    harvest_processes.append(worker)


def cleanup_harvest_processes():
    """Terminate all harvest processes gracefully"""
    global harvest_processes
    for proc in harvest_processes:
        if proc.is_alive():
            proc.terminate()
    # Wait briefly for processes to terminate
    for proc in harvest_processes:
        proc.join(timeout=0.1)
    harvest_processes.clear()


# Register cleanup to run on exit
atexit.register(cleanup_harvest_processes)
def phase_vocoder(a, b, fade_out, fade_in):
    """Crossfade two frames with a phase-vocoder blend.

    Both frames are windowed, and a spectral term whose phase progresses
    from ``a``'s phase toward ``b``'s is added on top of the plain
    squared-fade crossfade. Returns a tensor shaped like ``a``.
    """
    window = torch.sqrt(fade_out * fade_in)
    spec_a = torch.fft.rfft(a * window)
    spec_b = torch.fft.rfft(b * window)
    mag_sum = torch.abs(spec_a) + torch.abs(spec_b)
    n = a.shape[0]
    # Double every bin except DC (and Nyquist when n is even) to account
    # for the one-sided spectrum.
    if n % 2 == 0:
        mag_sum[1:-1] *= 2
    else:
        mag_sum[1:] *= 2
    phase_a = torch.angle(spec_a)
    phase_b = torch.angle(spec_b)
    # Wrap the phase difference into [-pi, pi).
    dphi = phase_b - phase_a
    dphi = dphi - 2 * np.pi * torch.floor(dphi / 2 / np.pi + 0.5)
    omega = 2 * np.pi * torch.arange(n // 2 + 1).to(a) + dphi
    t = torch.arange(n).unsqueeze(-1).to(a) / n
    return (
        a * (fade_out**2)
        + b * (fade_in**2)
        + torch.sum(mag_sum * torch.cos(omega * t + phase_a), -1) * window / n
    )
class HeadlessRVCConfig:
    """Configuration for headless RVC operation.

    Defaults mirror the RVC GUI settings; any key in ``config_dict`` that
    matches an existing attribute overrides the corresponding default.
    Unknown keys are silently ignored.
    """

    def __init__(self, config_dict=None):
        # Model paths
        self.pth_path = ""
        self.index_path = ""
        # Audio processing parameters
        self.pitch = 0               # Pitch shift in semitones
        self.formant = 0.0           # Formant shift
        self.block_time = 0.25       # Block size in seconds
        self.crossfade_time = 0.05   # Crossfade duration
        self.extra_time = 2.5        # Extra buffer time
        self.threshold = -60         # Voice activity threshold in dB
        # RVC parameters
        self.index_rate = 0.0        # Index feature ratio (0-1)
        self.rms_mix_rate = 0.0      # Volume envelope mixing (0-1)
        self.f0method = "rmvpe"      # F0 extraction method
        # Noise reduction
        self.I_noise_reduce = False  # Input noise reduction
        self.O_noise_reduce = False  # Output noise reduction
        self.use_pv = False          # Use phase vocoder
        # Audio device settings
        self.input_device = None     # Input device name/index
        self.output_device = None    # Output device name/index
        self.samplerate = 48000      # Sample rate
        self.channels = 2            # Number of channels
        # Processing
        self.n_cpu = min(n_cpu, 4)
        # Apply config dict if provided
        if config_dict:
            for key, value in config_dict.items():
                if hasattr(self, key):
                    setattr(self, key, value)

    def save(self, path):
        """Save configuration to a JSON file (UTF-8); private attrs are skipped."""
        config_dict = {k: v for k, v in self.__dict__.items()
                       if not k.startswith('_')}
        # Explicit encoding so the file is portable across platforms.
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(config_dict, f, indent=2)

    @classmethod
    def load(cls, path):
        """Load configuration from a JSON file (UTF-8)."""
        with open(path, 'r', encoding='utf-8') as f:
            config_dict = json.load(f)
        return cls(config_dict)
class HeadlessRVC:
    """Headless RVC processor using GUI logic.

    Wraps the RVC real-time voice-conversion pipeline (model loading,
    rolling buffers, SOLA crossfading, optional noise reduction) behind a
    sounddevice duplex stream so it can run without a window.
    """

    def __init__(self, config: HeadlessRVCConfig):
        self.gui_config = config
        # Save and clear sys.argv to prevent Config from parsing our arguments
        saved_argv = sys.argv.copy()
        sys.argv = [sys.argv[0]]  # Keep only script name
        # Change to RVC directory before initializing Config (it resolves
        # relative paths); restore cwd and argv even if Config() raises.
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        try:
            self.config = Config()
        finally:
            os.chdir(saved_cwd)
            sys.argv = saved_argv
        self.running = False
        self.stream = None
        # Initialize buffers and parameters (same as GUI)
        self.function = "vc"
        self.rvc = None
        self.tgt_sr = None

    def initialize_rvc(self):
        """Initialize RVC model (same as GUI's start_vc).

        Loads the model/index, then derives all frame sizes, rolling
        buffers, fade windows and resamplers from the configured
        block/crossfade/extra times. Must run before the stream starts.
        """
        # Convert paths to absolute if they're not already (model loading
        # happens after a chdir, so relative paths would break).
        pth_path = Path(self.gui_config.pth_path)
        if not pth_path.is_absolute():
            pth_path = (SCRIPT_DIR / pth_path).resolve()
        index_path = Path(self.gui_config.index_path)
        if not index_path.is_absolute():
            index_path = (SCRIPT_DIR / index_path).resolve()
        print(f"Loading RVC model: {pth_path}")
        print(f"Loading index: {index_path}")
        # Change to RVC directory for model loading
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        try:
            self.rvc = rvc_for_realtime.RVC(
                self.gui_config.pitch,
                self.gui_config.formant,
                str(pth_path),
                str(index_path),
                self.gui_config.index_rate,
                self.gui_config.n_cpu,
                inp_q,
                opt_q,
                self.config,
                self.rvc,  # previous instance (None on first load) for warm reload
            )
        finally:
            os.chdir(saved_cwd)
        # Get target sample rate (with fallback)
        if hasattr(self.rvc, 'tgt_sr') and self.rvc.tgt_sr:
            self.tgt_sr = self.rvc.tgt_sr
        else:
            # Fallback to config sample rate if tgt_sr not set
            logger.warning("RVC tgt_sr not set, using config sample rate")
            self.tgt_sr = self.gui_config.samplerate
        self.gui_config.samplerate = (
            self.tgt_sr if self.tgt_sr else self.gui_config.samplerate
        )
        # Calculate frame sizes. zc is a 10 ms unit at the stream rate;
        # every frame size below is rounded to a multiple of it.
        self.zc = self.gui_config.samplerate // 100
        self.block_frame = int(
            np.round(self.gui_config.block_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )
        self.block_frame_16k = 160 * self.block_frame // self.zc
        self.crossfade_frame = int(
            np.round(self.gui_config.crossfade_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )
        self.sola_buffer_frame = min(self.crossfade_frame, 4 * self.zc)
        self.sola_search_frame = self.zc
        self.extra_frame = int(
            np.round(self.gui_config.extra_time * self.gui_config.samplerate / self.zc)
            * self.zc
        )
        # Rolling input buffer: [extra | crossfade | sola search | block]
        self.input_wav = torch.zeros(
            self.extra_frame
            + self.crossfade_frame
            + self.sola_search_frame
            + self.block_frame,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.input_wav_denoise = self.input_wav.clone()
        # 16 kHz copy of the input buffer fed to the model (160 samples per zc)
        self.input_wav_res = torch.zeros(
            160 * self.input_wav.shape[0] // self.zc,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.rms_buffer = np.zeros(4 * self.zc, dtype="float32")
        self.sola_buffer = torch.zeros(
            self.sola_buffer_frame, device=self.config.device, dtype=torch.float32
        )
        self.nr_buffer = self.sola_buffer.clone()
        self.output_buffer = self.input_wav.clone()
        self.skip_head = int(self.extra_frame / self.zc)
        self.return_length = (
            self.block_frame
            + self.sola_buffer_frame
            + self.sola_search_frame
        ) // self.zc
        # Raised-cosine (sin^2) fade windows for the SOLA crossfade
        self.fade_in_window = (
            torch.sin(
                0.5
                * np.pi
                * torch.linspace(
                    0.0,
                    1.0,
                    steps=self.sola_buffer_frame,
                    device=self.config.device,
                    dtype=torch.float32,
                )
            )
            ** 2
        )
        self.fade_out_window = 1 - self.fade_in_window
        # Resampler: stream rate -> 16 kHz model input
        self.resampler = tat.Resample(
            orig_freq=self.gui_config.samplerate,
            new_freq=16000,
            lowpass_filter_width=128,
            rolloff=0.99,
            resampling_method="sinc_interp_kaiser",
            beta=14.769656459379492,
            dtype=torch.float32,
        ).to(self.config.device)
        # Second resampler only needed when the model's output rate differs
        # from the stream rate
        if self.tgt_sr != self.gui_config.samplerate:
            self.resampler2 = tat.Resample(
                orig_freq=self.tgt_sr,
                new_freq=self.gui_config.samplerate,
                lowpass_filter_width=128,
                rolloff=0.99,
                resampling_method="sinc_interp_kaiser",
                beta=14.769656459379492,
                dtype=torch.float32,
            ).to(self.config.device)
        else:
            self.resampler2 = None
        # Torchgate for noise reduction
        self.tg = TorchGate(
            sr=self.gui_config.samplerate, nonstationary=True
        ).to(self.config.device)
        print("✓ RVC model initialized successfully")

    def audio_callback(self, indata: np.ndarray, outdata: np.ndarray, frames, times, status):
        """
        Audio processing callback (same logic as GUI).

        Runs once per block on the sounddevice stream thread: gates the
        input, maintains the rolling buffers, runs inference, and
        SOLA-crossfades the result into ``outdata``.
        """
        import time
        # Switch to the RVC directory so resources loaded lazily during
        # inference (e.g. rmvpe weights) resolve via relative paths.
        saved_cwd = os.getcwd()
        os.chdir(str(rvc_dir))
        try:
            start_time = time.perf_counter()
            indata = librosa.to_mono(indata.T)
            # Noise gate: zero out 10 ms slices whose RMS is below the
            # configured dB threshold (-60 disables the gate)
            if self.gui_config.threshold > -60:
                indata = np.append(self.rms_buffer, indata)
                rms = librosa.feature.rms(
                    y=indata, frame_length=4 * self.zc, hop_length=self.zc
                )[:, 2:]
                self.rms_buffer[:] = indata[-4 * self.zc:]
                indata = indata[2 * self.zc - self.zc // 2:]
                db_threshold = (
                    librosa.amplitude_to_db(rms, ref=1.0)[0] < self.gui_config.threshold
                )
                for i in range(db_threshold.shape[0]):
                    if db_threshold[i]:
                        indata[i * self.zc: (i + 1) * self.zc] = 0
                indata = indata[self.zc // 2:]
            # Shift the rolling buffers left by one block and append new audio
            self.input_wav[:-self.block_frame] = self.input_wav[self.block_frame:].clone()
            self.input_wav[-indata.shape[0]:] = torch.from_numpy(indata).to(self.config.device)
            self.input_wav_res[:-self.block_frame_16k] = self.input_wav_res[self.block_frame_16k:].clone()
            # Input noise reduction
            if self.gui_config.I_noise_reduce:
                self.input_wav_denoise[:-self.block_frame] = self.input_wav_denoise[self.block_frame:].clone()
                input_wav = self.input_wav[-self.sola_buffer_frame - self.block_frame:]
                input_wav = self.tg(input_wav.unsqueeze(0), self.input_wav.unsqueeze(0)).squeeze(0)
                input_wav[:self.sola_buffer_frame] *= self.fade_in_window
                input_wav[:self.sola_buffer_frame] += self.nr_buffer * self.fade_out_window
                self.input_wav_denoise[-self.block_frame:] = input_wav[:self.block_frame]
                self.nr_buffer[:] = input_wav[self.block_frame:]
                self.input_wav_res[-self.block_frame_16k - 160:] = self.resampler(
                    self.input_wav_denoise[-self.block_frame - 2 * self.zc:]
                )[160:]
            else:
                self.input_wav_res[-160 * (indata.shape[0] // self.zc + 1):] = (
                    self.resampler(self.input_wav[-indata.shape[0] - 2 * self.zc:])[160:]
                )
            # Voice conversion
            infer_wav = self.rvc.infer(
                self.input_wav_res,
                self.block_frame_16k,
                self.skip_head,
                self.return_length,
                self.gui_config.f0method,
            )
            if self.resampler2 is not None:
                infer_wav = self.resampler2(infer_wav)
            # Output noise reduction
            if self.gui_config.O_noise_reduce:
                self.output_buffer[:-self.block_frame] = self.output_buffer[self.block_frame:].clone()
                self.output_buffer[-self.block_frame:] = infer_wav[-self.block_frame:]
                infer_wav = self.tg(infer_wav.unsqueeze(0), self.output_buffer.unsqueeze(0)).squeeze(0)
            # RMS mixing: blend the input's volume envelope into the output
            if self.gui_config.rms_mix_rate < 1:
                input_wav = self.input_wav_denoise[self.extra_frame:] if self.gui_config.I_noise_reduce else self.input_wav[self.extra_frame:]
                rms1 = librosa.feature.rms(
                    y=input_wav[:infer_wav.shape[0]].cpu().numpy(),
                    frame_length=4 * self.zc,
                    hop_length=self.zc,
                )
                rms1 = torch.from_numpy(rms1).to(self.config.device)
                rms1 = F.interpolate(
                    rms1.unsqueeze(0),
                    size=infer_wav.shape[0] + 1,
                    mode="linear",
                    align_corners=True,
                )[0, 0, :-1]
                rms2 = librosa.feature.rms(
                    y=infer_wav[:].cpu().numpy(),
                    frame_length=4 * self.zc,
                    hop_length=self.zc,
                )
                rms2 = torch.from_numpy(rms2).to(self.config.device)
                rms2 = F.interpolate(
                    rms2.unsqueeze(0),
                    size=infer_wav.shape[0] + 1,
                    mode="linear",
                    align_corners=True,
                )[0, 0, :-1]
                # Floor rms2 to avoid division blow-up on near-silent output
                rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-3)
                infer_wav *= torch.pow(
                    rms1 / rms2, torch.tensor(1 - self.gui_config.rms_mix_rate)
                )
            # SOLA: find the offset with maximum normalized cross-correlation
            # against the previous tail so the crossfade is phase-aligned
            conv_input = infer_wav[
                None, None, :self.sola_buffer_frame + self.sola_search_frame
            ]
            cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :])
            cor_den = torch.sqrt(
                F.conv1d(
                    conv_input**2,
                    torch.ones(1, 1, self.sola_buffer_frame, device=self.config.device),
                )
                + 1e-8
            )
            if sys.platform == "darwin":
                # BUGFIX: torch.max needs an explicit dim to return a
                # (values, indices) pair; without it the result is a 0-d
                # tensor and the tuple unpacking below raises.
                _, sola_offset = torch.max(cor_nom[0, 0] / cor_den[0, 0], dim=0)
                sola_offset = sola_offset.item()
            else:
                sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])
            infer_wav = infer_wav[sola_offset:]
            if "privateuseone" in str(self.config.device) or not self.gui_config.use_pv:
                infer_wav[:self.sola_buffer_frame] *= self.fade_in_window
                infer_wav[:self.sola_buffer_frame] += self.sola_buffer * self.fade_out_window
            else:
                infer_wav[:self.sola_buffer_frame] = phase_vocoder(
                    self.sola_buffer,
                    infer_wav[:self.sola_buffer_frame],
                    self.fade_out_window,
                    self.fade_in_window,
                )
            # Stash the tail for the next block's crossfade
            self.sola_buffer[:] = infer_wav[
                self.block_frame: self.block_frame + self.sola_buffer_frame
            ]
            outdata[:] = (
                infer_wav[:self.block_frame]
                .repeat(self.gui_config.channels, 1)
                .t()
                .cpu()
                .numpy()
            )
            total_time = time.perf_counter() - start_time
            logger.debug("Infer time: %.2fs", total_time)
        finally:
            # Restore directory
            os.chdir(saved_cwd)

    def start(self):
        """Start the duplex audio stream, initializing the model first if needed."""
        if self.running:
            print("Already running")
            return
        if self.rvc is None:
            self.initialize_rvc()
        print(f"Starting audio stream...")
        print(f" Input: {self.gui_config.input_device}")
        print(f" Output: {self.gui_config.output_device}")
        print(f" Sample rate: {self.gui_config.samplerate}")
        self.running = True
        self.stream = sd.Stream(
            callback=self.audio_callback,
            blocksize=self.block_frame,
            samplerate=self.gui_config.samplerate,
            channels=self.gui_config.channels,
            dtype="float32",
            device=(self.gui_config.input_device, self.gui_config.output_device),
        )
        self.stream.start()
        print("✓ Audio stream started")

    def stop(self):
        """Stop the audio stream and tear down the harvest worker processes."""
        if not self.running:
            return
        self.running = False
        if self.stream is not None:
            self.stream.abort()
            self.stream.close()
            self.stream = None
        # Clean up harvest processes
        cleanup_harvest_processes()
        print("✓ Audio stream stopped")
def main():
    """Command-line entry point for headless RVC.

    Builds the configuration from an optional JSON file plus CLI flags,
    optionally saves it, then runs the audio stream until Ctrl+C.
    """
    parser = argparse.ArgumentParser(description="Headless RVC Voice Conversion")
    parser.add_argument("--config", type=str, help="Path to JSON configuration file")
    parser.add_argument("--pth", type=str, help="Path to RVC model (.pth file)")
    parser.add_argument("--index", type=str, help="Path to index file")
    # Defaults are None so we can tell "flag not given" apart from an
    # explicit value and avoid clobbering settings loaded via --config.
    parser.add_argument("--pitch", type=int, default=None,
                        help="Pitch shift in semitones (default: 0)")
    parser.add_argument("--input-device", type=str, help="Input audio device")
    parser.add_argument("--output-device", type=str, help="Output audio device")
    parser.add_argument("--f0method", type=str, default=None,
                        choices=["pm", "harvest", "crepe", "rmvpe", "fcpe"],
                        help="F0 extraction method (default: rmvpe)")
    parser.add_argument("--index-rate", type=float, default=None,
                        help="Index feature ratio (0-1, default: 0.0)")
    parser.add_argument("--save-config", type=str, help="Save configuration to file")
    args = parser.parse_args()
    # Load or create configuration
    if args.config:
        config = HeadlessRVCConfig.load(args.config)
        print(f"Loaded configuration from: {args.config}")
    else:
        config = HeadlessRVCConfig()
    # Override with command-line arguments only when explicitly provided
    if args.pth:
        config.pth_path = args.pth
    if args.index:
        config.index_path = args.index
    if args.pitch is not None:
        config.pitch = args.pitch
    if args.input_device:
        config.input_device = args.input_device
    if args.output_device:
        config.output_device = args.output_device
    if args.f0method is not None:
        config.f0method = args.f0method
    if args.index_rate is not None:
        config.index_rate = args.index_rate
    # Save configuration if requested (and exit without starting audio)
    if args.save_config:
        config.save(args.save_config)
        print(f"Saved configuration to: {args.save_config}")
        return
    # Validate required parameters
    if not config.pth_path:
        print("Error: --pth or --config with pth_path is required")
        return
    # Create and start headless RVC
    print("=" * 70)
    print("Headless RVC Voice Conversion")
    print("=" * 70)
    rvc = HeadlessRVC(config)
    rvc.start()
    try:
        print("\nPress Ctrl+C to stop...")
        while True:
            sd.sleep(1000)
    except KeyboardInterrupt:
        print("\n\nStopping...")
    finally:
        rvc.stop()
        print("Goodbye!")


if __name__ == "__main__":
    main()