bethaus-app/transcribe_all.py
import os
import sys
import time
import torch
import whisper
import concurrent.futures
import json
import re
import yaml
import librosa
import numpy as np
# Counters for transcription speed statistics
start_time = 0
total_audio_length = 0

with open("transcription_config.yml", "r", encoding="utf-8") as file:
    settings = yaml.safe_load(file)

folder_list = settings.get("folder_list")
model_name = settings.get("model_name")
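
# A minimal sketch of the expected transcription_config.yml (the paths and the
# model name below are illustrative assumptions, not values from the real config):
#
#   folder_list:
#     - /data/aufnahmen/2024
#     - /data/aufnahmen/2025
#   model_name: large-v3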

def load_audio_librosa(path: str, sr: int = 16_000) -> np.ndarray:
    """Load an audio file with librosa and resample it to 16 kHz mono."""
    audio, _ = librosa.load(path, sr=sr)  # load + resample to 16 kHz
    return audio

def format_timestamp(seconds):
    """Format seconds as MM:SS, or HH:MM:SS for durations of an hour or more."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"

def format_status_path(path):
    """Return a string with only the immediate parent folder and the filename."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename

def remove_lines_with_words(transcript):
    """Remove the last line of the transcript if it contains any banned word."""
    # Words that suggest a spurious credit line (e.g. broadcaster names) at the end.
    banned_words = ["copyright", "ard", "zdf", "wdr"]
    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # return unchanged if the transcript is empty
    # Check the last line and drop it if any banned word appears in it.
    last_line = lines[-1]
    if any(banned_word in last_line.lower() for banned_word in banned_words):
        lines = lines[:-1]
    return "\n".join(lines)

def apply_error_correction(text):
    """Replace known mis-transcriptions using the mapping in error_correction.json."""
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)
    # Combine all keys into a single alternation pattern with word boundaries.
    pattern = r'\b(' + '|'.join(re.escape(key) for key in correction_dict.keys()) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)
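
# error_correction.json is assumed to be a flat mapping from mis-transcribed
# strings to their corrections, for example (illustrative entries only):
#
#   {
#       "Golgata": "Golgatha",
#       "Stiftshuette": "Stiftshütte"
#   }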

def print_speed(current_length):
    """Print the cumulative transcription speed in minutes of audio per hour of processing."""
    global start_time
    global total_audio_length
    elapsed_time = time.time() - start_time
    total_audio_length = total_audio_length + current_length
    # Speed = (audio duration in minutes) / (elapsed processing time in hours)
    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0
    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)

def write_markdown(file_path, result, postfix=None):
    """Write the Whisper result as a markdown transcript into a 'Transkription' subfolder."""
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    if postfix is not None:
        base_name = f"{base_name}_{postfix}"
    output_md = os.path.join(txt_folder, base_name + ".md")
    # Prepare the markdown content.
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        ""
    ]
    previous_text = ""
    for segment in result["segments"]:
        start = format_timestamp(segment["start"])
        text = segment["text"].strip()
        if previous_text != text:  # suppress immediately repeating lines
            md_lines.append(f"`{start}` {text}")
            previous_text = text
    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)
    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)
    if result["segments"]:  # guard against an empty segment list
        print_speed(result["segments"][-1]["end"])
    print("... done!")

def transcribe_file(model, audio_input, language):
    """Run Whisper on the pre-loaded audio with a domain-specific German initial prompt."""
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf, folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als Synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result

def detect_language(model, audio):
    """Detect the spoken language from the beginning of the audio."""
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)  # pad_or_trim limits the sample to 30 seconds
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code

def process_file(file_path, model, audio_input):
    """Pick the transcription language(s) for one file and write the markdown output."""
    file_name = os.path.basename(file_path)
    # default values
    postfix = None
    language = detect_language(model, audio_input)
    if (language == 'ru' and 'predigt' in file_name.lower()) or (language == 'de' and 'russisch' in file_name.lower()):
        # make two transcripts: first the Russian one
        language = "ru"
        postfix = "ru"
        print(f"Transcribing {format_status_path(file_path)} ", end='', flush=True)
        markdown = transcribe_file(model, audio_input, language)
        write_markdown(file_path, markdown, postfix)
        # then the German one
        language = "de"
        postfix = "de"
    elif language == 'en':  # songs are mostly detected as English
        language = "de"
    elif language in ('de', 'ru'):  # keep as detected
        pass
    else:  # neither German, English nor Russian --> assume Russian
        language = "ru"
    print(f"Transcribing {format_status_path(file_path)} ", end='', flush=True)
    markdown = transcribe_file(model, audio_input, language)
    write_markdown(file_path, markdown, postfix)

def process_folder(root_folder):
    """
    Walk through root_folder and process .mp3 files, applying skip rules.
    Only files that still need a transcription (no existing markdown output)
    will have their audio pre-loaded concurrently.
    """
    global start_time
    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]
    print("Create file list...")
    valid_files = []
    checked_files = 0
    # Walk the folder and build a list of files to transcribe.
    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(".mp3"):
                checked_files = checked_files + 1
                filename_lower = filename.lower()
                file_path = os.path.join(dirpath, filename)
                # Skip files whose name matches a music keyword, unless it also contains "vorwort".
                if "vorwort" not in filename_lower and any(keyword in filename_lower for keyword in keywords):
                    continue
                # Compute the expected output markdown paths.
                txt_folder = os.path.join(dirpath, "Transkription")
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_md = os.path.join(txt_folder, base_name + ".md")
                output_md_de = os.path.join(txt_folder, base_name + "_de.md")
                output_md_ru = os.path.join(txt_folder, base_name + "_ru.md")
                # Skip files that already have a markdown transcript.
                if os.path.exists(output_md) or os.path.exists(output_md_de) or os.path.exists(output_md_ru):
                    continue
                valid_files.append(file_path)
    if len(valid_files) == 0:
        print(f"Checked {checked_files} files. All files are transcribed.")
        return
    print(f"Checked {checked_files} files. Start to transcribe {len(valid_files)} files.")
    # Choose "cuda" if available, otherwise "cpu".
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading Whisper model on {device}")
    model = whisper.load_model(model_name, device=device)
    # Use a thread pool to pre-load the next file's audio while the current one is transcribed.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Pre-load the first file.
        print("Initialize preloading process...")
        future_audio = executor.submit(load_audio_librosa, valid_files[0])
        # Wait for the first file to be loaded before starting the clock.
        preloaded_audio = future_audio.result()
        # Record the start time for transcription statistics.
        start_time = time.time()
        for i, file_path in enumerate(valid_files):
            preloaded_audio = future_audio.result()
            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(load_audio_librosa, valid_files[i + 1])
            try:  # continue with the next file if this one fails
                process_file(file_path, model, preloaded_audio)
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)

if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")