"""Split MP3 files into chunks at the speech-segment ends found by Silero VAD.

Usage: python audioSegmenter.py <starting_directory>

Every MP3 file found below the starting directory is cut at the end of each
speech segment reported by Silero VAD.  The chunks are exported as
``TeilNN.mp3`` into a folder named after the source folder plus ``_geteilt``.

Originally added in commit a3a3c49 ("add audio segmenter").
"""

import functools
import os
import shutil
import sys

import numpy as np
import torch

# Sample rate (Hz) of the mono copy that is handed to the VAD model.
VAD_SAMPLE_RATE = 16000
# Suffix of the folders that receive the exported chunks.
OUTPUT_SUFFIX = "_geteilt"


@functools.lru_cache(maxsize=1)
def load_silero_vad():
    """
    Load the Silero VAD model via ``torch.hub`` and return ``(model, helper)``.

    ``helper`` is the first entry of the utilities tuple returned by the hub
    entry point; this module assumes it yields speech segments whose
    ``start``/``end`` values are sample indices.

    The result is cached so that the model is loaded once per process instead
    of once per processed file.
    """
    model, utils = torch.hub.load('snakers4/silero-vad', 'silero_vad', force_reload=False)
    get_speech_ts = utils[0]
    return model, get_speech_ts


def convert_audio_for_vad(audio):
    """
    Convert a pydub ``AudioSegment`` (assumed mono, 16 kHz) to a torch tensor.

    The integer samples are divided by ``2 ** (8 * sample_width - 1)``, i.e.
    the magnitude of the most negative sample value, which maps them into
    [-1, 1] as float32.
    """
    samples = np.array(audio.get_array_of_samples())
    full_scale = 2 ** (8 * audio.sample_width - 1)
    return torch.from_numpy(samples.astype(np.float32) / full_scale)


def compute_split_boundaries(vad_output, true_duration_ms, sample_rate=VAD_SAMPLE_RATE):
    """
    Turn VAD segments into a sorted list of unique split boundaries in ms.

    Args:
        vad_output: iterable of mappings with an ``"end"`` key holding a
            sample index.
        true_duration_ms: total duration of the audio in milliseconds.
        sample_rate: sample rate the sample indices refer to.

    Returns:
        ``[0, <segment ends strictly inside the audio>, true_duration_ms]``,
        sorted and de-duplicated (so a zero-length audio yields ``[0]``).
    """
    # Integer arithmetic keeps the sample -> ms conversion exact.
    ends_ms = (int(seg["end"]) * 1000 // sample_rate for seg in vad_output)
    inner = {ms for ms in ends_ms if 0 < ms < true_duration_ms}
    return sorted(inner | {0, true_duration_ms})


def resolve_output_dir(file_path, starting_directory=None):
    """
    Return the folder that receives the chunks of ``file_path``.

    The folder is named ``<source folder name>_geteilt``.  For a file in a
    sub-folder of ``starting_directory`` it is created next to the source
    folder; for a file directly inside ``starting_directory`` (or when no
    starting directory is given) it is created inside the source folder.
    """
    file_dir = os.path.dirname(file_path)
    abs_file_dir = os.path.abspath(file_dir)
    # Take the name from the absolute path so that "." or "" (a bare file
    # name) still yields the real folder name instead of "._geteilt".
    base_name = os.path.basename(abs_file_dir)
    in_subfolder = bool(starting_directory) and (
        abs_file_dir != os.path.abspath(starting_directory)
    )
    parent = os.path.dirname(file_dir) if in_subfolder else file_dir
    return os.path.join(parent, base_name + OUTPUT_SUFFIX)


def find_mp3_files(starting_directory):
    """
    Recursively collect all ``.mp3`` files below ``starting_directory``.

    Folders whose name ends with ``_geteilt`` are skipped together with
    everything below them, so previously exported chunks are never re-split.
    """
    mp3_files = []
    for root, dirs, files in os.walk(starting_directory):
        # Prune in place so os.walk does not descend into output folders.
        dirs[:] = [d for d in dirs if not d.endswith(OUTPUT_SUFFIX)]
        if root.endswith(OUTPUT_SUFFIX):
            continue
        mp3_files.extend(
            os.path.join(root, name) for name in files if name.lower().endswith('.mp3')
        )
    return mp3_files


def split_audio(file_path, threshold=0.5, min_speech_duration_ms=90000,
                min_silence_duration_ms=4000, starting_directory=None):
    """
    Load an MP3 file, run Silero VAD to get reference split points, and split
    the audio into chunks covering the entire file without trimming any audio.

    The boundaries are defined as:
        [0, VAD_end_boundaries (converted to ms), true_duration_ms]

    Each chunk is exported as an MP3 file in an output folder (see
    ``resolve_output_dir``).  Errors while loading, running the VAD or
    exporting are printed and do not raise.

    Args:
        file_path: path of the MP3 file to split.
        threshold: passed through to the VAD helper.
        min_speech_duration_ms: passed through to the VAD helper.
        min_silence_duration_ms: passed through to the VAD helper.
        starting_directory: root of the current run; only used to decide where
            the output folder is placed.  Optional, so the function can be
            called without the command-line entry point.
    """
    # Imported here because this is the only function that needs pydub; the
    # pure helpers above stay importable without the audio backend.
    from pydub import AudioSegment

    print(f"\nProcessing file: {file_path}")

    try:
        audio = AudioSegment.from_mp3(file_path)
    except Exception as e:
        print(f"  Error loading {file_path} with pydub: {e}")
        return

    # Report pydub duration.
    pydub_duration_ms = len(audio)
    print("  Pydub-reported duration (ms):", pydub_duration_ms)

    # Create a version for VAD: mono, 16 kHz.
    audio_for_vad = audio.set_channels(1).set_frame_rate(VAD_SAMPLE_RATE)
    # Compute true duration from sample count.
    sample_count = len(audio_for_vad.get_array_of_samples())
    true_duration_ms = int((sample_count / VAD_SAMPLE_RATE) * 1000)
    print("  Computed true duration (ms):", true_duration_ms)

    # Tensor and model must live on the same device.
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    audio_tensor = convert_audio_for_vad(audio_for_vad).to(device)
    model, get_speech_ts = load_silero_vad()
    model = model.to(device)

    try:
        # Run VAD. We assume the returned timestamps are in sample indices.
        vad_output = get_speech_ts(
            audio_tensor, model, sampling_rate=VAD_SAMPLE_RATE,
            threshold=threshold,
            min_speech_duration_ms=min_speech_duration_ms,
            min_silence_duration_ms=min_silence_duration_ms
        )
    except Exception as e:
        print(f"  Error running Silero VAD on {file_path}: {e}")
        return

    print("  Raw VAD output:", vad_output)
    boundaries = compute_split_boundaries(vad_output, true_duration_ms)
    print("  Candidate split boundaries (ms) from VAD:", boundaries[1:-1])
    print("  Final split boundaries (ms):", boundaries)

    # NOTE(review): every MP3 in the same folder maps to the same output
    # folder and the same TeilNN.mp3 names, so a second file overwrites the
    # chunks of the first - confirm that one MP3 per folder is the intent.
    output_dir = resolve_output_dir(file_path, starting_directory)
    os.makedirs(output_dir, exist_ok=True)

    exported = 0
    spans = list(zip(boundaries, boundaries[1:]))
    for index, (start_ms, end_ms) in enumerate(spans):
        # The last chunk is sliced open-ended: true_duration_ms is truncated
        # and derived from the resampled copy, so it may be slightly shorter
        # than the source audio; nothing must be trimmed.
        is_last = index == len(spans) - 1
        chunk = audio[start_ms:] if is_last else audio[start_ms:end_ms]
        out_file = os.path.join(output_dir, f"Teil{exported+1:02d}.mp3")
        print(f"  Exporting chunk {exported+1}: {start_ms} ms to {end_ms} ms as {out_file}")
        try:
            chunk.export(out_file, format="mp3", codec="libmp3lame", bitrate="192k")
            exported += 1
        except Exception as e:
            print(f"    Error exporting chunk {exported+1}: {e}")

    print(f"Finished processing file: {file_path} (exported {exported} chunks)")


def main(argv=None):
    """
    Command-line entry point: split every MP3 below the given directory.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]``.

    Exits with status 1 when the directory argument is missing or invalid.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        print("Usage: python audioSegmenter.py <starting_directory>")
        sys.exit(1)

    starting_directory = args[0]
    if not os.path.isdir(starting_directory):
        print(f"Error: The directory '{starting_directory}' does not exist.")
        sys.exit(1)

    mp3_files = find_mp3_files(starting_directory)
    print("Total MP3 files found:", len(mp3_files))
    for file_path in mp3_files:
        split_audio(file_path, starting_directory=starting_directory)


if __name__ == "__main__":
    main()