import os
import sys
import time
import json
import re

import concurrent.futures

import numpy as np
import torch
import whisper
import librosa
import yaml

# Start time and running total for transcription statistics
start_time = 0
total_audio_length = 0

with open("transcription_config.yml", "r", encoding="utf-8") as file:
    settings = yaml.safe_load(file)

folder_list = settings.get("folder_list")
model_name = settings.get("model_name")
gpu_only = settings.get("gpu_only", False)

print("PyTorch version:", torch.__version__)
print("CUDA available?", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("GPU count:", torch.cuda.device_count())
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        print(f"  Device {i}:", torch.cuda.get_device_name(i))

if not folder_list or not model_name:
    print("Error: Please check the transcription_config.yml file. It must contain 'folder_list' and 'model_name'.")
    sys.exit(1)

if gpu_only and not torch.cuda.is_available():
    print("Error: GPU-only mode was requested, but CUDA is not available. Please check your PyTorch installation.")
    sys.exit(1)


def load_audio_librosa(path: str, sr: int = 16_000) -> np.ndarray:
    """Load an audio file and resample it to 16 kHz, Whisper's expected rate."""
    audio, _ = librosa.load(path, sr=sr)
    return audio


def format_timestamp(seconds):
    """Format seconds into HH:MM:SS (or MM:SS if under an hour)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"


def format_status_path(path):
    """Return a string with only the immediate parent folder and the filename."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename


def remove_lines_with_words(transcript):
    """Remove the last line of the transcript if it contains a banned word.

    Whisper tends to hallucinate broadcaster credits (e.g. subtitle attributions
    for ARD, ZDF, or WDR) at the very end of a transcript; this strips them.
    """
    banned_words = ["copyright", "ard", "zdf", "wdr"]

    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # Return unchanged if transcript is empty

    # Match banned words only as whole words, so "ard" does not match e.g. "Standard".
    last_line = lines[-1]
    if any(re.search(rf"\b{re.escape(word)}\b", last_line, re.IGNORECASE) for word in banned_words):
        lines = lines[:-1]

    return "\n".join(lines)


def apply_error_correction(text):
    """Replace known mis-transcriptions using the mapping in error_correction.json."""
    with open("error_correction.json", "r", encoding="utf-8") as file:
        correction_dict = json.load(file)

    # Combine all keys into a single word-bounded regex pattern.
    pattern = r"\b(" + "|".join(re.escape(key) for key in correction_dict.keys()) + r")\b"

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)
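
# For reference, apply_error_correction expects error_correction.json to be a
# flat JSON object mapping a mis-transcribed word to its correction. The
# entries below are illustrative examples only, not the file's actual contents:
#
#   {
#       "Golgata": "Golgatha",
#       "Stiftshuette": "Stiftshütte"
#   }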
def print_speed(current_length):
    """Print the running transcription speed.

    Speed is reported as minutes of audio transcribed per hour of processing:
    (audio duration in minutes) / (elapsed time in hours).
    """
    global start_time
    global total_audio_length

    elapsed_time = time.time() - start_time
    total_audio_length = total_audio_length + current_length

    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0

    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)


def transcribe_file(model, audio_input, language):
    # German prompt that primes Whisper with church-service vocabulary and
    # terms that were frequently mis-transcribed in the past.
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf, folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als Synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result


def detect_language(model, audio):
    """Detect the spoken language from the first 30 seconds of the audio."""
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code
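
# For context: in openai-whisper, model.transcribe() returns a dict roughly of
# the shape sketched below (abridged; segments carry more keys than shown):
#
#   {
#       "text": "...full transcript...",
#       "language": "de",
#       "segments": [
#           {"id": 0, "start": 0.0, "end": 4.2, "text": " Guten Morgen ..."},
#       ],
#   }
#
# process_file below relies only on 'segments' and on each segment's
# 'start'/'end'/'text' keys.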
def process_file(file_path, model, audio_input):
    """
    Transcribe the audio file into one markdown file.

    Special cases (a German sermon detected as Russian, or a German file marked
    as Russian) are transcribed in both Russian and German into the same file.
    """
    file_name = os.path.basename(file_path)

    # Detect spoken language
    detected = detect_language(model, audio_input)

    # Determine which languages to transcribe
    if (detected == 'ru' and 'predigt' in file_name.lower()) or \
       (detected == 'de' and 'russisch' in file_name.lower()):
        langs = ['ru', 'de']
    elif detected == 'en':
        # Songs are often mis-detected as English; transcribe them as German.
        langs = ['de']
    elif detected in ('de', 'ru'):
        langs = [detected]
    else:
        langs = ['ru']

    # Collect segments for the combined result
    combined_segments = []
    for lang in langs:
        print(f"Transcribing {format_status_path(file_path)} as {lang}", end='', flush=True)
        result = transcribe_file(model, audio_input, lang)

        # Insert a synthetic segment as a language header.
        combined_segments.append({
            'start': 0,
            'text': f"\n## Transcription ({lang.upper()})",
        })

        # Extend with the actual segments.
        if isinstance(result, dict) and 'segments' in result:
            combined_segments.extend(result['segments'])
        else:
            # If the result is not a dict of segments, wrap the entire text.
            text = getattr(result, 'text', None) or (result.get('text') if isinstance(result, dict) else str(result))
            combined_segments.append({'start': 0, 'text': text})

    # Write out markdown using the combined segments.
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    output_md = os.path.join(txt_folder, base_name + ".md")

    # Build the markdown lines.
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        ""
    ]
    previous_text = ""
    for segment in combined_segments:
        start = format_timestamp(segment.get('start', 0))
        text = segment.get('text', '').strip()
        if text and text != previous_text:  # Skip consecutive duplicate segments.
            md_lines.append(f"`{start}` {text}")
            previous_text = text

    # Join and post-process.
    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)

    # Write the file and report progress.
    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)

    if combined_segments:
        end_ts = combined_segments[-1].get('end', combined_segments[-1].get('start', 0))
        print_speed(end_ts)
    print("... done !")


def process_folder(root_folder):
    """
    Walk through root_folder and process .mp3 files, applying skip rules.
    Only files that still need to be transcribed (i.e. whose transcription does
    not already exist) have their audio pre-loaded concurrently.
    """
    global start_time

    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]

    print("Create file list...")
    valid_files = []
    checked_files = 0

    # Walk the folder and build a list of files to transcribe.
    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(".mp3"):
                checked_files += 1
                filename_lower = filename.lower()
                file_path = os.path.join(dirpath, filename)

                # Skip music files, unless the filename marks a preface ("vorwort").
                if "vorwort" not in filename_lower and any(keyword in filename_lower for keyword in keywords):
                    continue

                # Compute the expected output markdown path.
                txt_folder = os.path.join(dirpath, "Transkription")
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_md = os.path.join(txt_folder, base_name + ".md")

                # Skip files whose markdown transcript already exists.
                if os.path.exists(output_md):
                    continue

                valid_files.append(file_path)

    if len(valid_files) == 0:
        print(f"Checked {checked_files} files. All files are transcribed.")
        return
    else:
        print(f"Checked {checked_files} files. Starting to transcribe {len(valid_files)} files.")

    # Choose "cuda" if available, otherwise "cpu".
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading Whisper model on {device}...")
    model = whisper.load_model(model_name, device=device)

    # Use a thread pool to pre-load files concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Pre-load the first file and wait for it, so the timing statistics
        # start only once transcription can actually begin.
        print("Initialize preloading process...")
        future_audio = executor.submit(load_audio_librosa, valid_files[0])
        future_audio.result()

        # Record the start time for transcription statistics.
        start_time = time.time()

        for i, file_path in enumerate(valid_files):
            preloaded_audio = future_audio.result()

            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(load_audio_librosa, valid_files[i + 1])

            try:
                # Continue with the next file if one file fails.
                process_file(file_path, model, preloaded_audio)
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)


if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")
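
# For reference, a minimal transcription_config.yml might look like the sketch
# below. The paths and model name are illustrative placeholders, not the real
# values used by this project:
#
#   folder_list:
#     - "D:/Aufnahmen/2024"
#     - "D:/Aufnahmen/2025"
#   model_name: "large-v3"
#   gpu_only: true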