搜尋
首頁後端開發Python教學用Python的Django框架完成视频处理任务的教程

Stickyworld 的网页应用已经支持视频拨放一段时间,但都是通过YouTube的嵌入模式实现。我们开始提供新的版本支持视频操作,可以让我们的用户不用受制于YouTube的服务。

我过去曾经参与过一个项目,客户需要视频转码功能,这实在不是个容易达成的需求。需要大量的读取每一个视频、音讯与视频容器的格式再输出符合网页使用与喜好的视频格式。

考虑到这一点,我们决定将转码的工作交给 Encoding.com 。这个网站可以免费让你编码1GB大小的视频,超过1GB容量的文件将采取分级计价收费。

开发的代码如下,我上传了一个178KB容量的两秒视频来测试代码是否成功运作。当测试过程没有发生任何的例外错误后,我继续测试其它更大的外部文件。

 
阶段一:用户上传视频文件

现在这的新的代码段提供了一个基于 HTML5且可以快速上手的 的上传机制。用CoffeeScript撰写的代码,可以从客户端上传文件到服务器端。
 

$scope.upload_slide = (upload_slide_form) ->
  file = document.getElementById("slide_file").files[0]
  reader = new FileReader()
  reader.readAsDataURL file
  reader.onload = (event) ->
   result = event.target.result
   fileName = document.getElementById("slide_file").files[0].name
   $.post "/world/upload_slide",
    data: result
    name: fileName
    room_id: $scope.room.id
    (response_data) ->
     if response_data.success? is not yes
      console.error "There was an error uploading the file", response_data
     else
      console.log "Upload successful", response_data
  reader.onloadstart = ->
   console.log "onloadstart"
  reader.onprogress = (event) ->
   console.log "onprogress", event.total, event.loaded, (event.loaded / event.total) * 100
  reader.onabort = ->
   console.log "onabort"
  reader.onerror = ->
   console.log "onerror"
  reader.onloadend = (event) ->
   console.log "onloadend", event

最好可以通过 (“slide_file”).files 且经由独立的POST上传每个文件,而不是由一个POST需求上传所有文件。稍后我们会解释这点。

 
阶段二:验证并上传至 Amazon S3

后端我们运行了Django与RabbitMQ。主要的模块如下:
 

$ pip install 'Django>=1.5.2' 'django-celery>=3.0.21' \
  'django-storages>=1.1.8' 'lxml>=3.2.3' 'python-magic>=0.4.3'

我建立了两个模块:SlideUploadQueue 用来储存每一次上传的数据,SlideVideoMedia 则是用来储存每个要上传影片的数据。
 
class SlideUploadQueue(models.Model):
  created_by = models.ForeignKey(User)
  created_time = models.DateTimeField(db_index=True)
  original_file = models.FileField(
    upload_to=filename_sanitiser, blank=True, default='')
  media_type = models.ForeignKey(MediaType)
  encoding_com_tracking_code = models.CharField(
    default='', max_length=24, blank=True)
 
  STATUS_AWAITING_DATA = 0
  STATUS_AWAITING_PROCESSING = 1
  STATUS_PROCESSING = 2
  STATUS_AWAITING_3RD_PARTY_PROCESSING = 5
  STATUS_FINISHED = 3
  STATUS_FAILED = 4
 
  STATUS_LIST = (
    (STATUS_AWAITING_DATA, 'Awaiting Data'),
    (STATUS_AWAITING_PROCESSING, 'Awaiting processing'),
    (STATUS_PROCESSING, 'Processing'),
    (STATUS_AWAITING_3RD_PARTY_PROCESSING,
      'Awaiting 3rd-party processing'),
    (STATUS_FINISHED, 'Finished'),
    (STATUS_FAILED, 'Failed'),
  )
 
  status = models.PositiveSmallIntegerField(
    default=STATUS_AWAITING_DATA, choices=STATUS_LIST)
 
  class Meta:
    verbose_name = 'Slide'
    verbose_name_plural = 'Slide upload queue'
 
  def save(self, *args, **kwargs):
    if not self.created_time:
      self.created_time = \
        datetime.utcnow().replace(tzinfo=pytz.utc)
 
    return super(SlideUploadQueue, self).save(*args, **kwargs)
 
  def __unicode__(self):
    if self.id is None:
      return 'new <SlideUploadQueue>'
    return '<SlideUploadQueue> %d' % self.id
 
class SlideVideoMedia(models.Model):
  converted_file = models.FileField(
    upload_to=filename_sanitiser, blank=True, default='')
 
  FORMAT_MP4 = 0
  FORMAT_WEBM = 1
  FORMAT_OGG = 2
  FORMAT_FL9 = 3
  FORMAT_THUMB = 4
 
  supported_formats = (
    (FORMAT_MP4, 'MPEG 4'),
    (FORMAT_WEBM, 'WebM'),
    (FORMAT_OGG, 'OGG'),
    (FORMAT_FL9, 'Flash 9 Video'),
    (FORMAT_THUMB, 'Thumbnail'),
  )
 
  mime_types = (
    (FORMAT_MP4, 'video/mp4'),
    (FORMAT_WEBM, 'video/webm'),
    (FORMAT_OGG, 'video/ogg'),
    (FORMAT_FL9, 'video/mp4'),
    (FORMAT_THUMB, 'image/jpeg'),
  )
 
  format = models.PositiveSmallIntegerField(
    default=FORMAT_MP4, choices=supported_formats)
 
  class Meta:
    verbose_name = 'Slide video'
    verbose_name_plural = 'Slide videos'
 
  def __unicode__(self):
    if self.id is None:
      return 'new <SlideVideoMedia>'
    return '<SlideVideoMedia> %d' % self.id

我们的模块皆使用 filename_sanitiser。FileField 自动的将文件名调整成 /. 格式。整理每个文件名并确保其独一性。我们采用了有时效性签署的网址列让我们可以掌控哪些使用者在使用我们的服务,使用了多久。
 

def filename_sanitiser(instance, filename):
  folder = instance.__class__.__name__.lower()
  ext = 'jpg'
 
  if '.' in filename:
    t_ext = filename.split('.')[-1].strip().lower()
 
    if t_ext != '':
      ext = t_ext
 
  return '%s/%s.%s' % (folder, str(uuid.uuid4()), ext)

拿来测试的文件 testing.mov 将会转换成以下网址:https://our-bucket.s3.amazonaws.com/slideuploadqueue/3fe27193-e87f-4244-9aa2-66409f70ebd3.mov 并经由Django Storages 模块上传。

我们通过 Magic 验证从使用者端浏览器上传的文件。Magic可以从文件内容侦测是何种类型的文件。
 

@verify_auth_token
@return_json
def upload_slide(request):
  file_data = request.POST.get('data', '')
  file_data = base64.b64decode(file_data.split(';base64,')[1])
  description = magic.from_buffer(file_data)

如果文件类型符合MPEG v4 系统或是Apple QuickTime 电影,我们就知道该文件转码不会有太大问题。如果格式不是上述所提的几种,我们会标志给用户知悉。
接着,我们将通过SlideUploadQueue 模块将视频储存到队列并发送一个需求给 RabbitMQ。因为我们使用了Django Storages 模块,文件将自动被上传到 Amazon S3。
 

slide_upload = SlideUploadQueue()
...
slide_upload.status = SlideUploadQueue.STATUS_AWAITING_PROCESSING
slide_upload.save()
slide_upload.original_file.\
  save('anything.%s' % file_ext, ContentFile(file_data))
slide_upload.save()
 
task = ConvertRawSlideToSlide()
task.delay(slide_upload)

阶段3:发送视频到第三方.

RabbitMQ 将控管 task.delay(slide_upload) 的呼叫。
我们现在只需要发送视频档网址与输出格式给Encoding.com。该网站会回复我们一个工作码让我们检查视频转码的进度。

class ConvertRawSlideToSlide(Task):
  queue = 'backend_convert_raw_slides'
  ...
  def _handle_video(self, slide_upload):
    mp4 = {
      'output': 'mp4',
      'size': '320x240',
      'bitrate': '256k',
      'audio_bitrate': '64k',
      'audio_channels_number': '2',
      'keep_aspect_ratio': 'yes',
      'video_codec': 'mpeg4',
      'profile': 'main',
      'vcodecparameters': 'no',
      'audio_codec': 'libfaac',
      'two_pass': 'no',
      'cbr': 'no',
      'deinterlacing': 'no',
      'keyframe': '300',
      'audio_volume': '100',
      'file_extension': 'mp4',
      'hint': 'no',
    }
 
    webm = {
      'output': 'webm',
      'size': '320x240',
      'bitrate': '256k',
      'audio_bitrate': '64k',
      'audio_sample_rate': '44100',
      'audio_channels_number': '2',
      'keep_aspect_ratio': 'yes',
      'video_codec': 'libvpx',
      'profile': 'baseline',
      'vcodecparameters': 'no',
      'audio_codec': 'libvorbis',
      'two_pass': 'no',
      'cbr': 'no',
      'deinterlacing': 'no',
      'keyframe': '300',
      'audio_volume': '100',
      'preset': '6',
      'file_extension': 'webm',
      'acbr': 'no',
    }
 
    ogg = {
      'output': 'ogg',
      'size': '320x240',
      'bitrate': '256k',
      'audio_bitrate': '64k',
      'audio_sample_rate': '44100',
      'audio_channels_number': '2',
      'keep_aspect_ratio': 'yes',
      'video_codec': 'libtheora',
      'profile': 'baseline',
      'vcodecparameters': 'no',
      'audio_codec': 'libvorbis',
      'two_pass': 'no',
      'cbr': 'no',
      'deinterlacing': 'no',
      'keyframe': '300',
      'audio_volume': '100',
      'file_extension': 'ogg',
      'acbr': 'no',
    }
 
    flv = {
      'output': 'fl9',
      'size': '320x240',
      'bitrate': '256k',
      'audio_bitrate': '64k',
      'audio_channels_number': '2',
      'keep_aspect_ratio': 'yes',
      'video_codec': 'libx264',
      'profile': 'high',
      'vcodecparameters': 'no',
      'audio_codec': 'libfaac',
      'two_pass': 'no',
      'cbr': 'no',
      'deinterlacing': 'no',
      'keyframe': '300',
      'audio_volume': '100',
      'file_extension': 'mp4',
    }
 
    thumbnail = {
      'output': 'thumbnail',
      'time': '5',
      'video_codec': 'mjpeg',
      'keep_aspect_ratio': 'yes',
      'file_extension': 'jpg',
    }
 
    encoder = Encoding(settings.ENCODING_API_USER_ID,
      settings.ENCODING_API_USER_KEY)
    resp = encoder.add_media(source=[slide_upload.original_file.url],
      formats=[mp4, webm, ogg, flv, thumbnail])
 
    media_id = None
 
    if resp is not None and resp.get('response') is not None:
      media_id = resp.get('response').get('MediaID')
 
    if media_id is None:
      slide_upload.status = SlideUploadQueue.STATUS_FAILED
      slide_upload.save()
      log.error('Unable to communicate with encoding.com')
      return False
 
    slide_upload.encoding_com_tracking_code = media_id
    slide_upload.status = \
      SlideUploadQueue.STATUS_AWAITING_3RD_PARTY_PROCESSING
    slide_upload.save()
    return True

Encoding.com 推荐一些堪用的Python程序,可用来与它们的服务沟通。我修改了模块一些地方,但还需要修改一些功能才能达到我满意的状态。以下是修改过后目前正在使用的程序代码:

import httplib
from lxml import etree
import urllib
from xml.parsers.expat import ExpatError
import xmltodict
 
ENCODING_API_URL = 'manage.encoding.com:80'
 
class Encoding(object):
 
  def __init__(self, userid, userkey, url=ENCODING_API_URL):
    self.url = url
    self.userid = userid
    self.userkey = userkey
 
  def get_media_info(self, action='GetMediaInfo', ids=[],
    headers={'Content-Type': 'application/x-www-form-urlencoded'}):
    query = etree.Element('query')
 
    nodes = {
      'userid': self.userid,
      'userkey': self.userkey,
      'action': action,
      'mediaid': ','.join(ids),
    }
 
    query = self._build_tree(etree.Element('query'), nodes)
    results = self._execute_request(query, headers)
 
    return self._parse_results(results)
 
  def get_status(self, action='GetStatus', ids=[], extended='no',
    headers={'Content-Type': 'application/x-www-form-urlencoded'}):
    query = etree.Element('query')
 
    nodes = {
      'userid': self.userid,
      'userkey': self.userkey,
      'action': action,
      'extended': extended,
      'mediaid': ','.join(ids),
    }
 
    query = self._build_tree(etree.Element('query'), nodes)
    results = self._execute_request(query, headers)
 
    return self._parse_results(results)
 
  def add_media(self, action='AddMedia', source=[], notify='', formats=[],
    instant='no',
    headers={'Content-Type': 'application/x-www-form-urlencoded'}):
    query = etree.Element('query')
 
    nodes = {
      'userid': self.userid,
      'userkey': self.userkey,
      'action': action,
      'source': source,
      'notify': notify,
      'instant': instant,
    }
 
    query = self._build_tree(etree.Element('query'), nodes)
 
    for format in formats:
      format_node = self._build_tree(etree.Element('format'), format)
      query.append(format_node)
 
    results = self._execute_request(query, headers)
    return self._parse_results(results)
 
  def _build_tree(self, node, data):
    for k, v in data.items():
      if isinstance(v, list):
        for item in v:
          element = etree.Element(k)
          element.text = item
          node.append(element)
      else:
        element = etree.Element(k)
        element.text = v
        node.append(element)
 
    return node
 
  def _execute_request(self, xml, headers, path='', method='POST'):
    params = urllib.urlencode({'xml': etree.tostring(xml)})
 
    conn = httplib.HTTPConnection(self.url)
    conn.request(method, path, params, headers)
    response = conn.getresponse()
    data = response.read()
    conn.close()
    return data
 
  def _parse_results(self, results):
    try:
      return xmltodict.parse(results)
    except ExpatError, e:
      print 'Error parsing encoding.com response'
      print e
      return None

其他待完成事项包括通过HTTPS-only (加密联机) 使用Encoding.com 严谨的SSL验证,还有一些单元测试。
阶段4:下载所有新的视频档格式

我们有个定期执行的程序,通过RabbitMQ每15秒检查视频转码的进度:
 

class CheckUpOnThirdParties(PeriodicTask):
  run_every = timedelta(seconds=settings.THIRD_PARTY_CHECK_UP_INTERVAL)
  ...
  def _handle_encoding_com(self, slides):
    format_lookup = {
      'mp4': SlideVideoMedia.FORMAT_MP4,
      'webm': SlideVideoMedia.FORMAT_WEBM,
      'ogg': SlideVideoMedia.FORMAT_OGG,
      'fl9': SlideVideoMedia.FORMAT_FL9,
      'thumbnail': SlideVideoMedia.FORMAT_THUMB,
    }
 
    encoder = Encoding(settings.ENCODING_API_USER_ID,
      settings.ENCODING_API_USER_KEY)
 
    job_ids = [item.encoding_com_tracking_code for item in slides]
    resp = encoder.get_status(ids=job_ids)
 
    if resp is None:
      log.error('Unable to check up on encoding.com')
      return False

检查Encoding.com的响应来验证每个部分是否正确以利我们继续下去。

if resp.get('response') is None:
  log.error('Unable to get response node from encoding.com')
  return False
 
resp_id = resp.get('response').get('id')
 
if resp_id is None:
  log.error('Unable to get media id from encoding.com')
  return False
 
slide = SlideUploadQueue.objects.filter(
  status=SlideUploadQueue.STATUS_AWAITING_3RD_PARTY_PROCESSING,
  encoding_com_tracking_code=resp_id)
 
if len(slide) != 1:
  log.error('Unable to find a single record for %s' % resp_id)
  return False
 
resp_status = resp.get('response').get('status')
 
if resp_status is None:
  log.error('Unable to get status from encoding.com')
  return False
 
if resp_status != u'Finished':
  log.debug("%s isn't finished, will check back later" % resp_id)
  return True
 
formats = resp.get('response').get('format')
 
if formats is None:
  log.error("No output formats were found. Something's wrong.")
  return False
 
for format in formats:
  try:
    assert format.get('status') == u'Finished', \
    "%s is not finished. Something's wrong." % format.get('id')
 
    output = format.get('output')
    assert output in ('mp4', 'webm', 'ogg', 'fl9',
      'thumbnail'), 'Unknown output format %s' % output
 
    s3_dest = format.get('s3_destination')
    assert 'http://encoding.com.result.s3.amazonaws.com/'\
      in s3_dest, 'Suspicious S3 url: %s' % s3_dest
 
    https_link = \
      'https://s3.amazonaws.com/encoding.com.result/%s' %\
      s3_dest.split('/')[-1]
    file_ext = https_link.split('.')[-1].strip()
 
    assert len(file_ext) > 0,\
      'Unable to get file extension from %s' % https_link
 
    count = SlideVideoMedia.objects.filter(slide_upload=slide,
      format=format_lookup[output]).count()
 
    if count != 0:
      print 'There is already a %s file for this slide' % output
      continue
 
    content = self.download_content(https_link)
 
    assert content is not None,\
      'There is no content for %s' % format.get('id')
  except AssertionError, e:
    log.error('A format did not pass all assertions: %s' % e)
    continue

到这里我们已确认所有事项皆正常,所以我们可以储存所有的视频档了:
 

media = SlideVideoMedia()
media.format = format_lookup[output]
media.converted_file.save('blah.%s' % file_ext, ContentFile(content))
media.save()

阶段5:经由HTML5播放视频档

在我们的前端网页已经新增了一个有HTML5的影像单元的网页。并采用对每个浏览器都有最佳支持的video.js来显示视频。
 

&#63; bower install video.js
bower caching git://github.com/videojs/video.js-component.git
bower cloning git://github.com/videojs/video.js-component.git
bower fetching video.js
bower checking out video.js#v4.0.3
bower copying /home/mark/.bower/cache/video.js/5ab058cd60c5615aa38e8e706cd0f307
bower installing video.js#4.0.3

在我们的首页有包含其他相依的文件:
 

!!! 5
html(lang="en", class="no-js")
 head
  meta(http-equiv='Content-Type', content='text/html; charset=UTF-8')
  ...
  link(rel='stylesheet', type='text/css', href='/components/video-js-4.1.0/video-js.css')
  script(type='text/javascript', src='/components/video-js-4.1.0/video.js')

在Angular.js/JADE-based 框架下的模块,我们引入

#main.span12
  video#example_video_1.video-js.vjs-default-skin(controls, preload="auto", width="640", height="264", poster="{{video_thumbnail}}", data-setup='{"example_option":true}', ng-show="videos")
    source(ng-repeat="video in videos", src="{{video.src}}", type="{{video.type}}")

还会显示出我们转换的每个视频文件格式,并使用在标签。Video.js 会根据使用者使用的浏览器决定播放哪种格式的视频。

我们仍然有许多工作需要完成,建立单元测试与加强和Encoding.com服务沟通的程序。如果你对这些工作感兴趣请与我连络。

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
Python的執行模型:編譯,解釋還是兩者?Python的執行模型:編譯,解釋還是兩者?May 10, 2025 am 12:04 AM

pythonisbothCompileDIntered。

Python是按線執行的嗎?Python是按線執行的嗎?May 10, 2025 am 12:03 AM

Python不是嚴格的逐行執行,而是基於解釋器的機制進行優化和條件執行。解釋器將代碼轉換為字節碼,由PVM執行,可能會預編譯常量表達式或優化循環。理解這些機制有助於優化代碼和提高效率。

python中兩個列表的串聯替代方案是什麼?python中兩個列表的串聯替代方案是什麼?May 09, 2025 am 12:16 AM

可以使用多種方法在Python中連接兩個列表:1.使用 操作符,簡單但在大列表中效率低;2.使用extend方法,效率高但會修改原列表;3.使用 =操作符,兼具效率和可讀性;4.使用itertools.chain函數,內存效率高但需額外導入;5.使用列表解析,優雅但可能過於復雜。選擇方法應根據代碼上下文和需求。

Python:合併兩個列表的有效方法Python:合併兩個列表的有效方法May 09, 2025 am 12:15 AM

有多種方法可以合併Python列表:1.使用 操作符,簡單但對大列表不內存高效;2.使用extend方法,內存高效但會修改原列表;3.使用itertools.chain,適用於大數據集;4.使用*操作符,一行代碼合併小到中型列表;5.使用numpy.concatenate,適用於大數據集和性能要求高的場景;6.使用append方法,適用於小列表但效率低。選擇方法時需考慮列表大小和應用場景。

編譯的與解釋的語言:優點和缺點編譯的與解釋的語言:優點和缺點May 09, 2025 am 12:06 AM

CompiledLanguagesOffersPeedAndSecurity,而interneterpretledlanguages provideeaseafuseanDoctability.1)commiledlanguageslikec arefasterandSecureButhOnderDevevelmendeclementCyclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesclesandentency.2)cransportedeplatectentysenty

Python:對於循環,最完整的指南Python:對於循環,最完整的指南May 09, 2025 am 12:05 AM

Python中,for循環用於遍歷可迭代對象,while循環用於條件滿足時重複執行操作。 1)for循環示例:遍歷列表並打印元素。 2)while循環示例:猜數字遊戲,直到猜對為止。掌握循環原理和優化技巧可提高代碼效率和可靠性。

python concatenate列表到一個字符串中python concatenate列表到一個字符串中May 09, 2025 am 12:02 AM

要將列表連接成字符串,Python中使用join()方法是最佳選擇。 1)使用join()方法將列表元素連接成字符串,如''.join(my_list)。 2)對於包含數字的列表,先用map(str,numbers)轉換為字符串再連接。 3)可以使用生成器表達式進行複雜格式化,如','.join(f'({fruit})'forfruitinfruits)。 4)處理混合數據類型時,使用map(str,mixed_list)確保所有元素可轉換為字符串。 5)對於大型列表,使用''.join(large_li

Python的混合方法:編譯和解釋合併Python的混合方法:編譯和解釋合併May 08, 2025 am 12:16 AM

pythonuseshybridapprace,ComminingCompilationTobyTecoDeAndInterpretation.1)codeiscompiledtoplatform-Indepententbybytecode.2)bytecodeisisterpretedbybythepbybythepythonvirtualmachine,增強效率和通用性。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

Atom編輯器mac版下載

Atom編輯器mac版下載

最受歡迎的的開源編輯器

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)