import os
import sys
import time
import json
import re
import concurrent.futures

import torch
import whisper
import yaml
import librosa
import numpy as np

# Start time and accumulated audio length for transcription statistics
start_time = 0
total_audio_length = 0

with open("transcription_config.yml", "r", encoding="utf-8") as file:
    settings = yaml.safe_load(file)

folder_list = settings.get("folder_list")
model_name = settings.get("model_name")
gpu_only = settings.get("gpu_only", False)

print("PyTorch version:", torch.__version__)
print("CUDA available?", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("GPU count:", torch.cuda.device_count())
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        print(f" Device {i}:", torch.cuda.get_device_name(i))

if not folder_list or not model_name:
    print("Error: Please check the transcription_config.yml file. It must contain 'folder_list' and 'model_name'.")
    sys.exit(1)

if gpu_only and not torch.cuda.is_available():
    print("Error: GPU-only mode was requested, but CUDA is not available. Please check your PyTorch installation.")
    sys.exit(1)


def load_audio_librosa(path: str, sr: int = 16_000) -> np.ndarray:
    audio, _ = librosa.load(path, sr=sr)  # load + resample to 16 kHz
    return audio


def format_timestamp(seconds):
    """Format seconds as MM:SS, or HH:MM:SS for durations of an hour or more."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"


def format_status_path(path):
    """Return a string with only the immediate parent folder and the filename."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename


def remove_lines_with_words(transcript):
    """Remove the last line of the transcript if it contains a banned word.

    The banned words target trailing broadcaster credits (ARD, ZDF, WDR)
    that Whisper sometimes hallucinates at the end of German audio.
    """
    banned_words = ["copyright", "ard", "zdf", "wdr"]
    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # Return unchanged if transcript is empty
    last_line = lines[-1]
    if any(banned_word in last_line.lower() for banned_word in banned_words):
        lines = lines[:-1]
    return "\n".join(lines)


def apply_error_correction(text):
    """Replace known misrecognitions using the mapping in error_correction.json."""
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)

    # Combine all keys into a single word-bounded alternation pattern.
    pattern = r'\b(' + '|'.join(re.escape(key) for key in correction_dict.keys()) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)


def print_speed(current_length):
    global start_time
    global total_audio_length
    # Calculate transcription time statistics
    elapsed_time = time.time() - start_time
    total_audio_length = total_audio_length + current_length
    # Transcription speed: minutes of audio transcribed per hour of processing.
    # Formula: (audio duration in minutes) / (elapsed time in hours)
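    # Example: 120 minutes of audio transcribed in 30 minutes of wall time
    # gives (7200 s / 60) / (1800 s / 3600) = 120 / 0.5 = 240 minutes per hour.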
    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0
    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)


def transcribe_file(model, audio_input, language):
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf auf folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result


def detect_language(model, audio):
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code


def process_file(file_path, model, audio_input):
    """
    Transcribe the audio file into one markdown file.
    Special case: a German sermon detected as Russian, or a file marked
    'russisch' detected as German, is transcribed in both Russian and
    German into the same file.
    """
    file_name = os.path.basename(file_path)

    # Detect spoken language
    detected = detect_language(model, audio_input)

    # Determine which languages to transcribe
    if (detected == 'ru' and 'predigt' in file_name.lower()) or \
       (detected == 'de' and 'russisch' in file_name.lower()):
        langs = ['de', 'ru']
    elif detected == 'en':  # songs are often mis-detected as English
        langs = ['de']
    elif detected in ('de', 'ru'):
        langs = [detected]
    else:
        langs = ['ru']

    # Collect segments per language for the combined result
    lang_collection = {}
    for lang in langs:
        combined_segments = []
        print(f"Transcribing {format_status_path(file_path)} as {lang}", end='', flush=True)
        result = transcribe_file(model, audio_input, lang)
        # Extend with the actual segments
        if isinstance(result, dict) and 'segments' in result:
            combined_segments.extend(result['segments'])
        else:
            # If the result isn't a dict of segments, wrap the entire text
            text = getattr(result, 'text', None) or (result.get('text') if isinstance(result, dict) else str(result))
            combined_segments.append({'start': 0, 'text': text})
        lang_collection[lang] = combined_segments

    # Now write out markdown using the combined segments
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    output_md = os.path.join(txt_folder, base_name + ".md")

    # Build markdown lines
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        ""
    ]
    previous_text = ""
    for lang, combined_segments in lang_collection.items():
        md_lines.append(f"##### Transcription ({lang.upper()})")
        md_lines.append("---")
        for segment in combined_segments:
            start = format_timestamp(segment.get('start', 0))
            text = segment.get('text', '').strip()
            if text and text != previous_text:
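                # (Whisper sometimes emits identical consecutive segments;
                # the check above drops those exact repeats.)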
                md_lines.append(f"`{start}` {text}")
                previous_text = text

    # Join and post-process
    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)

    # Write the file and report speed
    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)
    if combined_segments:
        end_ts = combined_segments[-1].get('end', combined_segments[-1].get('start', 0))
        print_speed(end_ts)
    print("... done!")


def process_folder(root_folder):
    """
    Walk through root_folder and process .mp3 files.
    Differentiates between "folder not found" and "folder empty".
    Selects the files that still need to be transcribed, i.e. files whose
    transcription does not already exist, applying the skip rules below.
    """
    global start_time
    # Filenames containing these keywords are music recordings and are skipped
    # ("orhester" catches a common misspelling of "Orchester").
    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]
    print("Create file list...")

    # Does the path actually exist / can we even try to list it?
    if not os.path.exists(root_folder):
        print(f"Error: Path '{root_folder}' does not exist or is not reachable.")
        return
    if not os.path.isdir(root_folder):
        print(f"Error: Path '{root_folder}' exists but is not a folder.")
        return

    # Now we know the folder exists; scan it.
    print(f"Scanning '{root_folder}' for .mp3 files...")
    valid_files = []
    checked_files = 0
    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(".mp3"):
                checked_files += 1
                filename_lower = filename.lower()
                file_path = os.path.join(dirpath, filename)
                # Skip music files, unless the filename marks a spoken foreword.
                if "vorwort" not in filename_lower and any(keyword in filename_lower for keyword in keywords):
                    continue
                # Compute the expected output markdown path.
                txt_folder = os.path.join(dirpath, "Transkription")
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_md = os.path.join(txt_folder, base_name + ".md")
                # Skip files whose markdown transcript already exists.
                if os.path.exists(output_md):
                    continue
                valid_files.append(file_path)

    # If the folder contained no .mp3s, checked_files is 0, but we know the
    # folder exists because it passed the exists()/isdir() tests above.
    if checked_files == 0:
        print(f"Checked 0 files in '{root_folder}'. Folder contains no .mp3 files.")
        return

    # checked_files > 0, but maybe everything was already transcribed:
    if len(valid_files) == 0:
        print(f"Checked {checked_files} files. All files are already transcribed.")
        return

    # Otherwise there are files to process.
    print(f"Checked {checked_files} files. {len(valid_files)} need transcription.")

    # Choose "cuda" if available, otherwise "cpu".
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading Whisper model on {device}...")
    model = whisper.load_model(model_name, device=device)

    # Use a thread pool to pre-load audio files concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Pre-load the first file.
        print("Initialize preloading process...")
        future_audio = executor.submit(load_audio_librosa, valid_files[0])
        # Wait for the first file so its load time is excluded from the stats.
        preloaded_audio = future_audio.result()
        # Record the start time for transcription statistics.
        start_time = time.time()
        for i, file_path in enumerate(valid_files):
            preloaded_audio = future_audio.result()
            # Start loading the next file concurrently.
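            # Overlapping I/O with compute: the next file is decoded and
            # resampled in a worker thread while the current file is being
            # transcribed, which hides the load time of every file after the first.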
            if i + 1 < len(valid_files):
                future_audio = executor.submit(load_audio_librosa, valid_files[i + 1])
            try:
                # Continue with the next file if this one fails.
                process_file(file_path, model, preloaded_audio)
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)


if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")
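# For reference, a minimal transcription_config.yml matching the keys read at
# the top of this script (paths and model name are illustrative, not prescriptive):
#
#   folder_list:
#     - "D:/Aufnahmen/Gottesdienste"
#   model_name: "large-v3"
#   gpu_only: false
#
# error_correction.json maps misrecognized words to their corrections and is
# applied with word boundaries by apply_error_correction(), e.g. an
# illustrative entry:
#
#   { "Gebets Haus": "Gebetshaus" }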