# bethaus-app/transcribe_all.py
# 2025-04-05 08:51:39 +02:00
#
# 253 lines
# 9.7 KiB
# Python
# Executable File

import os
import sys
import time
import whisper
import concurrent.futures
import json
import re
# Name of the Whisper model passed to whisper.load_model().
# Commented-out alternative:
# model_name = "large-v3"
model_name = "medium"

# Throughput statistics shared between process_folder() and print_speed():
# start_time is overwritten with time.time() when transcription starts,
# total_audio_length accumulates the length of the audio transcribed so far.
start_time = 0
total_audio_length = 0

# Root folders that are scanned for recordings, processed in this order.
folder_list = [
    # Speyer
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2025",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2016",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2015",
    # "\\\\10.1.0.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2014",
    # Schwegenheim
    "\\\\10.1.1.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2025",
    "\\\\10.1.1.11\\Aufnahme-stereo\\010 Gottesdienste ARCHIV\\2024",
]
def format_timestamp(seconds):
    """Render a duration in seconds as ``MM:SS``, or ``HH:MM:SS`` from one hour on.

    Fractions of a second are cut off, not rounded.
    """
    hh = int(seconds // 3600)
    mm = int((seconds % 3600) // 60)
    ss = int(seconds % 60)
    clock = f"{mm:02}:{ss:02}"
    # The hour field is only shown when it is non-zero.
    return clock if hh == 0 else f"{hh:02}:{clock}"
def format_status_path(path):
    """Shorten *path* to its immediate parent folder plus file name.

    If the path has no named parent folder, only the file name is returned.
    """
    parent_dir, filename = os.path.split(path)
    parent = os.path.basename(parent_dir)
    if not parent:
        return filename
    return os.path.join(parent, filename)
def remove_lines_with_words(transcript):
    """Drop the last line of *transcript* if it contains a banned word.

    Banned words are matched case-insensitively and only as whole words, so
    a word that merely contains one of them (e.g. "Standard" contains "ard")
    does not cause a legitimate last line to be removed.

    Args:
        transcript: The complete transcript text.

    Returns:
        The transcript with trailing whitespace stripped and, if its last
        line contained a banned word, without that line. A transcript that
        is empty or consists only of whitespace is returned unchanged.
    """
    banned_words = ["copyright", "ard", "zdf", "wdr"]

    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # nothing to inspect

    # \b...\b restricts the match to whole words instead of substrings.
    banned_pattern = re.compile(
        r"\b(?:" + "|".join(re.escape(word) for word in banned_words) + r")\b",
        re.IGNORECASE,
    )
    if banned_pattern.search(lines[-1]):
        lines.pop()
    return "\n".join(lines)
def apply_error_correction(text):
    """Replace known mis-transcriptions in *text* using ``error_correction.json``.

    The JSON file is opened relative to the current working directory on
    every call and is expected to hold an object that maps each wrong
    spelling to its replacement. Keys are matched case-sensitively and must
    be delimited by word boundaries in *text*.

    Args:
        text: The text to correct.

    Returns:
        *text* with every occurrence of a key replaced by its value.

    Raises:
        OSError: If ``error_correction.json`` cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)

    if not correction_dict:
        return text  # no corrections configured

    # Regex alternation takes the first alternative that matches, so longer
    # keys go first; otherwise a key such as "Jesu" would win over
    # "Jesu Christi" and the longer correction would never be applied.
    keys = sorted(correction_dict, key=len, reverse=True)
    pattern = r'\b(' + '|'.join(re.escape(key) for key in keys) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)
def print_speed(current_length):
    """Add *current_length* to the audio total and print the throughput.

    The throughput is the accumulated audio length (in minutes) divided by
    the wall-clock time (in hours) that has passed since the module-level
    ``start_time``. The text is printed without a trailing newline.
    """
    global start_time, total_audio_length

    seconds_running = time.time() - start_time
    total_audio_length += current_length

    # (audio duration in minutes) / (elapsed time in hours); reported as 0
    # when no time has elapsed, which avoids dividing by zero.
    audio_minutes_per_hour = (
        (total_audio_length / 60) / (seconds_running / 3600)
        if seconds_running > 0
        else 0
    )
    print(f" | Speed: {int(audio_minutes_per_hour)} minutes per hour | ", end='', flush=True)
def write_markdown(file_path, result, postfix=None):
    """Write a transcription result as a Markdown file next to the audio file.

    The output goes into a ``Transkription`` sub-folder of the audio file's
    folder (created if missing) and is named ``<audio name>.md`` or
    ``<audio name>_<postfix>.md``. It starts with the folder name and file
    name as headings, followed by one timestamped line per segment;
    consecutive segments with identical text are written only once. The
    text is passed through apply_error_correction() and
    remove_lines_with_words() before it is written. Afterwards the
    throughput statistic is updated and printed.

    Args:
        file_path: Path of the audio file that was transcribed.
        result: Mapping with a ``"segments"`` list; each segment provides
            ``"start"``, ``"end"`` and ``"text"``.
        postfix: Optional suffix appended to the output file name.
    """
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(file_path))[0]
    if postfix is not None:
        base_name = f"{base_name}_{postfix}"
    output_md = os.path.join(txt_folder, base_name + ".md")

    # Prepare the markdown content.
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        "",
    ]

    segments = result["segments"]
    previous_text = ""
    for segment in segments:
        start = format_timestamp(segment["start"])
        text = segment["text"].strip()
        if previous_text != text:  # suppress repeating lines
            md_lines.append(f"`{start}` {text}")
        previous_text = text

    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)

    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)

    # A result without segments would make segments[-1] raise IndexError
    # after the file has already been written; count it as zero seconds.
    audio_length = segments[-1]["end"] if segments else 0
    print_speed(audio_length)
    print("... done !")
def transcribe_file(model, audio_input, language):
    """Transcribe *audio_input* with *model* in the given *language*.

    A fixed German prompt is passed as ``initial_prompt``. It describes the
    recording as a Christian worship service and lists terms that should be
    transcribed correctly. The value returned by ``model.transcribe`` is
    handed back unchanged.
    """
    prompt_parts = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, ",
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. ",
        "Achte darauf auf folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: ",
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, ",
        "Apostelgeschichte, Auferstehung, Wiedergeburt. ",
        "Das Wort 'Bethaus' wird häufig als synonym für 'Gebetshaus' verwendet. ",
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. ",
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. ",
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden.",
    )
    return model.transcribe(
        audio_input,
        initial_prompt="".join(prompt_parts),
        language=language,
    )
def detect_language(model, audio):
    """Detect the language of *audio* and return its most probable code.

    The audio is passed through ``whisper.pad_or_trim``, converted to a
    log-Mel spectrogram on the model's device and handed to
    ``model.detect_language``. Progress is printed without a newline.
    """
    print(" Language detected: ", end='', flush=True)
    clip = whisper.pad_or_trim(audio)
    spectrogram = whisper.log_mel_spectrogram(clip, n_mels=model.dims.n_mels)
    _, language_probs = model.detect_language(spectrogram.to(model.device))
    # Pick the language code with the highest probability.
    lang_code = max(language_probs, key=language_probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code
def process_file(file_path, model, audio_input):
    """Detect the language of one recording, transcribe it and write Markdown.

    Language rules:
      * Detected Russian with "predigt" in the file name, or detected German
        with "russisch" in the file name: the file is transcribed twice,
        first as Russian (postfix "ru"), then as German (postfix "de").
      * Detected English is transcribed as German (songs mostly detect as
        English).
      * Detected German or Russian is kept.
      * Any other detected language is transcribed as Russian.
    """
    detected = detect_language(model, audio_input)
    name_lower = os.path.basename(file_path).lower()

    is_bilingual = (
        (detected == 'ru' and 'predigt' in name_lower)
        or (detected == 'de' and 'russisch' in name_lower)
    )
    if is_bilingual:
        # Two output files: Russian first, then German.
        passes = [("ru", "ru"), ("de", "de")]
    else:
        # Anything that is not German, Russian or English falls back to Russian.
        fallback = {"de": "de", "ru": "ru", "en": "de"}
        passes = [(fallback.get(detected, "ru"), None)]

    for language, postfix in passes:
        print(f"Transcribing {format_status_path(file_path)} ", end='', flush=True)
        markdown = transcribe_file(model, audio_input, language)
        write_markdown(file_path, markdown, postfix)
def _find_untranscribed_files(root_folder, skip_keywords):
    """Return ``(pending, checked)`` for the .mp3 files below *root_folder*.

    ``checked`` counts every .mp3 file found. ``pending`` lists the paths of
    those that are neither skipped by keyword nor already transcribed.
    """
    pending = []
    checked = 0
    for dirpath, _, filenames in os.walk(root_folder):
        txt_folder = os.path.join(dirpath, "Transkription")
        for filename in filenames:
            filename_lower = filename.lower()
            if not filename_lower.endswith(".mp3"):
                continue
            checked += 1
            # Skip files with skip keywords, unless the name also contains
            # "vorwort".
            if "vorwort" not in filename_lower and any(
                keyword in filename_lower for keyword in skip_keywords
            ):
                continue
            # Skip files for which any expected markdown output exists.
            base_name = os.path.splitext(filename)[0]
            outputs = (base_name + ".md", base_name + "_de.md", base_name + "_ru.md")
            if any(os.path.exists(os.path.join(txt_folder, name)) for name in outputs):
                continue
            pending.append(os.path.join(dirpath, filename))
    return pending, checked


def process_folder(root_folder):
    """Transcribe every not-yet-transcribed .mp3 file below *root_folder*.

    Files whose name contains one of the skip keywords (and not "vorwort")
    are ignored, as are files that already have a ``<name>.md``,
    ``<name>_de.md`` or ``<name>_ru.md`` in the ``Transkription`` folder
    beside them. If nothing is left to do, the function returns before the
    Whisper model is loaded.

    While one file is being transcribed, the audio of the next file is
    loaded in a worker thread. A file whose audio cannot be loaded or whose
    transcription fails is reported and skipped; the remaining files are
    still processed.

    Args:
        root_folder: Folder that is walked recursively.
    """
    global start_time

    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]

    print("Create file list...")
    valid_files, checked_files = _find_untranscribed_files(root_folder, keywords)

    if not valid_files:
        print(f"Checked {checked_files} files. All files are transcribed.")
        return
    print(f"Checked {checked_files} files. Start to transcribe {len(valid_files)} files.")

    print("Loading Whisper model...")
    model = whisper.load_model(model_name, device="cuda")

    # Use a thread pool to pre-load files concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("Initialize preloading process...")
        future_audio = executor.submit(whisper.load_audio, valid_files[0])
        # Wait for the first load to finish before the clock starts. wait()
        # does not re-raise a load error; that is handled per file below.
        concurrent.futures.wait([future_audio])

        # Record start time for transcription statistics.
        start_time = time.time()

        for i, file_path in enumerate(valid_files):
            current_audio = future_audio
            concurrent.futures.wait([current_audio])

            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(whisper.load_audio, valid_files[i + 1])

            try:  # continue with next file if a file fails
                # result() re-raises a failed load here, inside the try, so
                # a single unreadable file no longer aborts the whole run.
                process_file(file_path, model, current_audio.result())
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)
def main():
    """Process every folder in ``folder_list`` in order, then report completion."""
    for archive_root in folder_list:
        process_folder(archive_root)
    print("All done!")


if __name__ == "__main__":
    main()