import concurrent.futures
import json
import os
import re
import sys
import time

import librosa
import numpy as np
import torch
import whisper
import yaml

# Globals for transcription speed statistics
start_time = 0
total_audio_length = 0

with open("transcription_config.yml", "r", encoding="utf-8") as file:
    settings = yaml.safe_load(file)
folder_list = settings.get("folder_list")
model_name = settings.get("model_name")
gpu_only = settings.get("gpu_only", False)

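# Illustrative transcription_config.yml matching the keys read above (paths
# and model name are placeholder examples, not taken from the original repo):
#
#   folder_list:
#     - "D:/Aufnahmen/2024"
#     - "D:/Aufnahmen/Archiv"
#   model_name: "large-v3"
#   gpu_only: true
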
print("PyTorch version:", torch.__version__)
|
|
print("CUDA available?", torch.cuda.is_available())
|
|
print("CUDA version:", torch.version.cuda)
|
|
print("GPU count:", torch.cuda.device_count())
|
|
if torch.cuda.is_available():
|
|
for i in range(torch.cuda.device_count()):
|
|
print(f" Device {i}:", torch.cuda.get_device_name(i))
|
|
|
|
if not folder_list or not model_name:
    print("Error: Please check the transcription_config.yml file. It should contain 'folder_list' and 'model_name'.")
    sys.exit(1)

if gpu_only and not torch.cuda.is_available():
    print("Error: GPU-only mode was requested, but CUDA is not available. Please check your PyTorch installation.")
    sys.exit(1)

def load_audio_librosa(path: str, sr: int = 16_000) -> np.ndarray:
    """Load an audio file as mono float32, resampled to 16 kHz (Whisper's input rate)."""
    audio, _ = librosa.load(path, sr=sr)  # librosa returns (samples, sample_rate)
    return audio

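# Note: model.transcribe() also accepts a file path (decoded via ffmpeg), but
# decoding with librosa here lets the thread pool in process_folder() preload
# the next file while the GPU is busy transcribing the current one.
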
def format_timestamp(seconds):
    """Format seconds as HH:MM:SS, or MM:SS if under an hour.

    e.g. 125 -> "02:05", 3671 -> "01:01:11"
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours == 0:
        return f"{minutes:02}:{secs:02}"
    return f"{hours:02}:{minutes:02}:{secs:02}"

def format_status_path(path):
    """Return a string with only the immediate parent folder and the filename."""
    filename = os.path.basename(path)
    parent = os.path.basename(os.path.dirname(path))
    if parent:
        return os.path.join(parent, filename)
    return filename

def remove_lines_with_words(transcript):
    """Remove the last line of the transcript if it contains a banned word."""
    # Banned words (matched as plain case-insensitive substrings), apparently
    # aimed at hallucinated broadcaster credits such as "Copyright WDR".
    banned_words = ["copyright", "ard", "zdf", "wdr"]

    # Split transcript into lines
    lines = transcript.rstrip().splitlines()
    if not lines:
        return transcript  # Return unchanged if transcript is empty

    # Drop the last line if any banned word is present
    last_line = lines[-1]
    if any(banned_word in last_line.lower() for banned_word in banned_words):
        lines = lines[:-1]

    return "\n".join(lines)

def apply_error_correction(text):
    """Apply whole-word corrections from error_correction.json (a {wrong: right} map)."""
    with open('error_correction.json', 'r', encoding='utf-8') as file:
        correction_dict = json.load(file)

    if not correction_dict:
        return text  # nothing to correct; avoids building an empty pattern

    # Combine all keys into a single word-bounded alternation pattern
    pattern = r'\b(' + '|'.join(re.escape(key) for key in correction_dict.keys()) + r')\b'

    def replacement_func(match):
        key = match.group(0)
        return correction_dict.get(key, key)

    return re.sub(pattern, replacement_func, text)

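# Illustrative error_correction.json (hypothetical entries; the real mapping
# ships alongside the script as a {"wrong": "right"} object):
#
#   {
#     "Stifts Hütte": "Stiftshütte",
#     "Golgota": "Golgatha"
#   }
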
def print_speed(current_length):
    """Print running transcription speed: minutes of audio per hour of processing."""
    global start_time
    global total_audio_length

    elapsed_time = time.time() - start_time
    total_audio_length = total_audio_length + current_length

    # Speed = (audio duration in minutes) / (elapsed time in hours)
    if elapsed_time > 0:
        trans_speed = (total_audio_length / 60) / (elapsed_time / 3600)
    else:
        trans_speed = 0

    print(f" | Speed: {int(trans_speed)} minutes per hour | ", end='', flush=True)

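# Worked example: 90 minutes of audio transcribed in 18 minutes of wall time
# gives (90 min) / (0.3 h) = 300 minutes of audio per hour of processing.
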
def transcribe_file(model, audio_input, language):
    # The prompt stays in German (the language of the audio) to prime Whisper
    # with domain vocabulary. Rough translation: "This audio is a recording of
    # a Christian church service containing biblical quotations, religious
    # terms, and typical service phrases. Take care to render the following
    # frequently mis-transcribed terms correctly: [terms]. The word 'Bethaus'
    # is often used as a synonym for 'Gebetshaus'. The word 'Abendmahl' is
    # important and should be recognized reliably. Biblical names and figures
    # must likewise be transcribed exactly. Numbers, e.g. psalm numbers or
    # Bible verses, should be rendered as digits."
    initial_prompt = (
        "Dieses Audio ist eine Aufnahme eines christlichen Gottesdienstes, "
        "das biblische Zitate, religiöse Begriffe und typische Gottesdienst-Phrasen enthält. "
        "Achte darauf auf folgende Begriffe, die häufig falsch transkribiert wurden, korrekt wiederzugeben: "
        "Stiftshütte, Bundeslade, Heiligtum, Offenbarung, Evangelium, Buße, Golgatha, "
        "Apostelgeschichte, Auferstehung, Wiedergeburt. "
        "Das Wort 'Bethaus' wird häufig als synonym für 'Gebetshaus' verwendet. "
        "Das Wort 'Abendmahl' ist wichtig und sollte zuverlässig erkannt werden. "
        "Ebenso müssen biblische Namen und Persönlichkeiten exakt transkribiert werden. "
        "Zahlenangaben, beispielsweise Psalmnummern oder Bibelverse, sollen numerisch dargestellt werden."
    )
    result = model.transcribe(audio_input, initial_prompt=initial_prompt, language=language)
    return result

def detect_language(model, audio):
    """Run Whisper's language identification and return the most likely language code."""
    print(" Language detected: ", end='', flush=True)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
    _, probs = model.detect_language(mel)
    lang_code = max(probs, key=probs.get)
    print(f"{lang_code}. ", end='', flush=True)
    return lang_code

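# Note: whisper.pad_or_trim() keeps only the first 30 seconds of audio, so
# language detection is based on the beginning of the recording.
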
def process_file(file_path, model, audio_input):
    """
    Transcribe the audio file into one markdown file.

    Special case: if the detected language and the filename disagree (a sermon
    detected as Russian, or a file marked 'russisch' detected as German), the
    file is transcribed in both German and Russian into the same file.
    """
    file_name = os.path.basename(file_path)

    # Detect spoken language
    detected = detect_language(model, audio_input)

    # Determine which languages to transcribe
    if (detected == 'ru' and 'predigt' in file_name.lower()) or \
       (detected == 'de' and 'russisch' in file_name.lower()):
        langs = ['de', 'ru']
    elif detected == 'en':  # songs are often mis-detected as English
        langs = ['de']
    elif detected in ('de', 'ru'):
        langs = [detected]
    else:
        langs = ['ru']

    # Collect segments per language
    lang_collection = {}
    for lang in langs:
        combined_segments = []
        print(f"Transcribing {format_status_path(file_path)} as {lang}", end='', flush=True)
        result = transcribe_file(model, audio_input, lang)

        # Extend with actual segments
        if isinstance(result, dict) and 'segments' in result:
            combined_segments.extend(result['segments'])
        else:
            # Defensive fallback: if the result isn't a dict of segments, wrap the entire text
            text = getattr(result, 'text', None) or (result.get('text') if isinstance(result, dict) else str(result))
            combined_segments.append({'start': 0, 'text': text})
        lang_collection[lang] = combined_segments

    # Write out markdown next to the audio, in a "Transkription" subfolder
    file_dir = os.path.dirname(file_path)
    txt_folder = os.path.join(file_dir, "Transkription")
    os.makedirs(txt_folder, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    output_md = os.path.join(txt_folder, base_name + ".md")

    # Build markdown lines
    folder_name = os.path.basename(file_dir)
    md_lines = [
        f"### {folder_name}",
        f"#### {os.path.basename(file_path)}",
        "---",
        ""
    ]
    previous_text = ""
    for lang, combined_segments in lang_collection.items():
        md_lines.append(f"##### Transcription ({lang.upper()})")
        md_lines.append("---")
        for segment in combined_segments:
            start = format_timestamp(segment.get('start', 0))
            text = segment.get('text', '').strip()
            # Skip consecutive duplicate lines (a common Whisper repetition artifact)
            if text and text != previous_text:
                md_lines.append(f"`{start}` {text}")
                previous_text = text

    # Join and post-process
    transcript_md = "\n".join(md_lines)
    transcript_md = apply_error_correction(transcript_md)
    transcript_md = remove_lines_with_words(transcript_md)

    # Write file and report
    with open(output_md, "w", encoding="utf-8") as f:
        f.write(transcript_md)

    # Report speed based on the end timestamp of the last transcription
    if combined_segments:
        end_ts = combined_segments[-1].get('end', combined_segments[-1].get('start', 0))
        print_speed(end_ts)
    print("... done!")

def process_folder(root_folder):
    """
    Walk through root_folder and process .mp3 files.
    Differentiates between "folder not found" and "folder empty".
    Selects only files that still need transcribing (i.e. no transcription
    exists yet, after applying the skip rules below).
    """
    global start_time
    # Skip music recordings; "orhester" catches a common misspelling in filenames.
    keywords = ["musik", "chor", "lied", "gesang", "orchester", "orhester", "melodi", "sot"]
    print("Create file list...")

    # Does the path actually exist / can we even try to list it?
    if not os.path.exists(root_folder):
        print(f"Error: Path '{root_folder}' does not exist or is not reachable.")
        return

    if not os.path.isdir(root_folder):
        print(f"Error: Path '{root_folder}' exists but is not a folder.")
        return

    # Now we know the folder exists; scan it.
    print(f"Scanning '{root_folder}' for .mp3 files…")
    valid_files = []
    checked_files = 0

    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(".mp3"):
                checked_files += 1
                filename_lower = filename.lower()
                file_path = os.path.join(dirpath, filename)
                # Skip files whose names contain a skip keyword (unless marked "vorwort").
                if "vorwort" not in filename_lower and any(keyword in filename_lower for keyword in keywords):
                    continue

                # Compute the expected output markdown path.
                txt_folder = os.path.join(dirpath, "Transkription")
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_md = os.path.join(txt_folder, base_name + ".md")
                # Skip files that already have a markdown transcription.
                if os.path.exists(output_md):
                    continue

                valid_files.append(file_path)

    # If the folder contained no .mp3s at all, checked_files is 0,
    # but we know it existed because it passed the exists()/isdir() tests.
    if checked_files == 0:
        print(f"Checked 0 files in '{root_folder}'. Folder contains no .mp3 files.")
        return

    # checked_files > 0, but maybe everything was already transcribed:
    if len(valid_files) == 0:
        print(f"Checked {checked_files} files. All files are already transcribed.")
        return

    # Otherwise there are files to process.
    print(f"Checked {checked_files} files. {len(valid_files)} need transcription.")

    # Choose "cuda" if available, otherwise "cpu"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading Whisper model on {device}…")

    model = whisper.load_model(model_name, device=device)

    # Use a thread pool to pre-load audio files concurrently with transcription.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Pre-load the first file.
        print("Initialize preloading process...")
        future_audio = executor.submit(load_audio_librosa, valid_files[0])
        # Wait for the first file, so its load time doesn't skew the statistics.
        preloaded_audio = future_audio.result()
        # Record start time for transcription statistics.
        start_time = time.time()

        for i, file_path in enumerate(valid_files):
            preloaded_audio = future_audio.result()  # cached result for i == 0
            # Start loading the next file concurrently.
            if i + 1 < len(valid_files):
                future_audio = executor.submit(load_audio_librosa, valid_files[i + 1])
            try:  # continue with the next file if one fails
                process_file(file_path, model, preloaded_audio)
            except Exception as e:
                print(f"Error with file {file_path}")
                print(e)

if __name__ == "__main__":
    for folder in folder_list:
        process_folder(folder)
    print("All done!")