# bethaus-app/transcribe_all.py
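"""Batch transcription of sermon recordings with OpenAI Whisper.

Reads 'folder_list' and 'model_name' from transcription_config.yml, walks each
folder for .mp3 files that do not have a transcript yet, and writes one
markdown transcript per file into a "Transkription" subfolder next to the
audio. Run directly: python transcribe_all.py
"""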

import os
import sys
import time
import torch
import whisper
import concurrent.futures
import json
import re
import yaml
import librosa
import numpy as np

# Globals for transcription speed statistics
start_time = 0
total_audio_length = 0

with open("transcription_config.yml", "r", encoding="utf-8") as file:
    settings = yaml.safe_load(file)
folder_list = settings.get("folder_list")
model_name = settings.get("model_name")
gpu_only = settings.get("gpu_only", False)
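
# A minimal transcription_config.yml could look like this (the paths and model
# name are illustrative, not taken from the repository):
#
#   folder_list:
#     - "D:/Aufnahmen/Gottesdienste"
#   model_name: "large-v3"
#   gpu_only: false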
print("PyTorch version:", torch.__version__)
print("CUDA available?", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("GPU count:", torch.cuda.device_count())
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
print(f" Device {i}:", torch.cuda.get_device_name(i))

if not folder_list or not model_name:
    print("Error: Please check transcription_config.yml. It must contain 'folder_list' and 'model_name'.")
    sys.exit(1)
if gpu_only and not torch.cuda.is_available():
    print("Error: 'gpu_only' is set but no GPU is available. Please check your PyTorch installation.")
    sys.exit(1)


def load_audio_librosa(path: str, sr: int = 16_000) -> np.ndarray:
    """Load an audio file and resample it to 16 kHz mono for Whisper."""
    audio, _ = librosa.load(path, sr=sr)  # librosa resamples while loading
    return audio


def format_timestamp(seconds):
    """Format seconds as HH:MM:SS, or MM:SS when under an hour."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"


def format_status_path(path):
    """Return a string with only the immediate parent folder and the filename."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename


def remove_lines_with_words(transcript):
    """Remove the last line of the transcript if it contains a banned word."""
    # These words flag likely hallucinated broadcaster credits at the end of a
    # transcript; note that this is a substring match, so short tokens like
    # "ard" can also hit inside longer words.
    banned_words = ["copyright", "ard", "zdf", "wdr"]
    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # return unchanged if the transcript is empty
    # Check the last line and drop it if any banned word is present
    last_line = lines[-1]
    if any(banned_word in last_line.lower() for banned_word in banned_words):
        lines = lines[:-1]
    return "\n".join(lines)


def apply_error_correction(text):
    """Replace known misrecognitions using the mapping in error_correction.json."""
    # Load the JSON file that contains the correction mapping
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)
    # Combine all keys into a single alternation pattern with word boundaries
    pattern = r'\b(' + '|'.join(re.escape(key) for key in correction_dict.keys()) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)
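# error_correction.json maps frequent misrecognitions to their corrections;
# a hypothetical example:
#
#   { "Golgata": "Golgatha", "Bundes Lade": "Bundeslade" }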


def print_speed(current_length):
    """Print the running transcription speed in minutes of audio per hour."""
    global start_time
    global total_audio_length
    elapsed_time = time.time() - start_time
    total_audio_length += current_length
    # Transcription speed: minutes of audio transcribed per hour of processing.
    # Formula: (audio duration in minutes) / (elapsed time in hours)
    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0
    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)


def transcribe_file(model, audio_input, language):
    """Run Whisper on the audio with a domain-specific initial prompt."""
    # The prompt is deliberately German: it primes Whisper with church-service
    # vocabulary and formatting hints for these recordings.
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "die biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf, folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als Synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result


def detect_language(model, audio):
    """Detect the spoken language from the first 30 seconds of audio."""
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)  # pad/trim to Whisper's 30-second window
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code


def process_file(file_path, model, audio_input):
    """
    Transcribe the audio file into one markdown file.
    For special cases (a sermon detected as Russian, or a file marked
    'russisch' detected as German), transcribe in both Russian and German
    into the same file.
    """
    file_name = os.path.basename(file_path)
    # Detect spoken language
    detected = detect_language(model, audio_input)
    # Determine which languages to transcribe
    if (detected == 'ru' and 'predigt' in file_name.lower()) or \
       (detected == 'de' and 'russisch' in file_name.lower()):
        langs = ['ru', 'de']
    elif detected == 'en':  # songs are often mis-detected as English
        langs = ['de']
    elif detected in ('de', 'ru'):
        langs = [detected]
    else:
        langs = ['ru']
    # Collect segments for the combined result
    combined_segments = []
    for lang in langs:
        print(f"Transcribing {format_status_path(file_path)} as {lang}", end='', flush=True)
        result = transcribe_file(model, audio_input, lang)
        # Insert a synthetic segment as a language header
        combined_segments.append({
            'start': 0,
            'text': f"\n## Transcription ({lang.upper()})",
        })
        # Extend with the actual segments
        if isinstance(result, dict) and 'segments' in result:
            combined_segments.extend(result['segments'])
        else:
            # If the result has no segment list, wrap the entire text
            text = getattr(result, 'text', None) or (result.get('text') if isinstance(result, dict) else str(result))
            combined_segments.append({'start': 0, 'text': text})
    # Write the combined segments out as markdown
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    output_md = os.path.join(txt_folder, base_name + ".md")
    # Build markdown lines
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        ""
    ]
    previous_text = ""
    for segment in combined_segments:
        start = format_timestamp(segment.get('start', 0))
        text = segment.get('text', '').strip()
        if text and text != previous_text:
            md_lines.append(f"`{start}` {text}")
            previous_text = text
    # Join and post-process
    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)
    # Write the file and report speed
    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)
    if combined_segments:
        end_ts = combined_segments[-1].get('end', combined_segments[-1].get('start', 0))
        print_speed(end_ts)
    print("... done!")


def process_folder(root_folder):
    """
    Walk through root_folder and process .mp3 files, applying skip rules.
    Only files that still need to be transcribed (i.e. no transcription exists
    yet) have their audio pre-loaded concurrently.
    """
    global start_time
    # Filename keywords that mark music files to skip ('orhester' likely covers
    # a common misspelling in the source filenames)
    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]
    print("Create file list...")
    valid_files = []
    checked_files = 0
    # Walk the folder and build a list of files to transcribe.
    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(".mp3"):
                checked_files += 1
                filename_lower = filename.lower()
                file_path = os.path.join(dirpath, filename)
                # Skip music files, unless marked as a preface ('vorwort').
                if "vorwort" not in filename_lower and any(keyword in filename_lower for keyword in keywords):
                    continue
                # Compute the expected output markdown path.
                txt_folder = os.path.join(dirpath, "Transkription")
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_md = os.path.join(txt_folder, base_name + ".md")
                # Skip files that already have a markdown transcript.
                if os.path.exists(output_md):
                    continue
                valid_files.append(file_path)
    if len(valid_files) == 0:
        print(f"Checked {checked_files} files. All files are transcribed.")
        return
    print(f"Checked {checked_files} files. Starting to transcribe {len(valid_files)} files.")
    # Choose "cuda" if available, otherwise "cpu"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading Whisper model on {device}")
    model = whisper.load_model(model_name, device=device)
    # Use a thread pool to pre-load audio files concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Pre-load the first file.
        print("Initialize preloading process...")
        future_audio = executor.submit(load_audio_librosa, valid_files[0])
        # Wait for the first file so its load time is excluded from the stats.
        future_audio.result()
        # Record the start time for transcription statistics.
        start_time = time.time()
        for i, file_path in enumerate(valid_files):
            preloaded_audio = future_audio.result()
            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(load_audio_librosa, valid_files[i + 1])
            try:  # continue with the next file if one fails
                process_file(file_path, model, preloaded_audio)
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)


if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")