ホームページ  >  記事  >  バックエンド開発  >  カスタム文字起こしとクリッピング パイプライン

カスタム文字起こしとクリッピング パイプライン

WBOY
WBOYオリジナル
2024-07-17 12:40:201130ブラウズ

Custom Transcription and Clipping Pipeline

なぜそれをしたのか:

私はこのプロジェクトに取り組んでおり、過酷なデータ エンジニアリング コンポーネントの公開を処理するためのツールを多数開発しました。その一部は独創的なものですが、ほとんどは次の Gemini モデルに急襲され、愚かな Google Colab Gemini 提案エンジン。 - ティム

指示と説明

説明書:
  1. 必要な依存関係がインストールされていることを確認してください (ffmpeg、whisperx など)。
  2. ルート ディレクトリをビデオ ファイルを含む作業ディレクトリに設定します。
  3. トランスクリプト内で検出するステージを定義します。
  4. スクリプトを実行してトランスクリプトを生成し、検出されたステージに基づいてビデオ クリップを抽出します。
説明:
  • このツールはルート ディレクトリ内のビデオ ファイルを処理します。
  • WhisperX モデルを使用して各ビデオを文字起こしします。
  • スクリプトは、トランスクリプトで見つかったステージに基づいてビデオからクリップを抽出します。
  • トランスクリプトとクリップは、指定された出力ディレクトリに保存されます。

コード:

import os
import shutil
import cv2
import numpy as np
import json
from PIL import Image
import random
import string
from rembg import remove
import ffmpeg
from datetime import timedelta
from ultralytics import YOLO
import whisperx
import gc
gc.collect()

# Define paths to directories
root = '/

workspace/'
stages = ['apple', 'banana', 'car', 'dog']

transcript_dir = root + 'transcripts'
clip_output_dir = root + 'stage1'
stage1_clips_dir = clip_output_dir

# Ensure the output directory exists
os.makedirs(transcript_dir, exist_ok=True)
os.makedirs(clip_output_dir, exist_ok=True)

def log_and_print(message):
    print(message)

def convert_time_to_seconds(time_str):
    hours, minutes, seconds_milliseconds = time_str.split(':')
    seconds, milliseconds = seconds_milliseconds.split(',')
    total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000
    return total_seconds

def transcribe_video(video_path):
    """Transcribe the video using Whisper model and return the transcript."""
    compute_type = "float32"
    model = whisperx.load_model("large-v2", device='cpu', compute_type=compute_type)
    audio = whisperx.load_audio(video_path)
    result = model.transcribe(audio, batch_size=4, language="en")
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device='cpu')
    aligned_result = whisperx.align(result["segments"], model_a, metadata, audio, 'cpu', return_char_alignments=False)
    segments = aligned_result["segments"]
    transcript = []
    for index, segment in enumerate(segments):
        start_time = str(0) + str(timedelta(seconds=int(segment['start']))) + ',000'
        end_time = str(0) + str(timedelta(seconds=int(segment['end']))) + ',000'
        text = segment['text']
        segment_text = {
            "index": index + 1,
            "start_time": start_time,
            "end_time": end_time,
            "text": text.strip(),
        }
        transcript.append(segment_text)
    return transcript

def extract_clips(video_path, transcript, stages):
    """Extract clips from the video based on the transcript and stages."""
    base_filename = os.path.splitext(os.path.basename(video_path))[0]
    clip_index = 0
    current_stage = None
    start_time = None
    partial_transcript = []

    for segment in transcript:
        segment_text = segment["text"].lower()
        for stage in stages:
            if stage in segment_text:
                if current_stage is not None:
                    end_time = convert_time_to_seconds(segment["start_time"])
                    output_clip_filename = f"{base_filename}.{current_stage}.mp4"
                    output_clip = os.path.join(clip_output_dir, output_clip_filename)
                    if not os.path.exists(output_clip):
                        try:
                            ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                            log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
                        except ffmpeg.Error as e:
                            log_and_print(f"Error extracting clip: {e}")

                        transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
                        transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
                        with open(transcript_path, 'w', encoding='utf-8') as f:
                            json.dump(transcript_text, f, ensure_ascii=False, indent=4)
                        log_and_print(f"Saved partial transcript to {transcript_path}")

                        partial_transcript = []

                current_stage = stage
                start_time = convert_time_to_seconds(segment["start_time"])
            partial_transcript.append(segment)

    if current_stage is not None:
        end_time = convert_time_to_seconds(transcript[-1]["end_time"])
        output_clip_filename = f"{base_filename}.{current_stage}.mp4"
        output_clip = os.path.join(clip_output_dir, output_clip_filename)
        if not os.path.exists(output_clip):
            try:
                ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
            except ffmpeg.Error as e:
                log_and_print(f"Error extracting clip: {e}")

            transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
            transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript_text, f, ensure_ascii=False, indent=4)
            log_and_print(f"Saved partial transcript to {transcript_path}")

def process_transcripts(input_dir, transcript_dir, stages):
    """Process each video file to generate transcripts and extract clips."""
    video_files = [f for f in os.listdir(input_dir) if f.endswith('.mp4') or f.endswith('.MOV') or f.endswith('.mov')]

    for video_file in video_files:
        video_path = os.path.join(input_dir, video_file)
        transcript_path = os.path.join(transcript_dir, os.path.splitext(video_file)[0] + ".json")

        if not os.path.exists(transcript_path):
            transcript = transcribe_video(video_path)
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript, f, ensure_ascii=False, indent=4)
            log_and_print(f"Created transcript for {video_path}")
        else:
            with open(transcript_path, 'r', encoding='utf-8') as f:
                transcript = json.load(f)

        extract_clips(video_path, transcript, stages)

process_transcripts(root, transcript_dir, stages)

キーワードとハッシュタグ

  • キーワード: 文字起こし、ビデオ処理、クリッピング、WhisperX、自動化、ステージ、ビデオ クリップ
  • ハッシュタグ: #文字起こしツール #ビデオ処理 #クリッピングツール #WhisperX #ビデオオートメーション #ステージ検出 #ビデオクリップ

----------EOF----------

カナダ中西部出身のティムによって作成されました。
2024.
このドキュメントは GPL ライセンスを取得しています。

以上がカスタム文字起こしとクリッピング パイプラインの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。