add: absorb soprano_to_rvc as a regular subdirectory

Voice conversion pipeline (Soprano TTS → RVC) with Docker support.
Previously tracked as a bare gitlink; the nested .git/ directories were
removed and the tree absorbed into the main repo for unified tracking.

Includes: Soprano TTS, RVC WebUI integration, Docker configs,
WebSocket API, and benchmark scripts.
Updated .gitignore to exclude large model weights (*.pth, *.pt, *.onnx, *.index).
287 files committed; the 3.1 GB of model weights are excluded by the new rules.
2026-03-04 00:24:53 +02:00
parent 34b184a05a
commit 8ca716029e
287 changed files with 47102 additions and 0 deletions

View File

@@ -0,0 +1 @@
from .tts import SopranoTTS

View File

@@ -0,0 +1,20 @@
class BaseModel:
def infer(self,
prompts,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
'''
Takes a list of prompts and returns the output hidden states
'''
pass
def stream_infer(self,
prompt,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
'''
Takes a prompt and returns an iterator of the output hidden states
'''
pass
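
To make the interface contract concrete, here is a toy subclass (purely illustrative, not part of this commit) that emits random hidden states in the shape the real backends return: one (num_tokens, 512) tensor per prompt, matching the decoder's 512-channel input.

import torch
from soprano.backends.base import BaseModel

class DummyModel(BaseModel):
    # Illustrative only: stands in for an LLM backend during testing.
    def infer(self, prompts, top_p=0.95, temperature=0.3, repetition_penalty=1.2):
        # one dict per prompt, mirroring LMDeployModel/TransformersModel
        return [{'finish_reason': 'stop', 'hidden_state': torch.randn(16, 512)}
                for _ in prompts]

    def stream_infer(self, prompt, top_p=0.95, temperature=0.3, repetition_penalty=1.2):
        for i in range(16):
            yield {'finish_reason': 'stop' if i == 15 else None,
                   'hidden_state': torch.randn(1, 512)}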

View File

@@ -0,0 +1,59 @@
import torch
from lmdeploy import pipeline, TurbomindEngineConfig, GenerationConfig
from .base import BaseModel
class LMDeployModel(BaseModel):
def __init__(self,
device='cuda',
cache_size_mb=100,
model_path=None,
**kwargs):
assert device == 'cuda', "lmdeploy only supports CUDA devices; change the device or use a different backend."
cache_size_ratio = cache_size_mb * 1024**2 / torch.cuda.get_device_properties('cuda').total_memory
backend_config = TurbomindEngineConfig(cache_max_entry_count=cache_size_ratio)
# Use local model if path provided, otherwise use HuggingFace
model_name_or_path = model_path if model_path else 'ekwek/Soprano-1.1-80M'
self.pipeline = pipeline(model_name_or_path,
log_level='ERROR',
backend_config=backend_config)
def infer(self,
prompts,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
gen_config=GenerationConfig(output_last_hidden_state='generation',
do_sample=True,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
max_new_tokens=512)
responses = self.pipeline(prompts, gen_config=gen_config)
res = []
for response in responses:
res.append({
'finish_reason': response.finish_reason,
'hidden_state': response.last_hidden_state
})
return res
def stream_infer(self,
prompt,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
gen_config=GenerationConfig(output_last_hidden_state='generation',
do_sample=True,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
max_new_tokens=512)
responses = self.pipeline.stream_infer([prompt], gen_config=gen_config)
for response in responses:
yield {
'finish_reason': response.finish_reason,
'hidden_state': response.last_hidden_state
}

View File

@@ -0,0 +1,154 @@
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import LogitsProcessorList, RepetitionPenaltyLogitsProcessor, TemperatureLogitsWarper, TopPLogitsWarper
from .base import BaseModel
class TransformersModel(BaseModel):
def __init__(self,
device='cuda',
model_path=None,
**kwargs):
self.device = device
# Use local model if path provided, otherwise use HuggingFace
model_name_or_path = model_path if model_path else 'ekwek/Soprano-1.1-80M'
self.model = AutoModelForCausalLM.from_pretrained(
model_name_or_path,
dtype=torch.bfloat16 if device == 'cuda' else torch.float32,
device_map=device
)
self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
self.model.eval()
def infer(self,
prompts,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
if temperature <= 0.0:
temperature = 0.001 # temp must be nonzero
inputs = self.tokenizer(
prompts,
return_tensors='pt',
padding=True,
truncation=True,
max_length=512,
).to(self.device)
with torch.no_grad():
outputs = self.model.generate(
input_ids=inputs['input_ids'],
attention_mask=inputs['attention_mask'],
max_new_tokens=512,
do_sample=True,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
pad_token_id=self.tokenizer.pad_token_id,
return_dict_in_generate=True,
output_hidden_states=True,
)
res = []
eos_token_id = self.model.config.eos_token_id
for i in range(len(prompts)):
seq = outputs.sequences[i]
hidden_states = []
num_output_tokens = len(outputs.hidden_states)
for j in range(num_output_tokens):
token = seq[j + seq.size(0) - num_output_tokens]
if token != eos_token_id: hidden_states.append(outputs.hidden_states[j][-1][i, -1, :])
last_hidden_state = torch.stack(hidden_states).squeeze()
finish_reason = 'stop' if seq[-1].item() == eos_token_id else 'length'
res.append({
'finish_reason': finish_reason,
'hidden_state': last_hidden_state
})
return res
def stream_infer(self,
prompt,
top_p=0.95,
temperature=0.3,
repetition_penalty=1.2):
if temperature <= 0.0:
temperature = 0.001 # temp must be nonzero
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors='pt').to(self.device)
input_ids = inputs['input_ids']
# Prepare Logits Processors for sampling
logits_processor = LogitsProcessorList()
if repetition_penalty != 1.0:
logits_processor.append(RepetitionPenaltyLogitsProcessor(penalty=repetition_penalty))
logits_warper = LogitsProcessorList()
if temperature != 1.0:
logits_warper.append(TemperatureLogitsWarper(temperature=temperature))
if top_p < 1.0:
logits_warper.append(TopPLogitsWarper(top_p=top_p))
# Helper to sample next token
def get_next_token(logits, input_seq):
scores = logits_processor(input_seq, logits)
scores = logits_warper(input_seq, scores)
probs = torch.nn.functional.softmax(scores, dim=-1)
# Sample from the distribution
return torch.multinomial(probs, num_samples=1)
with torch.no_grad():
# Initial forward pass with the prompt
outputs = self.model(
input_ids,
use_cache=True,
output_hidden_states=True
)
past_key_values = outputs.past_key_values
next_token_logits = outputs.logits[:, -1, :]
# We need to maintain the full sequence for repetition penalty
generated_ids = input_ids
# Sample the first token
next_token = get_next_token(next_token_logits, generated_ids)
max_new_tokens = 512
eos_token_id = self.model.config.eos_token_id
for i in range(max_new_tokens):
# Append generated token to sequence history
generated_ids = torch.cat([generated_ids, next_token], dim=-1)
# Run forward pass for the single new token
outputs = self.model(
next_token,
past_key_values=past_key_values,
use_cache=True,
output_hidden_states=True
)
# Update cache and get hidden state
past_key_values = outputs.past_key_values
current_hidden_state = outputs.hidden_states[-1][:, -1, :] # Last layer, last token
finish_reason = None
if next_token.item() == eos_token_id:
finish_reason = 'stop'
elif i == max_new_tokens - 1:
finish_reason = 'length'
# Yield result matching lmdeploy format
yield {
'finish_reason': finish_reason,
'hidden_state': current_hidden_state
}
if finish_reason:
break
# Prepare for next iteration
next_token_logits = outputs.logits[:, -1, :]
next_token = get_next_token(next_token_logits, generated_ids)
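
As an aside, the repetition-penalty, temperature, top-p, multinomial chain used in both methods can be exercised in isolation. A minimal sketch with dummy logits, using the same transformers classes (vocabulary size and token values are made up):

import torch
from transformers import (LogitsProcessorList, RepetitionPenaltyLogitsProcessor,
                          TemperatureLogitsWarper, TopPLogitsWarper)

processors = LogitsProcessorList([RepetitionPenaltyLogitsProcessor(penalty=1.2)])
warpers = LogitsProcessorList([TemperatureLogitsWarper(temperature=0.3),
                               TopPLogitsWarper(top_p=0.95)])
input_ids = torch.tensor([[1, 5, 5]])   # fake sequence history (penalizes tokens 1 and 5)
logits = torch.randn(1, 100)            # fake next-token logits over a 100-token vocab
scores = warpers(input_ids, processors(input_ids, logits))
probs = torch.nn.functional.softmax(scores, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)  # shape (1, 1)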

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
Soprano TTS Command Line Interface
"""
import argparse
from soprano import SopranoTTS
from soprano.utils.streaming import play_stream
def main():
parser = argparse.ArgumentParser(description='Soprano Text-to-Speech CLI')
parser.add_argument('text', help='Text to synthesize')
parser.add_argument('--output', '-o', default='output.wav',
help='Output audio file path (non-streaming only)')
parser.add_argument('--model-path', '-m',
help='Path to local model directory (optional)')
parser.add_argument('--device', '-d', default='auto',
choices=['auto', 'cuda', 'cpu', 'mps'],
help='Device to use for inference')
parser.add_argument('--backend', '-b', default='auto',
choices=['auto', 'transformers', 'lmdeploy'],
help='Backend to use for inference')
parser.add_argument('--cache-size', '-c', type=int, default=100,
help='Cache size in MB (for lmdeploy backend)')
parser.add_argument('--decoder-batch-size', '-bs', type=int, default=1,
help='Batch size when decoding audio')
parser.add_argument('--streaming', '-s', action='store_true',
help='Enable streaming playback to speakers')
args = parser.parse_args()
# Initialize TTS
tts = SopranoTTS(
backend=args.backend,
device=args.device,
cache_size_mb=args.cache_size,
decoder_batch_size=args.decoder_batch_size,
model_path=args.model_path
)
print(f"Generating speech for: '{args.text}'")
if args.streaming:
stream = tts.infer_stream(args.text, chunk_size=1)
play_stream(stream)
else:
tts.infer(args.text, out_path=args.output)
print(f"Audio saved to: {args.output}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,47 @@
import io
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from scipy.io.wavfile import write
from torch import Tensor
from soprano.tts import SopranoTTS
# Load model at startup
tts = SopranoTTS(cache_size_mb = 100)
app = FastAPI(title="Soprano TTS API")
def _tensor_to_wav_bytes(tensor: Tensor) -> bytes:
"""
Convert a 1D fp32 torch tensor to a WAV byte stream.
"""
# convert to int16
audio_int16 = (np.clip(tensor.numpy(), -1.0, 1.0) * 32767).astype(np.int16)
wav_io = io.BytesIO()
write(wav_io, 32000, audio_int16) # 32kHz sample rate
wav_io.seek(0)
return wav_io.read()
@app.post("/v1/audio/speech")
async def create_speech(payload: dict):
"""
Minimal implementation of OpenAI's Speech endpoint.
Fields:
- input: string - text to synthesize
- model, voice, etc. are accepted but ignored.
- response_format: str - ignored, only support wav.
"""
text = payload.get("input")
if not isinstance(text, str) or not text.strip():
raise HTTPException(status_code=400, detail="`input` field must be a non-empty string.")
audio_tensor = tts.infer(text)
wav_bytes = _tensor_to_wav_bytes(audio_tensor)
return Response(content=wav_bytes, media_type="audio/wav", headers={"Content-Disposition": 'attachment; filename="speech.wav"'})
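
For completeness, a minimal client sketch for the endpoint above; the host and port are assumptions (whatever uvicorn is started with), and the requests dependency is not part of this commit:

import requests

resp = requests.post(
    "http://localhost:8000/v1/audio/speech",   # assumed local uvicorn address
    json={"input": "Hello from Soprano!"},
)
resp.raise_for_status()
with open("speech.wav", "wb") as f:
    f.write(resp.content)  # 32 kHz mono 16-bit WAV, as produced by _tensor_to_wav_bytes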

View File

@@ -0,0 +1,237 @@
from .vocos.decoder import SopranoDecoder
from .utils.text_normalizer import clean_text
from .utils.text_splitter import split_and_recombine_text
from .utils.auto_select import select_device, select_backend
import torch
import re
from unidecode import unidecode
from scipy.io import wavfile
from huggingface_hub import hf_hub_download
import os
import time
class SopranoTTS:
"""
Soprano Text-to-Speech model.
Args:
backend: Backend to use for inference. Options:
- 'auto' (default): Automatically select best backend. Tries lmdeploy first (fastest),
falls back to transformers. CPU always uses transformers.
- 'lmdeploy': Force use of LMDeploy (fastest, CUDA only)
- 'transformers': Force use of HuggingFace Transformers (slower, all devices)
device: Device to run inference on ('auto', 'cuda', 'cpu', 'mps')
cache_size_mb: Cache size in MB for lmdeploy backend
decoder_batch_size: Batch size for the decoder
model_path: Path to a local model directory; when omitted, weights are downloaded from HuggingFace ('ekwek/Soprano-1.1-80M')
"""
def __init__(self,
backend='auto',
device='auto',
cache_size_mb=100,
decoder_batch_size=1,
model_path=None):
device = select_device(device=device)
backend = select_backend(backend=backend, device=device)
if backend == 'lmdeploy':
from .backends.lmdeploy import LMDeployModel
self.pipeline = LMDeployModel(device=device, cache_size_mb=cache_size_mb, model_path=model_path)
elif backend == 'transformers':
from .backends.transformers import TransformersModel
self.pipeline = TransformersModel(device=device, model_path=model_path)
self.device = device
self.backend = backend
self.decoder = SopranoDecoder().to(device)
if model_path:
decoder_path = os.path.join(model_path, 'decoder.pth')
else:
decoder_path = hf_hub_download(repo_id='ekwek/Soprano-1.1-80M', filename='decoder.pth')
self.decoder.load_state_dict(torch.load(decoder_path, map_location=device))
self.decoder_batch_size=decoder_batch_size
self.RECEPTIVE_FIELD = 4 # Decoder receptive field
self.TOKEN_SIZE = 2048 # Number of samples per audio token
self.infer("Hello world!") # warmup
def _preprocess_text(self, texts, min_length=30):
'''
Adds the prompt format and per-text sentence indices.
Enforces a minimum sentence length by merging short sentences.
'''
res = []
for text_idx, text in enumerate(texts):
text = text.strip()
cleaned_text = clean_text(text)
sentences = split_and_recombine_text(cleaned_text)
processed = []
for sentence in sentences:
processed.append({
"text": sentence,
"text_idx": text_idx,
})
if min_length > 0 and len(processed) > 1:
merged = []
i = 0
while i < len(processed):
cur = processed[i]
if len(cur["text"]) < min_length:
if merged: merged[-1]["text"] = (merged[-1]["text"] + " " + cur["text"]).strip()
else:
if i + 1 < len(processed): processed[i + 1]["text"] = (cur["text"] + " " + processed[i + 1]["text"]).strip()
else: merged.append(cur)
else: merged.append(cur)
i += 1
processed = merged
sentence_idxes = {}
for item in processed:
if item['text_idx'] not in sentence_idxes: sentence_idxes[item['text_idx']] = 0
res.append((f'[STOP][TEXT]{item["text"]}[START]', item["text_idx"], sentence_idxes[item['text_idx']]))
sentence_idxes[item['text_idx']] += 1
return res
def hallucination_detector(self, hidden_state):
'''
Analyzes hidden states to find long runs of similar sequences.
'''
DIFF_THRESHOLD = 300 # minimal difference between sequences
MAX_RUNLENGTH = 16 # maximum number of recent similar sequences
if len(hidden_state) <= MAX_RUNLENGTH: # hidden state not long enough
return False
aah_runlength = 0
for i in range(len(hidden_state) - 1):
current_sequences = hidden_state[i]
next_sequences = hidden_state[i + 1]
diffs = torch.abs(current_sequences - next_sequences)
total_diff = diffs.sum(dim=0)
if total_diff < DIFF_THRESHOLD:
aah_runlength += 1
elif aah_runlength > 0:
aah_runlength -= 1
if aah_runlength > MAX_RUNLENGTH:
return True
return False
def infer(self,
text,
out_path=None,
top_p=0.95,
temperature=0.0,
repetition_penalty=1.2,
retries=0):
results = self.infer_batch([text],
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
out_dir=None,
retries=retries)[0]
if out_path:
wavfile.write(out_path, 32000, results.cpu().numpy())
return results
def infer_batch(self,
texts,
out_dir=None,
top_p=0.95,
temperature=0.0,
repetition_penalty=1.2,
retries=0):
sentence_data = self._preprocess_text(texts)
prompts = list(map(lambda x: x[0], sentence_data))
hidden_states = [None] * len(prompts)
pending_indices = list(range(0, len(prompts)))
tries_left = 1 + max(0, retries)
while tries_left > 0 and pending_indices:
current_prompts = [prompts[i] for i in pending_indices]
responses = self.pipeline.infer(current_prompts,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty)
bad_indices = []
for idx, response in enumerate(responses):
hidden_state = response['hidden_state']
hidden_states[pending_indices[idx]] = hidden_state
if response['finish_reason'] != 'stop':
print(f"Warning: A sentence did not complete generation, likely due to hallucination.")
if retries > 0 and self.hallucination_detector(hidden_state):
print(f"Warning: A sentence contained a hallucination.")
bad_indices.append(pending_indices[idx])
if not bad_indices:
break
else:
pending_indices = bad_indices
tries_left -= 1
if tries_left > 0:
print(f"Warning: {len(pending_indices)} sentence(s) will be regenerated.")
combined = list(zip(hidden_states, sentence_data))
combined.sort(key=lambda x: -x[0].size(0))
hidden_states, sentence_data = zip(*combined)
num_texts = len(texts)
audio_concat = [[] for _ in range(num_texts)]
for sentence in sentence_data:
audio_concat[sentence[1]].append(None)
for idx in range(0, len(hidden_states), self.decoder_batch_size):
batch_hidden_states = []
lengths = list(map(lambda x: x.size(0), hidden_states[idx:idx+self.decoder_batch_size]))
N = len(lengths)
for i in range(N):
batch_hidden_states.append(torch.cat([
torch.zeros((1, 512, lengths[0]-lengths[i]), device=self.device),
hidden_states[idx+i].unsqueeze(0).transpose(1,2).to(self.device).to(torch.float32),
], dim=2))
batch_hidden_states = torch.cat(batch_hidden_states)
with torch.no_grad():
audio = self.decoder(batch_hidden_states)
for i in range(N):
text_id = sentence_data[idx+i][1]
sentence_id = sentence_data[idx+i][2]
audio_concat[text_id][sentence_id] = audio[i].squeeze()[-(lengths[i]*self.TOKEN_SIZE-self.TOKEN_SIZE):]
audio_concat = [torch.cat(x).cpu() for x in audio_concat]
if out_dir:
os.makedirs(out_dir, exist_ok=True)
for i in range(len(audio_concat)):
wavfile.write(f"{out_dir}/{i}.wav", 32000, audio_concat[i].cpu().numpy())
return audio_concat
def infer_stream(self,
text,
chunk_size=1,
top_p=0.95,
temperature=0.0,
repetition_penalty=1.2):
start_time = time.time()
sentence_data = self._preprocess_text([text])
first_chunk = True
for sentence, _, _ in sentence_data:
responses = self.pipeline.stream_infer(sentence,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty)
hidden_states_buffer = []
chunk_counter = chunk_size
for token in responses:
finished = token['finish_reason'] is not None
if not finished: hidden_states_buffer.append(token['hidden_state'][-1])
hidden_states_buffer = hidden_states_buffer[-(2*self.RECEPTIVE_FIELD+chunk_size):]
if finished or len(hidden_states_buffer) >= self.RECEPTIVE_FIELD + chunk_size:
if finished or chunk_counter == chunk_size:
batch_hidden_states = torch.stack(hidden_states_buffer)
inp = batch_hidden_states.unsqueeze(0).transpose(1, 2).to(self.device).to(torch.float32)
with torch.no_grad():
audio = self.decoder(inp)[0]
if finished:
audio_chunk = audio[-((self.RECEPTIVE_FIELD+chunk_counter-1)*self.TOKEN_SIZE-self.TOKEN_SIZE):]
else:
audio_chunk = audio[-((self.RECEPTIVE_FIELD+chunk_size)*self.TOKEN_SIZE-self.TOKEN_SIZE):-(self.RECEPTIVE_FIELD*self.TOKEN_SIZE-self.TOKEN_SIZE)]
chunk_counter = 0
if first_chunk:
print(f"Streaming latency: {1000*(time.time()-start_time):.2f} ms")
first_chunk = False
yield audio_chunk.cpu()
chunk_counter += 1
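
A minimal usage sketch for the class above, using only methods defined in this file (output file names are arbitrary):

from soprano import SopranoTTS

tts = SopranoTTS()  # auto-selects device and backend; downloads weights on first use
tts.infer("Hello world!", out_path="hello.wav")  # one text -> 32 kHz WAV
audios = tts.infer_batch(["First text.", "Second text."], out_dir="out")  # writes out/0.wav, out/1.wav
for chunk in tts.infer_stream("Streaming example.", chunk_size=1):
    pass  # each chunk is a CPU float32 tensor of raw 32 kHz samples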

View File

@@ -0,0 +1,32 @@
import torch
RECOGNIZED_DEVICES = ['auto', 'cuda', 'cpu', 'mps']
RECOGNIZED_BACKENDS = ['auto', 'lmdeploy', 'transformers']
def select_device(device='auto'):
if device == 'auto':
if torch.cuda.is_available():
device = 'cuda'
elif torch.backends.mps.is_available():
device = 'mps'
else:
device = 'cpu'
assert device in RECOGNIZED_DEVICES, f"unrecognized device {device}, device must be in {RECOGNIZED_DEVICES}"
print(f"Using device {device}")
return device
def select_backend(backend='auto', device='auto'):
if backend == 'auto':
if device == 'cpu':
backend = 'transformers'
else:
try:
import lmdeploy
backend = 'lmdeploy'
except ImportError:
backend = 'transformers'
assert backend in RECOGNIZED_BACKENDS, f"unrecognized backend {backend}, backend must be in {RECOGNIZED_BACKENDS}"
print(f"Using backend {backend}")
return backend

View File

@@ -0,0 +1,34 @@
import sounddevice as sd
import torch
import time
def play_stream(stream, sample_rate=32000):
"""
Play streamed audio chunks to speakers in real time.
"""
with sd.OutputStream(
samplerate=sample_rate,
channels=1,
dtype='float32',
blocksize=0
) as out_stream:
start = time.time()
latency = None
first = True
for chunk in stream:
if first:
latency = time.time()-start
first = False
if isinstance(chunk, torch.Tensor):
chunk = chunk.detach().cpu()
# Ensure shape (N, 1)
if chunk.dim() == 1:
chunk = chunk.unsqueeze(1)
elif chunk.dim() == 2 and chunk.shape[0] == 1:
chunk = chunk.transpose(0, 1)
out_stream.write(chunk.numpy())
return latency

View File

@@ -0,0 +1,410 @@
"""
Normalize input text to a format that Soprano recognizes.
Adapted from https://github.com/neonbjb/tortoise-tts/blob/main/tortoise/utils/tokenizer.py
"""
import re
import inflect
from unidecode import unidecode
_inflect = inflect.engine()
####################################################################################################
# Abbreviations
_abbreviations = [(re.compile('\\b%s\\.' % x[0], re.IGNORECASE), x[1]) for x in [
('mrs', 'misess'),
('ms', 'miss'),
('mr', 'mister'),
('dr', 'doctor'),
('st', 'saint'),
('co', 'company'),
('jr', 'junior'),
('maj', 'major'),
('gen', 'general'),
('drs', 'doctors'),
('rev', 'reverend'),
('lt', 'lieutenant'),
('hon', 'honorable'),
('sgt', 'sergeant'),
('capt', 'captain'),
('esq', 'esquire'),
('ltd', 'limited'),
('col', 'colonel'),
('ft', 'fort'),
]]
_cased_abbreviations = [(re.compile('\\b%s\\b' % x[0]), x[1]) for x in [
('TTS', 'text to speech'),
('Hz', 'hertz'),
('kHz', 'kilohertz'),
('KBs', 'kilobytes'),
('KB', 'kilobyte'),
('MBs', 'megabytes'),
('MB', 'megabyte'),
('GBs', 'gigabytes'),
('GB', 'gigabyte'),
('TBs', 'terabytes'),
('TB', 'terabyte'),
('APIs', 'a p i\'s'),
('API', 'a p i'),
('CLIs', 'c l i\'s'),
('CLI', 'c l i'),
('CPUs', 'c p u\'s'),
('CPU', 'c p u'),
('GPUs', 'g p u\'s'),
('GPU', 'g p u'),
('Ave', 'avenue'),
('etc', 'et cetera'),
('Mon', 'monday'),
('Tues', 'tuesday'),
('Wed', 'wednesday'),
('Thurs', 'thursday'),
('Fri', 'friday'),
('Sat', 'saturday'),
('Sun', 'sunday'),
('and/or', 'and or'),
]]
def expand_abbreviations(text):
for regex, replacement in _abbreviations + _cased_abbreviations:
text = re.sub(regex, replacement, text)
return text
####################################################################################################
# Numbers
_num_prefix_re = re.compile(r'#\d')
_num_suffix_re = re.compile(r'\b\d+(K|M|B|T)\b', re.IGNORECASE)
_num_letter_split_re = re.compile(r'(\d[a-z]|[a-z]\d)', re.IGNORECASE)
_comma_number_re = re.compile(r'(\d[\d\,]+\d)')
_date_re = re.compile(r'(^|[^/])(\d\d?[/-]\d\d?[/-]\d\d(?:\d\d)?)($|[^/])')
_phone_number_re = re.compile(r'(\(?\d{3}\)?[-.\s]\d{3}[-.\s]?\d{4})')
_time_re = re.compile(r'(\d\d?:\d\d(?::\d\d)?)')
_pounds_re = re.compile(r'£([\d\,]*\d+)')
_dollars_re = re.compile(r'\$([\d\.\,]*\d+)')
_decimal_number_re = re.compile(r'(\d+(?:\.\d+)+)')
_multiply_re = re.compile(r'(\d\s?\*\s?\d)')
_divide_re = re.compile(r'(\d\s?/\s?\d)')
_add_re = re.compile(r'(\d\s?\+\s?\d)')
_subtract_re = re.compile(r'(\d?\s?-\s?\d)') # also does negative numbers
_fraction_re = re.compile(r'(\d+(?:/\d+)+)')
_ordinal_re = re.compile(r'\d+(st|nd|rd|th)')
_number_re = re.compile(r'\d+')
def _expand_num_prefix(m):
match = m.group(0)
return f"number {match[1]}"
def _expand_num_suffix(m):
match = m.group(0)
if match[-1].upper() == 'K': return f"{match[:-1]} thousand"
elif match[-1].upper() == 'M': return f"{match[:-1]} million"
elif match[-1].upper() == 'B': return f"{match[:-1]} billion"
elif match[-1].upper() == 'T': return f"{match[:-1]} trillion"
return match # unexpected format
def _split_alphanumeric(m):
match = m.group(1)
return f"{match[0]} {match[1]}"
def _remove_commas(m):
return m.group(1).replace(',', '')
def _expand_date(m):
match = m.group(2)
match = re.split('[./-]', match)
return m.group(1) + ' dash '.join(match) + m.group(3)
def _expand_phone_number(m):
match = m.group(1)
match = re.sub(r'\D', '', match)
assert len(match) == 10
match = f"{' '.join(list(match[:3]))}, {' '.join(list(match[3:6]))}, {' '.join(list(match[6:]))}"
return match
def _expand_time(m):
match = m.group(1)
match = match.split(':')
if len(match) == 2:
hours, minutes = match
if minutes == '00':
if int(hours) == 0:
return '0'
elif int(hours) > 12: return f"{hours} minutes"
return f"{hours} o'clock"
elif minutes.startswith('0'):
minutes = f'oh {minutes[1:]}'
return f"{hours} {minutes}"
else:
hours, minutes, seconds = match
if int(hours) != 0:
return f"{hours} {'oh oh' if minutes == '00' else f'oh {minutes}' if minutes.startswith('0') else {minutes}} {'' if seconds == '00' else f'oh {seconds}' if seconds.startswith('0') else seconds}"
elif minutes != '00':
return f"{minutes} {'oh oh' if seconds == '00' else f'oh {seconds}' if seconds.startswith('0') else seconds}"
else:
return seconds
def _expand_dollars(m):
match = m.group(1)
parts = match.split('.')
if len(parts) > 2:
return match + ' dollars' # Unexpected format
dollars = int(parts[0]) if parts[0] else 0
cents = int(parts[1]) if len(parts) > 1 and parts[1] else 0
if dollars and cents:
dollar_unit = 'dollar' if dollars == 1 else 'dollars'
cent_unit = 'cent' if cents == 1 else 'cents'
return '%s %s, %s %s' % (dollars, dollar_unit, cents, cent_unit)
elif dollars:
dollar_unit = 'dollar' if dollars == 1 else 'dollars'
return '%s %s' % (dollars, dollar_unit)
elif cents:
cent_unit = 'cent' if cents == 1 else 'cents'
return '%s %s' % (cents, cent_unit)
else:
return 'zero dollars'
def _expand_decimal_point(m):
match = m.group(1)
match = match.split('.')
return match[0] + ' point ' + ' point '.join(' '.join(list(match[i])) for i in range(1, len(match)))
def _expand_fraction(m):
match = m.group(1)
match = match.split('/')
return ' over '.join(match) if len(match)==2 else ' slash '.join(match)
def _expand_multiply(m):
return ' times '.join(m.group(1).split('*'))
def _expand_divide(m):
return ' over '.join(m.group(1).split('/'))
def _expand_add(m):
return ' plus '.join(m.group(1).split('+'))
def _expand_subtract(m):
return ' minus '.join(m.group(1).split('-'))
def _expand_ordinal(m):
return _inflect.number_to_words(m.group(0), andword='')
def _expand_number(m):
num = int(m.group(0))
if num > 1000 and num < 3000:
if num == 2000:
return 'two thousand'
elif num > 2000 and num < 2010:
return 'two thousand ' + _inflect.number_to_words(num % 100)
elif num % 100 == 0:
return _inflect.number_to_words(num // 100) + ' hundred'
else:
return _inflect.number_to_words(num, andword='', zero='oh', group=2).replace(', ', ' ')
else:
return _inflect.number_to_words(num, andword='')
def normalize_numbers(text):
text = re.sub(_num_prefix_re, _expand_num_prefix, text)
text = re.sub(_num_suffix_re, _expand_num_suffix, text)
text = re.sub(_comma_number_re, _remove_commas, text)
text = re.sub(_date_re, _expand_date, text)
text = re.sub(_phone_number_re, _expand_phone_number, text)
text = re.sub(_time_re, _expand_time, text)
text = re.sub(_pounds_re, r'\1 pounds', text)
text = re.sub(_dollars_re, _expand_dollars, text)
text = re.sub(_decimal_number_re, _expand_decimal_point, text)
text = re.sub(_multiply_re, _expand_multiply, text)
text = re.sub(_divide_re, _expand_divide, text)
text = re.sub(_add_re, _expand_add, text)
text = re.sub(_subtract_re, _expand_subtract, text)
text = re.sub(_fraction_re, _expand_fraction, text)
text = re.sub(_ordinal_re, _expand_ordinal, text)
for _ in range(2): # need to do this twice to find all matches
text = re.sub(_num_letter_split_re, _split_alphanumeric, text)
text = re.sub(_number_re, _expand_number, text)
return text
####################################################################################################
# Special characters & other patterns
_preunicode_special_characters = [(re.compile(x[0]), x[1]) for x in [
('—', ' - '),  # em dash -> spaced hyphen, handled before unidecode runs
]]
_special_characters = [(re.compile(x[0]), x[1]) for x in [
('@', ' at '),
('&', ' and '),
('%', ' percent '),
(':', '.'),
(';', ','),
(r'\+', ' plus '),
(r'\\', ' backslash '),
('~', ' about '),
('(^| )<3', ' heart '),
('<=', ' less than or equal to '),
('>=', ' greater than or equal to '),
('<', ' less than '),
('>', ' greater than '),
('=', ' equals '),
('/', ' slash '),
('_', ' '),
(r'\*', ' '),
]]
_link_header_re = re.compile(r'(https?://)')
_dash_re = re.compile(r'(. - .)')
_dot_re = re.compile(r'([A-Z]\.[A-Z])', re.IGNORECASE)
_parentheses_re = re.compile(r'[\(\[\{].*[\)\]\}](.|$)')
def expand_preunicode_special_characters(text):
for regex, replacement in _preunicode_special_characters:
text = re.sub(regex, replacement, text)
return text
def expand_special_characters(text):
for regex, replacement in _special_characters:
text = re.sub(regex, replacement, text)
return text
def _expand_link_header(m):
return 'h t t p s colon slash slash '
def _expand_dash(m):
match = m.group(0)
return f"{match[0]}, {match[4]}"
def _expand_dot(m):
match = m.group(0)
return f"{match[0]} dot {match[2]}"
def _expand_parantheses(m):
match = m.group(0)
match = re.sub(r'[\(\[\{]', ', ', match)
match = re.sub(r'[\)\]\}][^$.!?,]', ', ', match)
match = re.sub(r'[\)\]\}]', '', match)
return match
def normalize_special(text):
text = re.sub(_link_header_re, _expand_link_header, text)
text = re.sub(_dash_re, _expand_dash, text)
text = re.sub(_dot_re, _expand_dot, text)
text = re.sub(_parentheses_re, _expand_parantheses, text)
return text
####################################################################################################
# Misc
def lowercase(text):
return text.lower()
def convert_to_ascii(text):
return unidecode(text)
def normalize_newlines(text):
text = text.split('\n')
for i in range(len(text)):
text[i] = text[i].strip()
if not text[i]: continue
if text[i][-1] not in '.!?':
text[i] = f"{text[i]}."
return ' '.join(text)
def remove_unknown_characters(text):
text = re.sub(r"[^A-Za-z !\$%&'\*\+,-./0123456789<>\?_]", "", text)
text = re.sub(r"[<>/_+]", "", text)
return text
def collapse_whitespace(text):
text = re.sub(r'\s+', ' ', text)
text = re.sub(r' [.\?!,]', lambda m: m.group(0)[1], text)
return text.strip()
def dedup_punctuation(text):
text = re.sub(r"\.\.\.+", "[ELLIPSIS]", text)
text = re.sub(r",+", ",", text)
text = re.sub(r"[\.,]*\.[\.,]*", ".", text)
text = re.sub(r"[\.,!]*![\.,!]*", "!", text)
text = re.sub(r"[\.,!\?]*\?[\.,!\?]*", "?", text)
text = re.sub(r"\[ELLIPSIS\]", "...", text)
return text
def clean_text(text):
text = expand_preunicode_special_characters(text)
text = convert_to_ascii(text)
text = normalize_newlines(text)
text = normalize_numbers(text)
text = normalize_special(text)
text = expand_abbreviations(text)
text = expand_special_characters(text)
text = lowercase(text)
text = remove_unknown_characters(text)
text = collapse_whitespace(text)
text = dedup_punctuation(text)
return text
if __name__ == '__main__':
print(clean_text('1,2,3,456,176'))
print(clean_text('123,456,789'))
print(clean_text('123,456,789th'))
print(clean_text('123-456-7890'))
print(clean_text('111-111-1111'))
print(clean_text('(111) 111-1111'))
print(clean_text('A(111) 111-1111'))
print(clean_text('A (111) 111-1111'))
print(clean_text('$2.47'))
print(clean_text('$247'))
print(clean_text('$0.27'))
print(clean_text('$1.00'))
print(clean_text('£20'))
for i in range(1990, 2030):
print(clean_text(str(i)))
print(clean_text('2656'))
print(clean_text('1024'))
print(clean_text('2.47023'))
print(clean_text('20.47023'))
print(clean_text('1.17.1.1'))
print(clean_text('111.111.1111'))
print(clean_text('1/1/2025'))
print(clean_text('1-1-2025'))
print(clean_text('1-1-25'))
print(clean_text('A 1/1/11 A'))
print(clean_text('A 1/1 A'))
print(clean_text('1/1'))
print(clean_text('1/10'))
print(clean_text('1/1/10'))
print(clean_text('11/1/1/10'))
print(clean_text('0:00'))
print(clean_text('12:00'))
print(clean_text('13:00'))
print(clean_text('8:00'))
print(clean_text('8:05'))
print(clean_text('8:15'))
print(clean_text('0:00:00'))
print(clean_text('00:01:10'))
print(clean_text('00:10:01'))
print(clean_text('01:01:01'))
print(clean_text('00:01:00'))
print(clean_text('01:00:00'))
print(clean_text('-1 + 2 * 3 - 4 / 5'))
print(clean_text('-1+2*3-5/4/25'))
print(clean_text('100x1'))
print(clean_text('100k'))
print(clean_text('100m'))
print(clean_text('100b'))
print(clean_text('100t'))
print(clean_text('#1'))
print(clean_text('12:00'))
print(clean_text('11:59'))
print(clean_text('01:00'))
print(clean_text('0100'))
print(clean_text('1st 2nd 3rd 4th'))
print(clean_text('1K 1M 1B 1T 1K1M1B1T'))
print(clean_text('and/or'))

View File

@@ -0,0 +1,76 @@
"""
Copied from https://github.com/neonbjb/tortoise-tts/blob/main/tortoise/utils/text.py
"""
import re
def split_and_recombine_text(text, desired_length=1, max_length=300):
"""Split text it into chunks of a desired length trying to keep sentences intact."""
# normalize text, remove redundant whitespace and convert non-ascii quotes to ascii
text = re.sub(r'\n\n+', '\n', text)
text = re.sub(r'\s+', ' ', text)
text = re.sub(r'[“”]', '"', text)
rv = []
in_quote = False
current = ""
split_pos = []
pos = -1
end_pos = len(text) - 1
def seek(delta):
nonlocal pos, in_quote, current
is_neg = delta < 0
for _ in range(abs(delta)):
if is_neg:
pos -= 1
current = current[:-1]
else:
pos += 1
current += text[pos]
if text[pos] == '"':
in_quote = not in_quote
return text[pos]
def peek(delta):
p = pos + delta
return text[p] if p < end_pos and p >= 0 else ""
def commit():
nonlocal rv, current, split_pos
rv.append(current)
current = ""
split_pos = []
while pos < end_pos:
c = seek(1)
# do we need to force a split?
if len(current) >= max_length:
if len(split_pos) > 0 and len(current) > (desired_length / 2):
# we have at least one sentence and we are over half the desired length, seek back to the last split
d = pos - split_pos[-1]
seek(-d)
else:
# no full sentences, seek back until we are not in the middle of a word and split there
while c not in '!?.\n ' and pos > 0 and len(current) > desired_length:
c = seek(-1)
commit()
# check for sentence boundaries
elif not in_quote and (c in '!?\n' or (c == '.' and peek(1) in '\n ')):
# seek forward if we have consecutive boundary markers but still within the max length
while pos < len(text) - 1 and len(current) < max_length and peek(1) in '!?.':
c = seek(1)
split_pos.append(pos)
if len(current) >= desired_length:
commit()
# treat end of quote as a boundary if it's followed by a space or newline
elif in_quote and peek(1) == '"' and peek(2) in '\n ':
seek(2)
split_pos.append(pos)
rv.append(current)
# clean up, remove lines with only whitespace or punctuation
rv = [s.strip() for s in rv]
rv = [s for s in rv if len(s) > 0 and not re.match(r'^[\s\.,;:!?]*$', s)]
return rv
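
A quick illustration of the splitter: with the default desired_length=1, every sentence boundary triggers a commit, so each sentence becomes its own chunk.

from soprano.utils.text_splitter import split_and_recombine_text

parts = split_and_recombine_text('Hello there! This is a test. And a final sentence.')
print(parts)  # expected: ['Hello there!', 'This is a test.', 'And a final sentence.']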

View File

@@ -0,0 +1,45 @@
import torch
from torch import nn
from .models import VocosBackbone
from .heads import ISTFTHead
class SopranoDecoder(nn.Module):
def __init__(self,
num_input_channels=512,
decoder_num_layers=8,
decoder_dim=768,
decoder_intermediate_dim=None,
hop_length=512,
n_fft=2048,
upscale=4,
dw_kernel=3,
):
super().__init__()
self.decoder_initial_channels = num_input_channels
self.num_layers = decoder_num_layers
self.dim = decoder_dim
self.intermediate_dim = decoder_intermediate_dim if decoder_intermediate_dim else decoder_dim*3
self.hop_length = hop_length
self.n_fft = n_fft
self.upscale = upscale
self.dw_kernel = dw_kernel
self.decoder = VocosBackbone(input_channels=self.decoder_initial_channels,
dim=self.dim,
intermediate_dim=self.intermediate_dim,
num_layers=self.num_layers,
input_kernel_size=1,
dw_kernel_size=dw_kernel,
)
self.head = ISTFTHead(dim=self.dim,
n_fft=self.n_fft,
hop_length=self.hop_length)
def forward(self, x):
T = x.size(2)
x = torch.nn.functional.interpolate(x, size=self.upscale*(T-1)+1, mode='linear', align_corners=True)
x = self.decoder(x)
reconstructed = self.head(x)
return reconstructed
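
Under the defaults above, the length bookkeeping works out as follows: T input tokens are interpolated to 4*(T-1)+1 frames, and the center-padded ISTFT head then yields (frames-1)*hop_length = (T-1)*2048 samples, which is where the TOKEN_SIZE = 2048 samples-per-token constant in tts.py comes from. A shape-checking sketch (illustrative; the decoder here is randomly initialized):

import torch
from soprano.vocos.decoder import SopranoDecoder

dec = SopranoDecoder().eval()
T = 10                          # number of hidden-state tokens
x = torch.randn(1, 512, T)      # (batch, channels, tokens)
with torch.no_grad():
    audio = dec(x)
print(audio.shape)              # torch.Size([1, 18432]) == (T-1) * 2048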

View File

@@ -0,0 +1,50 @@
import torch
from torch import nn
from .spectral_ops import ISTFT
class ISTFTHead(nn.Module):
"""
ISTFT Head module for predicting STFT complex coefficients.
Args:
dim (int): Hidden dimension of the model.
n_fft (int): Size of Fourier transform.
hop_length (int): The distance between neighboring sliding window frames, which should align with
the resolution of the input features.
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "center".
"""
def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "center"):
super().__init__()
out_dim = n_fft + 2
self.out = torch.nn.Linear(dim, out_dim)
self.istft = ISTFT(n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding)
@torch.compiler.disable
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the ISTFTHead module.
Args:
x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
L is the sequence length, and H denotes the model dimension.
Returns:
Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
"""
x = self.out(x.transpose(1,2)).transpose(1, 2)
mag, p = x.chunk(2, dim=1)
mag = torch.exp(mag)
mag = torch.clip(mag, max=1e2) # safeguard to prevent excessively large magnitudes
# phase wrapping happens implicitly here; these two lines produce the real and imaginary parts
x = torch.cos(p)
y = torch.sin(p)
# recomputing the phase with atan2 would add nothing new and only cost time:
# phase = torch.atan2(y, x)
# S = mag * torch.exp(phase * 1j)
# better to produce the complex value directly
S = mag * (x + 1j * y)
audio = self.istft(S)
return audio

View File

@@ -0,0 +1,61 @@
from typing import Optional
import torch
from torch import nn
from .modules import ConvNeXtBlock
class VocosBackbone(nn.Module):
"""
Vocos backbone module built with ConvNeXt blocks.
Args:
input_channels (int): Number of input features channels.
dim (int): Hidden dimension of the model.
intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock.
num_layers (int): Number of ConvNeXtBlock layers.
input_kernel_size (int): Kernel size of the embedding convolution. Defaults to 9.
dw_kernel_size (int): Depthwise convolution kernel size in each ConvNeXt block. Defaults to 9.
layer_scale_init_value (float, optional): Initial value for layer scaling. Defaults to 1 / sqrt(num_layers).
pad (str): Padding mode for the embedding convolution. Defaults to 'zeros'.
"""
def __init__(
self,
input_channels: int,
dim: int,
intermediate_dim: int,
num_layers: int,
input_kernel_size: int = 9,
dw_kernel_size: int = 9,
layer_scale_init_value: Optional[float] = None,
pad: str = 'zeros',
):
super().__init__()
self.embed = nn.Conv1d(input_channels, dim, kernel_size=input_kernel_size, padding=input_kernel_size//2, padding_mode=pad)
self.norm = nn.LayerNorm(dim, eps=1e-6)
self.convnext = nn.ModuleList(
[
ConvNeXtBlock(
dim=dim,
intermediate_dim=intermediate_dim,
dw_kernel_size=dw_kernel_size,
layer_scale_init_value=layer_scale_init_value or 1 / num_layers**0.5,
)
for _ in range(num_layers)
]
)
self.final_layer_norm = nn.LayerNorm(dim, eps=1e-6)
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, (nn.Conv1d, nn.Linear)):
nn.init.trunc_normal_(m.weight, std=0.02)
if m.bias is not None: nn.init.constant_(m.bias, 0)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.embed(x) # (B, C, L)
x = self.norm(x.transpose(1, 2))
x = x.transpose(1, 2)
for conv_block in self.convnext:
x = conv_block(x)
x = self.final_layer_norm(x.transpose(1, 2))
x = x.transpose(1, 2)
return x

View File

@@ -0,0 +1,47 @@
import torch
from torch import nn
class ConvNeXtBlock(nn.Module):
"""ConvNeXt Block adapted from https://github.com/facebookresearch/ConvNeXt to 1D audio signal.
Args:
dim (int): Number of input channels.
intermediate_dim (int): Dimensionality of the intermediate layer.
layer_scale_init_value (float): Initial value for the layer scale; values <= 0 disable scaling.
dw_kernel_size (int): Depthwise convolution kernel size. Defaults to 9.
"""
def __init__(
self,
dim: int,
intermediate_dim: int,
layer_scale_init_value: float,
dw_kernel_size: int = 9,
):
super().__init__()
self.dwconv = nn.Conv1d(dim, dim, kernel_size=dw_kernel_size, padding=dw_kernel_size//2, groups=dim) # depthwise conv
self.norm = nn.LayerNorm(dim, eps=1e-6)
self.pwconv1 = nn.Linear(dim, intermediate_dim) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(intermediate_dim, dim)
self.gamma = (
nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True)
if layer_scale_init_value > 0
else None
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
residual = x
x = self.dwconv(x)
x = x.transpose(1, 2) # (B, C, T) -> (B, T, C)
x = self.norm(x)
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
x = x.transpose(1, 2) # (B, T, C) -> (B, C, T)
x = residual + x
return x

View File

@@ -0,0 +1,74 @@
import torch
from torch import nn
class ISTFT(nn.Module):
"""
Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with
windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges.
See issue: https://github.com/pytorch/pytorch/issues/62323
Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs.
The NOLA constraint is met as we trim padded samples anyway.
Args:
n_fft (int): Size of Fourier transform.
hop_length (int): The distance between neighboring sliding window frames.
win_length (int): The size of window frame and STFT filter.
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
"""
def __init__(self, n_fft: int, hop_length: int, win_length: int, padding: str = "same"):
super().__init__()
if padding not in ["center", "same"]:
raise ValueError("Padding must be 'center' or 'same'.")
self.padding = padding
self.n_fft = n_fft
self.hop_length = hop_length
self.win_length = win_length
window = torch.hann_window(win_length)
self.register_buffer("window", window)
def forward(self, spec: torch.Tensor) -> torch.Tensor:
"""
Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram.
Args:
spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size,
N is the number of frequency bins, and T is the number of time frames.
Returns:
Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal.
"""
if self.padding == "center":
spec[:,0] = 0 # fixes some strange bug where first/last freqs don't matter when bs<16 which causes exploding gradients
spec[:,-1] = 0
# Fallback to pytorch native implementation
return torch.istft(spec, self.n_fft, self.hop_length, self.win_length, self.window, center=True)
elif self.padding == "same":
pad = (self.win_length - self.hop_length) // 2
else:
raise ValueError("Padding must be 'center' or 'same'.")
assert spec.dim() == 3, "Expected a 3D tensor as input"
B, N, T = spec.shape
# Inverse FFT
ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward")
ifft = ifft * self.window[None, :, None]
# Overlap and Add
output_size = (T - 1) * self.hop_length + self.win_length
y = torch.nn.functional.fold(
ifft, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length),
)[:, 0, 0, pad:-pad]
# Window envelope
window_sq = self.window.square().expand(1, T, -1).transpose(1, 2)
window_envelope = torch.nn.functional.fold(
window_sq, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length),
).squeeze()[pad:-pad]
# Normalize
assert (window_envelope > 1e-11).all()
y = y / window_envelope
return y
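
For reference, the overlap-add reconstruction implemented above is the standard normalized ISTFT. With hop length H and Hann window w,

y[n] = \frac{\sum_m w[n - mH]\,\hat{x}_m[n - mH]}{\sum_m w^2[n - mH]}

where \hat{x}_m is the inverse real FFT of frame m. The first fold computes the numerator, the fold over the squared window computes the denominator (the window envelope), and the assert guards against dividing by a vanishing envelope at the trimmed edges.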

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""
Gradio Web Interface for Soprano TTS
"""
import argparse
import socket
import time
import gradio as gr
import numpy as np
from soprano import SopranoTTS
from soprano.utils.streaming import play_stream
parser = argparse.ArgumentParser(description='Soprano Text-to-Speech Gradio WebUI')
parser.add_argument('--model-path', '-m',
help='Path to local model directory (optional)')
parser.add_argument('--device', '-d', default='auto',
choices=['auto', 'cuda', 'cpu', 'mps'],
help='Device to use for inference')
parser.add_argument('--backend', '-b', default='auto',
choices=['auto', 'transformers', 'lmdeploy'],
help='Backend to use for inference')
parser.add_argument('--cache-size', '-c', type=int, default=100,
help='Cache size in MB (for lmdeploy backend)')
parser.add_argument('--decoder-batch-size', '-bs', type=int, default=1,
help='Batch size when decoding audio')
args = parser.parse_args()
# Initialize model
print("Loading Soprano TTS model...")
model = SopranoTTS(
backend=args.backend,
device=args.device,
cache_size_mb=args.cache_size,
decoder_batch_size=args.decoder_batch_size,
model_path=args.model_path
)
device = model.device
backend = model.backend
print("Model loaded successfully!")
SAMPLE_RATE = 32000
def generate_speech(
text: str,
temperature: float,
top_p: float,
repetition_penalty: float,
chunk_size: int,
streaming: bool,
):
if not text.strip():
yield None, "Please enter some text to generate speech."
return
try:
if streaming:
stream = model.infer_stream(
text,
temperature=temperature,
top_p=top_p,
repetition_penalty=repetition_penalty,
chunk_size=chunk_size,
)
yield None, "⏳ Streaming..."
latency = play_stream(stream)
yield None, (
f"✓ Streaming complete | "
f"{latency*1000:.2f} ms latency"
)
return
start_time = time.perf_counter()
audio = model.infer(
text,
temperature=temperature,
top_p=top_p,
repetition_penalty=repetition_penalty,
)
gen_time = time.perf_counter() - start_time
audio_np = audio.cpu().numpy()
audio_int16 = (np.clip(audio_np, -1.0, 1.0) * 32767).astype(np.int16)
audio_seconds = len(audio_np) / SAMPLE_RATE
rtf = audio_seconds / gen_time if gen_time > 0 else float("inf")
status = (
f"✓ Generated {audio_seconds:.2f} s audio | "
f"Generation time: {gen_time:.3f} s "
f"({rtf:.2f}x realtime)"
)
yield (SAMPLE_RATE, audio_int16), status
return
except Exception as e:
yield None, f"✗ Error: {str(e)}"
# Create Gradio interface
with gr.Blocks(title="Soprano TTS", theme=gr.themes.Soft(primary_hue="green"), css="a { color: var(--primary-600); } a:hover { color: var(--primary-700); }") as demo:
gr.Markdown(
f"""# 🗣️ Soprano TTS
<div align="center">
<img width="300" height="300" alt="soprano-github" src="https://github.com/user-attachments/assets/4d612eac-23b8-44e6-8c59-d7ac14ebafd1" />
</div>
**Device:** {device.upper()} | **Backend:** {backend}
**Model Weights:** https://huggingface.co/ekwek/Soprano-1.1-80M
**Model Demo:** https://huggingface.co/spaces/ekwek/Soprano-TTS
**GitHub:** https://github.com/ekwek1/soprano
"""
)
with gr.Row():
with gr.Column(scale=2):
text_input = gr.Textbox(
label="Text to Synthesize",
placeholder="Enter text here...",
value="Soprano is an extremely lightweight text to speech model designed to produce highly realistic speech at unprecedented speed.",
lines=5,
max_lines=10,
)
streaming = gr.Checkbox(
label="Stream Audio",
value=False,
info="Note: This bypasses the Gradio interface and streams audio directly to your speaker."
)
with gr.Accordion("Advanced Settings", open=False):
temperature = gr.Slider(
minimum=0.0,
maximum=1.0,
value=0.0,
step=0.05,
label="Temperature",
)
top_p = gr.Slider(
minimum=0.5,
maximum=1.0,
value=0.95,
step=0.05,
label="Top P",
)
repetition_penalty = gr.Slider(
minimum=1.0,
maximum=2.0,
value=1.2,
step=0.1,
label="Repetition Penalty",
)
chunk_size = gr.Slider(
minimum=1,
maximum=10,
value=1,
step=1,
label="Chunk Size (Streaming only)",
)
generate_btn = gr.Button("Generate Speech", variant="primary", size="lg")
with gr.Column(scale=1):
audio_output = gr.Audio(
label="Generated Speech",
type="numpy",
autoplay=True,
)
status_output = gr.Textbox(
label="Status",
interactive=False,
lines=3,
max_lines=10
)
gr.Examples(
examples=[
["Soprano is an extremely lightweight text to speech model.", 0.0, 0.95, 1.2],
["Artificial intelligence is transforming the world.", 0.0, 0.95, 1.2],
["I'm so excited, I can't even wait!", 0.0, 0.95, 1.2],
["Why don't you go ahead and try it?", 0.0, 0.95, 1.2],
],
inputs=[text_input, temperature, top_p, repetition_penalty],
label="Example Prompts",
)
generate_btn.click(
fn=generate_speech,
inputs=[text_input, temperature, top_p, repetition_penalty, chunk_size, streaming],
outputs=[audio_output, status_output],
)
gr.Markdown(
f"""
### Usage tips:
- Soprano works best when each sentence is between 2 and 30 seconds long.
- Although Soprano recognizes numbers and some special characters, it occasionally mispronounces them.
Best results can be achieved by converting these into their phonetic form.
(1+1 -> one plus one, etc)
- If Soprano produces unsatisfactory results, you can easily regenerate it for a new, potentially better generation.
You may also change the sampling settings for more varied results.
- Avoid improper grammar such as not using contractions, multiple spaces, etc.
"""
)
def find_free_port(start_port=7860, max_tries=100):
for port in range(start_port, start_port + max_tries):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", port))
return port
except OSError:
continue
raise OSError("Could not find a free port")
def main():
# Start Gradio interface
port = find_free_port(7860)
print(f"Starting Gradio interface on port {port}")
demo.launch(
server_name="0.0.0.0",
server_port=port,
share=False,
)
if __name__ == "__main__":
main()