140 lines
5.4 KiB
Python
140 lines
5.4 KiB
Python
import os
|
|
import sys
|
|
import torch
|
|
import numpy as np
|
|
from pydub import AudioSegment
|
|
import shutil
|
|
|
|
def load_silero_vad():
    """
    Load the Silero VAD model together with its speech-timestamp helper.

    The pair is fetched through ``torch.hub`` on the first call only and then
    memoized on the function object, so a caller that invokes this once per
    audio file reuses the same model instead of re-loading it every time.

    Returns:
        tuple: ``(model, get_speech_ts)`` where ``get_speech_ts`` is the first
        entry of the ``utils`` collection returned by the hub entry point.
        (Assumes the VAD output values are in sample indices.)
    """
    cached = getattr(load_silero_vad, "_cached", None)
    if cached is None:
        model, utils = torch.hub.load('snakers4/silero-vad', 'silero_vad', force_reload=False)
        # utils[0] is the helper that turns a waveform into speech segments.
        cached = (model, utils[0])
        load_silero_vad._cached = cached
    return cached
|
|
|
|
def convert_audio_for_vad(audio):
    """
    Turn a pydub AudioSegment into a float32 torch tensor for the VAD.

    The integer samples are divided by ``2 ** (8 * sample_width - 1)`` (the
    magnitude of the most negative value for that sample width), which puts
    the result in [-1, 1]. The segment is assumed to be mono, 16 kHz already;
    no channel mixing or resampling is done here.
    """
    full_scale = 2 ** (8 * audio.sample_width - 1)
    raw_samples = audio.get_array_of_samples()
    normalized = np.array(raw_samples).astype(np.float32) / full_scale
    return torch.from_numpy(normalized)
|
|
|
|
def split_audio(file_path, threshold=0.5, min_speech_duration_ms=90000, min_silence_duration_ms=4000,
                starting_directory=None):
    """
    Split one MP3 file into consecutive chunks at Silero VAD segment ends.

    The file is loaded with pydub, a mono 16 kHz copy is fed to Silero VAD,
    and the "end" of every detected speech segment becomes a split point.
    The chunks cover the entire file without trimming any audio.

    The boundaries are defined as:
        [0, VAD_end_boundaries (converted to ms), final_ms]
    where ``final_ms`` is the larger of the duration computed from the
    resampled sample count and the duration reported by pydub.

    Each chunk is exported as ``TeilNN.mp3`` into an output folder named after
    the file's folder with "_geteilt" appended.

    Args:
        file_path: Path of the MP3 file to split.
        threshold: Passed through to the Silero speech-timestamp helper.
        min_speech_duration_ms: Passed through to the Silero helper.
        min_silence_duration_ms: Passed through to the Silero helper.
        starting_directory: Root folder of the scan. If the file lives
            directly in it, the output folder is created inside that folder;
            otherwise it is created next to the file's folder. When omitted,
            a module-level ``starting_directory`` (set by the script entry
            point) is used if present.

    Returns:
        None. Load, VAD and export errors are printed; a load or VAD error
        ends processing of this file.
    """
    print(f"\nProcessing file: {file_path}")

    if starting_directory is None:
        # Backward compatibility: the script entry point keeps the scan root in
        # a module global. Looking it up via globals() avoids a NameError when
        # this function is imported and called without that global.
        starting_directory = globals().get("starting_directory")

    try:
        audio = AudioSegment.from_mp3(file_path)
    except Exception as e:
        print(f" Error loading {file_path} with pydub: {e}")
        return

    # Report pydub duration.
    pydub_duration_ms = len(audio)
    print(" Pydub-reported duration (ms):", pydub_duration_ms)

    # Create a version for VAD: mono, 16 kHz.
    audio_for_vad = audio.set_channels(1).set_frame_rate(16000)
    audio_tensor = convert_audio_for_vad(audio_for_vad)
    # Compute true duration from the sample count (mono, so one sample per frame).
    true_duration_ms = int((audio_tensor.shape[0] / 16000) * 1000)
    print(" Computed true duration (ms):", true_duration_ms)

    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    MODEL, GET_SPEECH_TS = load_silero_vad()
    if device.type == "cuda":
        audio_tensor = audio_tensor.to(device)
        MODEL = MODEL.to(device)

    try:
        # Run VAD. We assume the returned timestamps are in sample indices.
        vad_output = GET_SPEECH_TS(
            audio_tensor, MODEL, sampling_rate=16000,
            threshold=threshold,
            min_speech_duration_ms=min_speech_duration_ms,
            min_silence_duration_ms=min_silence_duration_ms
        )
    except Exception as e:
        print(f" Error running Silero VAD on {file_path}: {e}")
        return

    print(" Raw VAD output:", vad_output)
    candidate_boundaries = _vad_end_boundaries_ms(vad_output, true_duration_ms)
    print(" Candidate split boundaries (ms) from VAD:", candidate_boundaries)

    # The resampled duration is truncated to whole milliseconds and can end up
    # shorter than pydub's own length for the original audio. Ending the last
    # chunk at the larger value keeps the tail of the file from being cut off.
    final_ms = max(true_duration_ms, pydub_duration_ms)
    boundaries = sorted(set([0] + candidate_boundaries + [final_ms]))
    print(" Final split boundaries (ms):", boundaries)

    output_dir = _geteilt_output_dir(file_path, starting_directory)
    os.makedirs(output_dir, exist_ok=True)

    # NOTE(review): chunk names depend only on the output folder and a counter,
    # so several MP3 files in the same folder write to the same TeilNN.mp3
    # names and later files overwrite earlier ones - confirm this is intended.
    exported = 0
    for start_ms, end_ms in zip(boundaries, boundaries[1:]):
        if end_ms <= start_ms:
            continue
        # Do not trim anything: use the full audio from start_ms to end_ms.
        chunk = audio[start_ms:end_ms]
        out_file = os.path.join(output_dir, f"Teil{exported+1:02d}.mp3")
        print(f" Exporting chunk {exported+1}: {start_ms} ms to {end_ms} ms as {out_file}")
        try:
            chunk.export(out_file, format="mp3", codec="libmp3lame", bitrate="192k")
            exported += 1
        except Exception as e:
            print(f" Error exporting chunk {exported+1}: {e}")

    print(f"Finished processing file: {file_path} (exported {exported} chunks)")


def _vad_end_boundaries_ms(vad_output, true_duration_ms):
    """Return the sorted, de-duplicated VAD segment ends in ms that lie strictly inside the audio."""
    # Convert the VAD "end" values from 16 kHz sample indices to milliseconds.
    ends_ms = (int(seg["end"] / 16000 * 1000) for seg in vad_output)
    return sorted({ms for ms in ends_ms if 0 < ms < true_duration_ms})


def _geteilt_output_dir(file_path, starting_directory):
    """Return the "<folder>_geteilt" output folder for a file, relative to the scan root."""
    # Work on the absolute path so a relative folder such as "." still yields a
    # real folder name instead of "._geteilt".
    file_dir = os.path.dirname(os.path.abspath(file_path))
    base_name = os.path.basename(file_dir)
    if starting_directory and file_dir != os.path.abspath(starting_directory):
        # File lives in a sub-folder of the scan root: put the output next to that folder.
        return os.path.join(os.path.dirname(file_dir), base_name + "_geteilt")
    # File lives directly in the scan root (or no root is known): put the output inside its folder.
    return os.path.join(file_dir, base_name + "_geteilt")
|
|
|
|
if __name__ == "__main__":
    # The first command-line argument is the root folder to scan.
    if len(sys.argv) < 2:
        print("Usage: python audioSegmenter.py <starting_directory>")
        sys.exit(1)

    starting_directory = sys.argv[1]
    if not os.path.isdir(starting_directory):
        print(f"Error: The directory '{starting_directory}' does not exist.")
        sys.exit(1)

    # Collect every MP3 below the root up front; files that sit directly in a
    # folder whose path ends with "_geteilt" (earlier output) are left out.
    mp3_files = [
        os.path.join(folder, name)
        for folder, _, names in os.walk(starting_directory)
        if not folder.endswith("_geteilt")
        for name in names
        if name.lower().endswith('.mp3')
    ]

    print("Total MP3 files found:", len(mp3_files))
    for file_path in mp3_files:
        split_audio(file_path)
|