"""Batch-transcribe .mp3 recordings with Whisper and write Markdown transcripts.

Every folder in ``folder_list`` is walked recursively. For each .mp3 file that
has no transcript yet, a Markdown file is written into a ``Transkription``
sub-folder next to the audio file.
"""

import concurrent.futures
import json
import os
import re
import sys
import time

import whisper

# model_name = "large-v3"
model_name = "medium"

# Transcription statistics; both values are reset by process_folder().
start_time = 0
total_audio_length = 0

folder_list = [
    # Speyer
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2025",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2016",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2015",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2014",
    # Schwegenheim
    "\\\\10.1.1.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2025",
    "\\\\10.1.1.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2024",
]

# Words that cause the last transcript line to be dropped. They are matched as
# whole words so that ordinary words merely containing one of them
# (e.g. "Standard" contains "ard") do not trigger the removal.
_BANNED_WORDS = ("copyright", "ard", "zdf", "wdr")
_BANNED_WORD_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(word) for word in _BANNED_WORDS) + r")\b",
    re.IGNORECASE,
)

# File names containing one of these fragments are not transcribed, unless the
# name also contains "vorwort".
_SKIP_KEYWORDS = (
    "musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot",
)


def format_timestamp(seconds):
    """Format seconds as ``MM:SS``, or as ``HH:MM:SS`` from one hour upwards."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"


def format_status_path(path):
    """Return only the immediate parent folder and the file name of *path*."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename


def remove_lines_with_words(transcript):
    """Drop the last line of *transcript* if it contains a banned word.

    Trailing whitespace is stripped before the last line is inspected. Banned
    words are matched case-insensitively and as whole words only. An empty
    (or whitespace-only) transcript is returned unchanged.
    """
    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript

    if _BANNED_WORD_RE.search(lines[-1]):
        lines.pop()
    return "\n".join(lines)


def apply_error_correction(text):
    """Replace known mis-transcriptions in *text*.

    The mapping ``{wrong: right}`` is read from ``error_correction.json`` in
    the current working directory on every call. Keys are matched
    case-sensitively between word boundaries.

    Raises:
        OSError: if the JSON file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)

    # Longest keys first: regex alternation takes the first alternative that
    # matches, so a short key would otherwise shadow a longer key that starts
    # with it (e.g. "Jesus" vs. "Jesus Christus"). Empty keys are ignored.
    keys = sorted((key for key in correction_dict if key), key=len, reverse=True)
    if not keys:
        return text

    pattern = r'\b(' + '|'.join(re.escape(key) for key in keys) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)


def print_speed(current_length):
    """Add *current_length* seconds of audio to the statistics and print the speed.

    The speed is the total transcribed audio in minutes per hour of elapsed
    time since ``start_time``.
    """
    global start_time
    global total_audio_length

    elapsed_time = time.time() - start_time
    total_audio_length += current_length

    # (audio duration in minutes) / (elapsed time in hours)
    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0
    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)


def write_markdown(file_path, result, postfix=None):
    """Write the transcription *result* of *file_path* as a Markdown file.

    The file is created in a ``Transkription`` folder next to the audio file
    and is named after the audio file, with ``_<postfix>`` appended when
    *postfix* is given. Consecutive segments with identical text are written
    only once.

    Args:
        file_path: path of the transcribed audio file.
        result: mapping with a ``"segments"`` list; each segment provides
            ``"start"``, ``"end"`` and ``"text"``.
        postfix: optional suffix for the output file name.
    """
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(file_path))[0]
    if postfix is not None:
        base_name = f"{base_name}_{postfix}"
    output_md = os.path.join(txt_folder, base_name + ".md")

    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        "",
    ]

    segments = result["segments"]
    previous_text = ""
    for segment in segments:
        start = format_timestamp(segment["start"])
        text = segment["text"].strip()
        if previous_text != text:  # suppress repeating lines
            md_lines.append(f"`{start}` {text}")
        previous_text = text

    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)

    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)

    # A result without segments contributes no audio time instead of raising
    # an IndexError after the file has already been written.
    audio_length = segments[-1]["end"] if segments else 0
    print_speed(audio_length)
    print("... done !")


def transcribe_file(model, audio_input, language):
    """Transcribe *audio_input* in *language* and return the model's result.

    A fixed German initial prompt with context and vocabulary is passed to
    ``model.transcribe``.
    """
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf auf folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result


def detect_language(model, audio):
    """Return the most probable language code for *audio* and print it.

    Only the padded/trimmed audio returned by ``whisper.pad_or_trim`` is
    analysed.
    """
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code


def process_file(file_path, model, audio_input):
    """Detect the language of one file, transcribe it and write the Markdown.

    Russian audio whose file name contains "predigt", and German audio whose
    file name contains "russisch", is transcribed twice: once as Russian
    (``_ru``) and once as German (``_de``).
    """
    file_name_lower = os.path.basename(file_path).lower()

    postfix = None
    language = detect_language(model, audio_input)

    needs_both_languages = (
        (language == 'ru' and 'predigt' in file_name_lower)
        or (language == 'de' and 'russisch' in file_name_lower)
    )

    if needs_both_languages:
        # First pass: Russian transcript.
        language = "ru"
        postfix = "ru"
        print(f"Transcribing {format_status_path(file_path)} ", end='', flush=True)
        markdown = transcribe_file(model, audio_input, language)
        write_markdown(file_path, markdown, postfix)
        # Second pass (below): German transcript.
        language = "de"
        postfix = "de"
    elif language == 'en':
        # songs mostly detect as english
        language = "de"
    elif language == 'de' or language == 'ru':
        # keep as detected
        pass
    else:
        # Neither German, English nor Russian was detected: use Russian.
        language = "ru"

    print(f"Transcribing {format_status_path(file_path)} ", end='', flush=True)
    markdown = transcribe_file(model, audio_input, language)
    write_markdown(file_path, markdown, postfix)


def _collect_pending_files(root_folder):
    """Return ``(pending_files, checked_count)`` for the .mp3 files below *root_folder*.

    A file is pending when its name does not hit a skip keyword and none of
    its possible transcript files (``.md``, ``_de.md``, ``_ru.md``) exists.
    ``checked_count`` counts every .mp3 file found, including skipped ones.
    """
    pending_files = []
    checked_files = 0

    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            filename_lower = filename.lower()
            if not filename_lower.endswith(".mp3"):
                continue
            checked_files += 1

            # Skip files with skip keywords ("vorwort" overrides the skip).
            if "vorwort" not in filename_lower and any(
                keyword in filename_lower for keyword in _SKIP_KEYWORDS
            ):
                continue

            # Skip files for which any transcript variant already exists.
            txt_folder = os.path.join(dirpath, "Transkription")
            base_name = os.path.splitext(filename)[0]
            transcript_names = (
                base_name + ".md",
                base_name + "_de.md",
                base_name + "_ru.md",
            )
            if any(
                os.path.exists(os.path.join(txt_folder, name))
                for name in transcript_names
            ):
                continue

            pending_files.append(os.path.join(dirpath, filename))

    return pending_files, checked_files


def process_folder(root_folder):
    """Transcribe every pending .mp3 file below *root_folder*.

    While one file is being transcribed, the audio of the next file is loaded
    in a worker thread. A file that fails to load or to transcribe is reported
    and skipped; the remaining files are still processed.
    """
    global start_time
    global total_audio_length

    print("Create file list...")
    valid_files, checked_files = _collect_pending_files(root_folder)

    if not valid_files:
        print(f"Checked {checked_files} files. All files are transcribed.")
        return
    print(f"Checked {checked_files} files. Start to transcribe {len(valid_files)} files.")

    print("Loading Whisper model...")
    model = whisper.load_model(model_name, device="cuda")

    # Use a thread pool to pre-load files concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("Initialize preloading process...")
        future_audio = executor.submit(whisper.load_audio, valid_files[0])
        # Wait for the first file without raising; a load error is reported
        # inside the loop below like any other per-file failure.
        concurrent.futures.wait([future_audio])

        # Start the statistics for this folder. The audio total is reset
        # together with the start time so the speed is not inflated by audio
        # that was transcribed for a previous folder.
        start_time = time.time()
        total_audio_length = 0

        for i, file_path in enumerate(valid_files):
            current_audio = future_audio
            concurrent.futures.wait([current_audio])

            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(whisper.load_audio, valid_files[i + 1])

            try:
                # result() re-raises a loading error here, so a single broken
                # file does not abort the whole run.
                process_file(file_path, model, current_audio.result())
            except Exception as e:
                # continue with next file if a file fails
                print(f"Error with file {file_path}")
                print(e)


if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")