2023 年 2 月:我想查看电影电视节目的所有分数以及在一个页面上流式传输它们的位置,但找不到包含与我相关的所有来源的聚合器。
2023 年 3 月:因此,我构建了一个 MVP,可以即时获取分数并将网站上线。它有效,但速度很慢(10 秒显示分数)。
2023 年 10 月:意识到在我这边存储数据是必要的,我发现了 Windmill.dev。它很容易超越类似的编排引擎 - 至少满足我的需求。
快进到今天,经过 12 个月的连续数据咀嚼,我想详细分享管道的工作原理。您将学习如何构建一个复杂的系统,从许多不同的来源获取数据,规范化数据并将其组合成优化的查询格式。
图片或没有发生!
这是运行视图。每个点代表一次流程运行。流程可以是任何内容,例如简单的一步脚本:
中心的块包含这样的脚本(简化):
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
下面的野兽也是一个流(记住,这只是其中一个绿点):
(更高分辨率的图片:https://i.imgur.com/LGhTUGG.png)
让我们来分解一下:
- 获取下一个优先电影或电视节目(请参阅下一部分)
- 从TMDB获取最新数据
- 从 IMDb、Metacritic 和 烂番茄 获取当前分数
- 刮电视比喻...比喻
- Huggingface API 用于收集 DNA 数据(下面将解释)
- 存储高维DNA 数据向量
- 存储电影、节目和流媒体链接的关系数据
每个步骤都或多或少复杂,并且涉及使用异步进程。
你从哪里开始?优先队列
为了确定接下来选择哪些标题,有两个并行处理的通道。这是风车的另一个亮点。并行化和编排与他们的架构完美配合。
选择下一个项目的两条通道是:
Lane 1:每个数据源单独的流
首先,将为每个数据源选择没有附加任何数据的标题。这意味着如果 Metacritic 管道中有一部尚未被抓取的电影,那么接下来就会选择它。这可确保每个标题至少被处理一次,包括新标题。
一旦每个标题都附加了数据,管道就会选择具有最新数据的标题。
这是此类流程运行的示例,由于达到了速率限制,因此出现错误:
Windmill 允许您轻松定义流程中每个步骤的重试次数。在这种情况下,逻辑是在出现错误时重试 3 次。除非达到速率限制(通常是不同的状态代码或错误消息),否则我们会立即停止。
Lane 2:每部电影/节目单独的优先流
上述方法有效,但有一个严重的问题:最近的版本更新不够及时。成功获取每个数据方面可能需要几周甚至几个月的时间。例如,可能会出现一部电影有最近的 IMDb 评分,但其他评分已过时且流媒体链接完全丢失的情况。特别是对于分数和流媒体可用性,我希望获得更好的准确性。
为了解决这个问题,第二条通道侧重于不同的优先级策略:选择最流行和最热门的电影/节目来跨所有数据源进行完整的数据刷新。我之前展示过这个流程,这就是我之前所说的野兽。
应用程序上更频繁显示的标题也会获得优先级提升。这意味着每次电影或节目出现在顶部搜索结果中或打开其详细信息视图时,它们可能很快就会刷新。
每个标题只能每周刷新一次使用优先通道以确保我们不会获取同时可能未更改的数据。
你可以这样做吗?抓取注意事项
你可能会问:抓取合法吗?抓取数据的行为通常是没问题的。不过,您对数据的处理方式需要仔细考虑。 一旦您从使用抓取数据的服务中获利,您就可能违反了他们的条款和条件。(请参阅网络抓取的法律格局和“抓取”只是自动访问,每个人都这样做)
抓取和相关法律是新的,通常未经测试,并且存在很多法律灰色地带。我决心相应地引用每个来源,尊重速率限制并避免不必要的请求,以尽量减少对他们服务的影响。
事实是,这些数据不会被用来盈利。 GoodWatch 将永远免费供所有人使用。
更多工作?是的,大人
Windmill 使用工作线程在多个进程之间分配代码执行。 流程中的每个步骤都会发送给工作人员,这使得它们独立于实际的业务逻辑。只有主应用程序编排作业,而工作人员仅接收输入数据、执行代码并返回结果。
这是一个高效的架构,可以很好地扩展。目前,有12名工人正在分工。它们都托管在 Hetzner 上。
每个worker的最大资源消耗为1个vCPU和2GB RAM。概述如下:
风车编辑器
Windmill 提供类似浏览器内 IDE 的编辑器体验,包括 linting、自动格式化、AI 助手,甚至 协作编辑(最后一项是付费功能)。不过最好的是这个按钮:
它允许我在部署脚本之前快速迭代和测试脚本。我通常在浏览器中编辑和测试文件,完成后将它们推送到 git。
最佳编码环境唯一缺少的是调试工具(断点和变量上下文)。目前,我正在本地 IDE 中调试脚本来克服这个弱点。
数字。我喜欢数字
我也是!
目前 GoodWatch 需要大约 100 GB 持久数据存储:
- 15 GB 用于原始预处理数据 (MongoDB)
- 23 GB 用于处理的关系数据 (Postgres)
- 67 GB 用于矢量数据 (Postgres)
每天 6.500 个流 通过 Windmill 的编排引擎运行。这导致每日交易量为:
- 30.000 IMDb 页
- 9.000 电视比喻页面
- 5.000 烂番茄页面
- 1.500拥抱表情提示
- 600 Metacritic 页面
由于不同的速率限制政策,这些数字根本不同。
每天一次,数据被清理并组合成最终的数据格式。目前为 GoodWatch 网络应用商店提供支持的数据库:
- 1000万流媒体链接
- 100 万部 部电影
- 300k DNA 值
- 20万电视节目
- 70k 具有 DNA 的电影/节目
你一直在谈论的 DNA 是什么?
想象一下您只能通过电影类型来区分电影,这是非常有限的吗?
这就是我开始 DNA 项目的原因。它允许按其他属性对电影和节目进行分类,例如心情、情节元素、角色类型、对话框或关键道具.
以下是所有项目中 DNA 值的前 10 位:
它允许两件事:
- 按 DNA 值过滤(使用关系数据)
- 按相似度搜索(使用矢量数据)
示例:
- 忧郁的心情
- 与《沙丘》类似的故事:第二部分
将来将会有关于 DNA 的专门博客文章提供更多详细信息。
深入研究数据管道
为了充分了解数据管道的工作原理,以下是每个数据源发生的情况的详细说明:
1. 每天一次,MongoDB 集合会使用所有必需的输入数据进行更新
对于每个数据源,都有一个 ìnit 流程,用于准备包含所有必需数据的 MongoDB 集合。对于 IMDb,这只是 imdb_id。对于烂番茄,标题和发布年份是必需的。这是因为 ID 未知,我们需要根据名称猜测正确的 URL。
2.不断地获取数据并写入MongoDB集合
根据上面解释的优先级选择,准备好的集合中的项目将使用获取的数据进行更新。每个数据源都有自己的集合,随着时间的推移,这些集合会变得越来越完整。
3. 每天一次,各种流程从 MongoDB 集合中收集数据并将其写入 Postgres
有一个电影流,一个电视节目流,还有一个流媒体链接流。它们从各种集合中收集所有必要的数据,并将它们存储在各自的 Postgres 表中,然后由 Web 应用程序查询。
以下是复制电影流程和脚本的摘录:
其中一些流程需要很长时间才能执行,有时甚至超过 6 小时。这可以通过标记所有已更新的项目并仅复制这些项目而不是批量处理整个数据集来优化。我的清单上众多 TODO 项目之一?
调度
调度就像为需要自动执行的每个流程或脚本定义 cron 表达式一样简单:
以下是为 GoodWatch 定义的所有时间表的摘录:
总共定义了大约 50 个时间表。
挑战
伟大的数据伴随着巨大的责任。很多事情都可能出错。确实如此。
处理速度非常慢
我的脚本的早期版本需要很长时间才能更新集合或表中的所有条目。那是因为我单独更新了每个项目。这会导致大量开销并显着减慢进程。
更好的方法是收集要更新插入的数据并对数据库查询进行批处理。这是 MongoDB 的示例:
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
内存消耗大的脚本
即使使用批处理,某些脚本也会消耗大量内存,导致工作线程崩溃。解决方案是针对每个用例仔细调整批量大小。
某些批次可以以 5000 为步长运行,其他批次可以在内存中存储更多数据,并且以 500 为步长运行效果更好。
Windmill 有一个很棒的功能,可以在脚本运行时观察内存:
要点
Windmill 是任何开发人员用于自动化任务的工具包中的重要资产。它对我来说是一个无价的生产力提升器,让我能够专注于流程结构和业务逻辑,同时外包任务编排、错误处理、重试和缓存的繁重工作。
处理大量数据仍然具有挑战性,优化管道是一个持续的过程 - 但我对迄今为止一切的结果感到非常满意。
好吧,好吧。够了
也是这么想的。只需让我链接一些资源,我们就完成了:
- 好手表
- GoodWatch Discord 社区
- 风车
- 风车不和谐社区
您知道GoodWatch 是开源的吗?您可以查看此存储库中的所有脚本和流定义:https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f
如果您有任何疑问,请告诉我。
以上是数百万部电影和数百万流媒体链接的数据管道的详细内容。更多信息请关注PHP中文网其他相关文章!

Python列表切片的基本语法是list[start:stop:step]。1.start是包含的第一个元素索引,2.stop是排除的第一个元素索引,3.step决定元素之间的步长。切片不仅用于提取数据,还可以修改和反转列表。

ListSoutPerformarRaysin:1)DynamicsizicsizingandFrequentInsertions/删除,2)储存的二聚体和3)MemoryFeliceFiceForceforseforsparsedata,butmayhaveslightperformancecostsinclentoperations。

toConvertapythonarraytoalist,usEthelist()constructororageneratorexpression.1)intimpthearraymoduleandcreateanArray.2)USELIST(ARR)或[XFORXINARR] to ConconverTittoalist,请考虑performorefformanceandmemoryfformanceandmemoryfformienceforlargedAtasetset。

choosearraysoverlistsinpythonforbetterperformanceandmemoryfliceSpecificScenarios.1)largenumericaldatasets:arraysreducememoryusage.2)绩效 - 临界杂货:arraysoffersoffersOffersOffersOffersPoostSfoostSforsssfortasssfortaskslikeappensearch orearch.3)testessenforcety:arraysenforce:arraysenforc

在Python中,可以使用for循环、enumerate和列表推导式遍历列表;在Java中,可以使用传统for循环和增强for循环遍历数组。1.Python列表遍历方法包括:for循环、enumerate和列表推导式。2.Java数组遍历方法包括:传统for循环和增强for循环。

本文讨论了Python版本3.10中介绍的新“匹配”语句,该语句与其他语言相同。它增强了代码的可读性,并为传统的if-elif-el提供了性能优势

Python中的功能注释将元数据添加到函数中,以进行类型检查,文档和IDE支持。它们增强了代码的可读性,维护,并且在API开发,数据科学和图书馆创建中至关重要。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

EditPlus 中文破解版
体积小,语法高亮,不支持代码提示功能

ZendStudio 13.5.1 Mac
功能强大的PHP集成开发环境

记事本++7.3.1
好用且免费的代码编辑器