Startseite  >  Artikel  >  Backend-Entwicklung  >  So implementieren Sie mit Python ein Video-Deduplizierungs-Gadget

So implementieren Sie mit Python ein Video-Deduplizierungs-Gadget

WBOY
WBOY (weitergeleitet)
2023-04-19 14:37:14 · 1718 Aufrufe

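Erstellen Sie im aktuellen Arbeitsverzeichnis einen neuen Ordner `dup_video` (dorthin verschiebt das folgende Skript die gefundenen Duplikate):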

import json
import os
import shutil

import cv2
import imagehash
from PIL import Image
from loguru import logger
from PySimpleGUI import popup_get_folder


class VideoDuplicate(object):
    """
    Find duplicate ``.mp4`` videos in a folder by comparing frame fingerprints.

    Every video is sampled starting at 1 second and then every 2 seconds; a
    difference hash (``imagehash.dhash``) is computed for each sampled frame.
    Two videos are treated as duplicates when their fingerprint lists are
    equal.  Duplicates are moved into ``<cwd>/dup_video``.
    """

    # Videos whose duration in whole seconds exceeds this value are skipped.
    MAX_DURATION_S = 11
    # Position of the first sampled frame and distance between samples (ms).
    FIRST_SAMPLE_MS = 1000
    SAMPLE_STEP_MS = 2000
    # Directory (relative to the current working directory) receiving duplicates.
    DUP_DIR_NAME = 'dup_video'

    def __init__(self):
        # Paths of videos skipped because they are longer than MAX_DURATION_S.
        self._over_length_video: list = []
        # Paths of videos that could not be opened or report no usable frame rate.
        self._no_video: list = []

    def _video_hash(self, video_path) -> list:
        """
        Return the list of frame fingerprints (hex strings) for one video.

        @param video_path -> absolute path of the video file;
        @return list of dhash strings; empty when the video is skipped because
                it is unreadable or too long.
        """
        hash_arr = []
        cap = cv2.VideoCapture(video_path)  # open the video file
        try:
            logger.info(f'开始抽帧【{video_path}】')

            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # number of frames
            logger.warning(f'视频帧数:{n_frames}')

            fps = cap.get(cv2.CAP_PROP_FPS)  # frame rate
            logger.warning(f'视频帧率:{fps}')

            # An unreadable file reports fps == 0, which would otherwise raise
            # ZeroDivisionError below.  `not fps > 0` also rejects NaN.
            if not cap.isOpened() or not fps > 0:
                logger.error(f'无法读取视频或帧率无效;【{video_path}】;跳过该视频;')
                self._no_video.append(video_path)
                return []

            dur = n_frames / fps * 1000  # approximate total length in ms
            logger.warning(f'视频大约总长:{dur / 1000}')
            if dur // 1000 > self.MAX_DURATION_S:
                logger.error(f'视频时长超出规定范围【6~10】;当前时长:【{dur // 1000}】;跳过该视频;')
                self._over_length_video.append(video_path)
                return []

            cap_set = self.FIRST_SAMPLE_MS
            while cap_set < dur:
                cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)
                logger.debug(f'开始提取:【{cap_set // 1000}】/s的图片;')
                # Read the frame at this position (numpy array) and whether
                # the read succeeded.
                success, image_np = cap.read()
                if success:
                    # OpenCV delivers BGR; convert to RGB for PIL.
                    img = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
                    h = str(imagehash.dhash(img))
                    logger.success(f'【{cap_set}/s图像指纹:【{h}】')
                    hash_arr.append(h)
                else:
                    logger.error(str(cap_set / 1000))
                cap_set += self.SAMPLE_STEP_MS
        finally:
            cap.release()  # always release the capture, also on early return
        return hash_arr

    def start(self, base_dir):
        """
        Fingerprint every ``.mp4`` file directly inside ``base_dir``.

        @param base_dir -> directory containing the videos;
        @return list of ``{'video_abs_path': ..., 'hash': [...]}`` dicts for
                all videos that produced at least one fingerprint.  The same
                list is written to ``log.txt``.
        """
        data: list = []
        # Sorted so that the processing order does not depend on the
        # file system.
        for video in sorted(os.listdir(base_dir)):
            logger.debug('-' * 80)
            name, ext = os.path.splitext(video)
            if ext.lower() != '.mp4':
                logger.error(f'视频文件格式不符;【{video}】;执行跳过;')
                continue

            abs_video_path = os.path.join(base_dir, video)
            video_hash_list = self._video_hash(abs_video_path)
            if video_hash_list:
                data.append({'video_abs_path': abs_video_path, 'hash': video_hash_list})

        self._write_log(data)
        return data

    @staticmethod
    def _write_log(data: list) -> None:
        """Write the fingerprint records as JSON to ``log.txt`` in the cwd."""
        with open('log.txt', 'w+', encoding='utf-8') as f:
            f.write(json.dumps(data))

    @staticmethod
    def _find_duplicates(data: list) -> list:
        """
        Return the paths of all records whose hash list equals that of a
        later record, in input order.  Each path appears at most once, so the
        last member of every group of identical videos is kept.
        """
        return [
            entry['video_abs_path']
            for i, entry in enumerate(data)
            if any(entry['hash'] == later['hash'] for later in data[i + 1:])
        ]

    def __call__(self, base_dir, *args, **kwargs):
        """
        Fingerprint the videos in ``base_dir``, move duplicates into
        ``<cwd>/dup_video`` and log the videos that were skipped.
        """
        data = self.start(base_dir)
        logger.debug('-----------------------------------开始比对关键帧差值感知余弦算法-----------------------------')

        duplicates = self._find_duplicates(data)
        if duplicates:
            dup_dir = os.path.join(os.getcwd(), self.DUP_DIR_NAME)
            # shutil.move fails if the target directory does not exist.
            os.makedirs(dup_dir, exist_ok=True)
            for src in duplicates:
                filename = os.path.basename(src)
                logger.error(f'移动文件:【{filename}】')
                shutil.move(src, os.path.join(dup_dir, filename))

        logger.warning('---------------------超长视频----------------------')
        for i in self._over_length_video:
            logger.error(os.path.basename(i))

        if self._no_video:
            logger.warning('---------------------无法读取的视频----------------------')
            for i in self._no_video:
                logger.error(os.path.basename(i))


def main():
    """Ask the user for the video folder and run the de-duplication on it."""
    path = popup_get_folder('请选择[视频去重]文件夹')
    # The dialog yields a falsy value when it is cancelled or left empty;
    # there is nothing to scan in that case.
    if not path:
        logger.warning('未选择文件夹;已退出;')
        return
    v = VideoDuplicate()
    v(path)


# Script entry point: only run the GUI workflow when executed directly.
if __name__ == '__main__':
    main()

Ergänzende Methoden (im selben Verzeichnis)

Python + OpenCV: Videoframes extrahieren und deduplizieren

import os 
import sys
import cv2
import glob
import json
import numpy as np
import skimage
from skimage import metrics
import hashlib
# Print the installed scikit-image version when the module is loaded.
print(skimage.__version__)

def load_json(json_file):
    """
    Load a JSON file and return the value stored under its ``outputs`` key.

    : param json_file: path of the JSON file
    : return: ``data['outputs']``
    : raises KeyError: if the document has no ``outputs`` key
    """
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(json_file, encoding='utf-8') as fp:
        data = json.load(fp)
    return data['outputs']


def ssim_dis(im1, im2):
    """
    Return the structural similarity (SSIM) score of two images.

    The images are compared as multi-channel images with a data range of 255.

    NOTE(review): newer scikit-image releases replace ``multichannel`` with
    ``channel_axis`` -- confirm against the installed version.
    """
    return metrics.structural_similarity(
        im1,
        im2,
        multichannel=True,
        data_range=255,
    )

# cv2
def isdarkOrBright(grayImg, thre_dark=10, thre_bright=230):
    """
    Tell whether an image is too dark or too bright.

    Returns True when the mean of ``grayImg`` is strictly below ``thre_dark``
    or strictly above ``thre_bright``, otherwise False.
    """
    brightness = np.mean(grayImg)
    too_dark = brightness < thre_dark
    too_bright = brightness > thre_bright
    return bool(too_dark or too_bright)

def get_file_md5(file_name):
    """
    Calculate the MD5 digest of a file.

    : param file_name: path of the file to hash
    : return: hexadecimal MD5 digest of the file content
    """
    m = hashlib.md5()
    with open(file_name, 'rb') as fobj:
        # Read in 4 KiB chunks so large files are never loaded in one piece;
        # iteration stops at the empty bytes object returned at EOF.
        for chunk in iter(lambda: fobj.read(4096), b''):
            m.update(chunk)
    return m.hexdigest()

def extract_frame(video_path, save_dir, prefix, ssim_thre=0.90):
    """
    Extract de-duplicated key frames from a video and save them as JPEG files.

    Frames whose JPEG encoding is byte-identical to an earlier frame are
    skipped.  The first remaining frame is saved directly.  After that, frames
    are buffered while they stay similar to the last reference frame; when a
    frame's SSIM against the reference drops to ``ssim_thre`` or below, the
    middle frame of the buffer is saved and the new frame becomes the
    reference.  Frames still buffered when the video ends are not written.

    : param video_path: path of the input video
    : param save_dir: existing directory receiving the JPEG files
    : param prefix: file name prefix; files are named ``<prefix>_<index>.jpg``
                    with the frame index zero-padded to 4 digits
    : param ssim_thre: SSIM threshold at or below which a scene change is assumed
    : return: number of images written
    : raises RuntimeError: if a frame cannot be JPEG-encoded
    """
    jpeg_params = [cv2.IMWRITE_JPEG_QUALITY, 100]
    count = 0
    seen_md5 = set()
    last_frame = None
    tmp_frames = []
    index = 0
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            frameState, frame = cap.read()
            if not frameState or frame is None:
                break

            # Hash the in-memory JPEG encoding.  This replaces writing a
            # temporary file inside an `assert`, which silently disappeared
            # when Python ran with -O.
            encoded_ok, encoded = cv2.imencode('.jpg', frame, jpeg_params)
            if not encoded_ok:
                raise RuntimeError(
                    f'failed to JPEG-encode frame {index} of {video_path}')
            md5 = hashlib.md5(encoded.tobytes()).hexdigest()
            if md5 in seen_md5:
                index += 1
                continue
            seen_md5.add(md5)

            save_path = os.path.join(save_dir, f'{prefix}_{index:04d}.jpg')
            if last_frame is None:
                # First usable frame: save it and start the first buffer.
                if cv2.imwrite(save_path, frame, jpeg_params):
                    count += 1
                    last_frame = frame
                    tmp_frames.append(frame)
            elif ssim_dis(last_frame, frame) <= ssim_thre:
                # Scene change: keep the middle frame of the finished buffer.
                save_frame = tmp_frames[len(tmp_frames) // 2]
                if cv2.imwrite(save_path, save_frame, jpeg_params):
                    count += 1
                    last_frame = frame
                    tmp_frames = [frame]
            else:
                tmp_frames.append(frame)
            index += 1
    finally:
        cap.release()
    return count
        
        

if __name__ == '__main__':
    # Placeholder path from the article; replace with a real video file.
    video_path = "videos/***.mp4"
    video_name = os.path.basename(video_path)
    # Strip the extension whatever its length (the original assumed 4 chars).
    prefix = os.path.splitext(video_name)[0]
    save_imgs_dir = prefix
    os.makedirs(save_imgs_dir, exist_ok=True)
    N = extract_frame(video_path, save_imgs_dir, prefix)
    print(video_name, N)

Deduplizieren Sie Bilder, Videos und Dateien

import os
from tkinter import *
from tkinter import messagebox
import tkinter.filedialog
# Create the main application window, then set its title and its geometry
# string ("500x500+500+200").
root=Tk()
root.title("筛选重复的视频和照片")
root.geometry("500x500+500+200")
def wbb():
    """
    Button callback: copy the selected files without duplicates.

    Asks for the files to filter, keeps one entry per distinct file content,
    then asks for a target directory and writes the unique contents there as
    ``1.<ext>``, ``2.<ext>``, ...  Finally the target directory listing is
    shown in the text widget.
    """
    # Maps file content -> path; files with identical bytes collapse into a
    # single entry (the path of the last such file is kept).
    unique = {}
    filename = tkinter.filedialog.askopenfilenames()
    for path in filename:
        with open(path, 'rb') as f:
            unique[f.read()] = path

    filename1 = tkinter.filedialog.askdirectory()
    # A cancelled dialog yields an empty value; treat any empty result alike.
    if not filename1:
        messagebox.showinfo("提示", message='请选择路径')
        return

    lb1.config(text=filename1 + "下的文件为:")
    for p, (content, path) in enumerate(unique.items(), start=1):
        # splitext copes with names that have no extension and with dots in
        # directory names, where split(".")[-1] produced a broken path.
        ext = os.path.splitext(path)[1]
        with open(os.path.join(filename1, str(p) + ext), 'wb') as f:
            f.write(content)
    for g in os.listdir(filename1):
        txt.insert(END, g + '\n')
# Left frame: status label and the text widget that lists the result files.
frame1 = Frame(root, relief=RAISED)
frame1.place(relx=0.0)

# Right frame: prompt label and the button that starts the filtering.
frame2 = Frame(root, relief=GROOVE)
frame2.place(relx=0.5)

lb1 = Label(frame1, text="等等下面会有变化?", font=('华文新魏', 13))
lb1.pack(fill=X)

txt = Text(frame1, width=30, height=50, font=('华文新魏', 10))
txt.pack(fill=X)

lb = Label(frame2, text="点我选择要进行筛选的文件:", font=('华文新魏', 10))
lb.pack(fill=X)

btn = Button(frame2, text="请选择要进行筛选的文件", fg='black', relief="raised", bd="9", command=wbb)
btn.pack(fill=X)

# Enter the Tk event loop; returns when the window is closed.
root.mainloop()

Renderings

So implementieren Sie mit Python ein Video-Deduplizierungs-Gadget

Das Obige ist der detaillierte Inhalt von „So implementieren Sie mit Python ein Video-Deduplizierungs-Gadget“. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der chinesischen PHP-Website!

Hinweis:
Dieser Artikel wurde von yisu.com übernommen. Bei Rechtsverstößen wenden Sie sich bitte an admin@php.cn, damit er gelöscht wird.