首页 >后端开发 >Python教程 >自定义转录和剪辑管道

自定义转录和剪辑管道

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB原创
2024-07-17 12:40:201203浏览

Custom Transcription and Clipping Pipeline

我为什么这么做:

我正在研究这个项目,并开发了一堆工具来完成重型数据工程组件的发布,因为其中一些工具很巧妙,但大多数都是这样,以便它们被下一个 Gemini 模型取代并纳入到愚蠢的 Google Colab Gemini 建议引擎。 - 蒂姆

说明和解释

指示:
  1. 确保您已安装所需的依赖项(例如 ffmpeg、whisperx)。
  2. 将根目录设置为包含视频文件的工作目录。
  3. 定义您想要在成绩单中检测的阶段。
  4. 运行脚本以生成文字记录并根据检测到的阶段提取视频剪辑。
说明:
  • 该工具处理根目录中的视频文件。
  • 它使用 WhisperX 模型转录每个视频。
  • 然后脚本根据脚本中找到的阶段从视频中提取剪辑。
  • 脚本和剪辑保存在指定的输出目录中。

代码:

import os
import shutil
import cv2
import numpy as np
import json
from PIL import Image
import random
import string
from rembg import remove
import ffmpeg
from datetime import timedelta
from ultralytics import YOLO
import whisperx
import gc
gc.collect()

# Define paths to directories
root = '/

workspace/'
stages = ['apple', 'banana', 'car', 'dog']

transcript_dir = root + 'transcripts'
clip_output_dir = root + 'stage1'
stage1_clips_dir = clip_output_dir

# Ensure the output directory exists
os.makedirs(transcript_dir, exist_ok=True)
os.makedirs(clip_output_dir, exist_ok=True)

def log_and_print(message):
    print(message)

def convert_time_to_seconds(time_str):
    hours, minutes, seconds_milliseconds = time_str.split(':')
    seconds, milliseconds = seconds_milliseconds.split(',')
    total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000
    return total_seconds

def transcribe_video(video_path):
    """Transcribe the video using Whisper model and return the transcript."""
    compute_type = "float32"
    model = whisperx.load_model("large-v2", device='cpu', compute_type=compute_type)
    audio = whisperx.load_audio(video_path)
    result = model.transcribe(audio, batch_size=4, language="en")
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device='cpu')
    aligned_result = whisperx.align(result["segments"], model_a, metadata, audio, 'cpu', return_char_alignments=False)
    segments = aligned_result["segments"]
    transcript = []
    for index, segment in enumerate(segments):
        start_time = str(0) + str(timedelta(seconds=int(segment['start']))) + ',000'
        end_time = str(0) + str(timedelta(seconds=int(segment['end']))) + ',000'
        text = segment['text']
        segment_text = {
            "index": index + 1,
            "start_time": start_time,
            "end_time": end_time,
            "text": text.strip(),
        }
        transcript.append(segment_text)
    return transcript

def extract_clips(video_path, transcript, stages):
    """Extract clips from the video based on the transcript and stages."""
    base_filename = os.path.splitext(os.path.basename(video_path))[0]
    clip_index = 0
    current_stage = None
    start_time = None
    partial_transcript = []

    for segment in transcript:
        segment_text = segment["text"].lower()
        for stage in stages:
            if stage in segment_text:
                if current_stage is not None:
                    end_time = convert_time_to_seconds(segment["start_time"])
                    output_clip_filename = f"{base_filename}.{current_stage}.mp4"
                    output_clip = os.path.join(clip_output_dir, output_clip_filename)
                    if not os.path.exists(output_clip):
                        try:
                            ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                            log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
                        except ffmpeg.Error as e:
                            log_and_print(f"Error extracting clip: {e}")

                        transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
                        transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
                        with open(transcript_path, 'w', encoding='utf-8') as f:
                            json.dump(transcript_text, f, ensure_ascii=False, indent=4)
                        log_and_print(f"Saved partial transcript to {transcript_path}")

                        partial_transcript = []

                current_stage = stage
                start_time = convert_time_to_seconds(segment["start_time"])
            partial_transcript.append(segment)

    if current_stage is not None:
        end_time = convert_time_to_seconds(transcript[-1]["end_time"])
        output_clip_filename = f"{base_filename}.{current_stage}.mp4"
        output_clip = os.path.join(clip_output_dir, output_clip_filename)
        if not os.path.exists(output_clip):
            try:
                ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
            except ffmpeg.Error as e:
                log_and_print(f"Error extracting clip: {e}")

            transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
            transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript_text, f, ensure_ascii=False, indent=4)
            log_and_print(f"Saved partial transcript to {transcript_path}")

def process_transcripts(input_dir, transcript_dir, stages):
    """Process each video file to generate transcripts and extract clips."""
    video_files = [f for f in os.listdir(input_dir) if f.endswith('.mp4') or f.endswith('.MOV') or f.endswith('.mov')]

    for video_file in video_files:
        video_path = os.path.join(input_dir, video_file)
        transcript_path = os.path.join(transcript_dir, os.path.splitext(video_file)[0] + ".json")

        if not os.path.exists(transcript_path):
            transcript = transcribe_video(video_path)
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript, f, ensure_ascii=False, indent=4)
            log_and_print(f"Created transcript for {video_path}")
        else:
            with open(transcript_path, 'r', encoding='utf-8') as f:
                transcript = json.load(f)

        extract_clips(video_path, transcript, stages)

process_transcripts(root, transcript_dir, stages)

关键词和标签

  • 关键字:转录、视频处理、剪辑、WhisperX、自动化、舞台、视频剪辑
  • 标签:#TranscriptionTool #VideoProcessing #ClippingTool #WhisperX #VideoAutomation #StageDetection #VideoClips

-----------EOF------------

由来自加拿大中西部的 Tim 创建。
2024 年。
本文档已获得 GPL 许可。

以上是自定义转录和剪辑管道的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn