0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于非常简单的Python代码就能完成流水线开发

Linux爱好者 来源:Python实用宝典 作者:Ckend 2021-11-16 18:20 次阅读

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

基于非常简单的Python代码就能完成流水线开发。

使用 PostgreSQL 作为数据处理引擎。

有Web界面可视化分析流水线执行过程。

基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。

基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

1.安装

由于使用了大量的依赖,Mara-pipelines 并不适用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,请使用 Docker 或者 Windows 下的 linux 子系统。

使用pip安装Mara-pipelines:

pip install mara-pipelines

或者:

pip install git+https://github.com/mara/mara-pipelines.git

2.使用示例

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
frommara_pipelines.commands.bash importRunBash
frommara_pipelines.pipelines importPipeline, Task
frommara_pipelines.ui.cli importrun_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

forhost in['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])

可以看到,Task包含了多个commands,这些 command s会用于真正地执行动作。

而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

pipeline.add(sub_pipeline, ['ping_localhost'])

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

importmara_db.auto_migration
importmara_db.config
importmara_db.dbs

mara_db.config.databases 
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATETABLEdata_integration_file_dependency (
node_path TEXT[] NOTNULL, 
dependency_type VARCHARNOTNULL, 
hashVARCHAR, 
timestampTIMESTAMPWITHOUTTIMEZONE, 
PRIMARY KEY(node_path, dependency_type)
);

.. more tables

为了运行这个流水线,你需要:

frommara_pipelines.ui.cli importrun_pipeline

run_pipeline(pipeline)

这将运行单个流水线节点及其 (sub_pipeline) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

所有子节点的图以及它们之间的依赖关系

流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)

所有流水线节点及其平均运行时间和由此产生的排队优先级的表

流水线最后一次运行的输出和时间线

对于每个任务,都有一个页面显示

流水线中任务的上游和下游

最近 30 天内任务的运行时间

任务的所有命令

任务最后运行的输出

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点。

责任编辑:haq

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6509

    浏览量

    87557
  • python
    +关注

    关注

    51

    文章

    4667

    浏览量

    83443

原文标题:超级方便的轻量级 Python 流水线工具,还有漂亮的可视化界面!

文章出处:【微信号:LinuxHub,微信公众号:Linux爱好者】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    固定式安装工业读码器,助力提高流水线人工上料效率

    上的条码信息,无需手动操作即可完成数据采集。这种自动化的条码扫描技术大大缩短了人工上料的时间,提高了生产效率。而工业读码器的稳定性和准确度保证了流水线上料过程中的顺畅
    的头像 发表于 02-27 14:43 91次阅读
    固定式安装工业读码器,助力提高<b class='flag-5'>流水线</b>人工上料效率

    【RISC-V开放架构设计之道|阅读体验】RV64指令集设计的思考以及与流水线设计的逻辑

    的执行过程分解成多个阶段,并在多个阶段并行执行。 RISC-V指令集体系结构的简单性和可扩展性使得它非常适合流水线设计。 RISC-V指令集体系结构的五级流水线设计可以实现较高的性能,
    发表于 01-29 10:09

    超级方便的轻量级Python流水线工具

    Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下: 基于非常简单Python代码就能
    的头像 发表于 10-31 11:26 347次阅读
    超级方便的轻量级<b class='flag-5'>Python</b><b class='flag-5'>流水线</b>工具

    Mara-pipelines:轻量级的数据转换框架

    Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下: 基于非常简单Python代码就能
    的头像 发表于 10-30 10:47 256次阅读
    Mara-pipelines:轻量级的数据转换框架

    流水线ADC结构解析 流水线ADC和其它ADC的比较

    低采样速率ADC仍然采用逐次逼近(SAR)、积分型结构以及最近推出的过采样ΣΔADC,而高采样速率(几百MSPS以上)大多用闪速ADC及其各种变型电路。然而,最近几年各种各样的流水线ADC已经在速度
    发表于 09-26 10:24 508次阅读
    <b class='flag-5'>流水线</b>ADC结构解析 <b class='flag-5'>流水线</b>ADC和其它ADC的比较

    什么是流水线?ARM处理器流水线简析

    流水线是为了提高效率,能并发同时进行多个任务。
    的头像 发表于 09-05 15:39 1209次阅读
    什么是<b class='flag-5'>流水线</b>?ARM处理器<b class='flag-5'>流水线</b>简析

    2分钟快速上手华为云流水线CodeArts Pipeline的创建与运行

    涉及服务简介 软件持续交付流水线CodeArts Pipeline是华为云软件开发生产线CodeArts的一个子服务,是一个可视化的自动化任务编排调度平台,可串联编译构建、代码检查、自动化测试、部署
    的头像 发表于 09-01 13:47 239次阅读
    2分钟快速上手华为云<b class='flag-5'>流水线</b>CodeArts Pipeline的创建与运行

    制造企业常用的流水线Andon安灯呼叫系统是什么

    制造企业使用的流水线Andon安灯呼叫系统是一种非常实用的生产管理工具,它不仅能够提高生产效率和质量,还可以为企业提供宝贵的数据支持,帮助企业实现持续改进和优化。
    的头像 发表于 08-30 21:14 295次阅读

    新版本Jenkins推荐使用声明式流水线

    stage:和声明式的含义一致,定义流水线的阶段。Stage 块在脚本化流水线语法中是可选的,然而在脚本化流水线中实现 stage 块,可以清楚地在 Jenkins UI 界面中显示每个 stage 的任务子集。
    的头像 发表于 07-20 16:43 460次阅读

    半导体制冷技术应用--全自动生化免疫流水线

    全自动生化免疫流水线系统包括了标本的前处理系统、离线样本的分杯系统、生化免疫检测系统以及大容量标本贮存系统。在全自动生化免疫流水线运行过程中,工作人员只需将装有血液或尿液样品并贴有条形码的试管放到
    的头像 发表于 07-14 17:32 502次阅读
    半导体制冷技术应用--全自动生化免疫<b class='flag-5'>流水线</b>

    为什么流水线上产品条码扫描都在应用固定式工业扫码器?工业扫码器可以解决哪些问题?

    在现代制造业中,流水线上的产品条码扫描是不可或缺的环节。为了高效准确地进行条码自动扫描,许多企业更多地在引入固定式工业扫码器用于产品生产的各个环节,其中工业扫码器成为需求配置关键,为什么流水线
    的头像 发表于 06-27 16:15 490次阅读
    为什么<b class='flag-5'>流水线</b>上产品条码扫描都在应用固定式工业扫码器?工业扫码器可以解决哪些问题?

    总结一下pipeline流水线设计的关键点

    pipeline流水线设计是一种典型的面积换性能的设计。一方面通过对长功能路径的合理划分,在同一时间内同时并行多个该功能请求,大大提高了某个功能的吞吐率
    发表于 06-27 15:26 1088次阅读
    总结一下pipeline<b class='flag-5'>流水线</b>设计的关键点

    串联式流水线和并联式流水线

    改线润滑为油寖式。以前的流水线是按人类设计的,所以有时间间隔,上了机器人后,这些都要改。最大的问题是,润滑及发热。按各机种的经验,汽车变速箱的油寖式设计最好。能长时间使用,并且少维护。解决了这些问题,机器就能加速了。 并联式流水线
    发表于 05-19 18:30

    什么是流水线 Jenkins的流水线详解

    jenkins 有 2 种流水线分为声明式流水线与脚本化流水线,脚本化流水线是 jenkins 旧版本使用的流水线脚本,新版本 Jenkin
    发表于 05-17 16:57 650次阅读

    一个典型的流水线设计

    流水线设计通常可以在一定程度上提升系统的时钟频率,因此常常作为时序性能优化的一种常用技巧。如果某个原本单个时钟周期完成的逻辑功能块可以进一步细分为若干个更小的步骤进行处理,而且整个数据处理过程是单向
    的头像 发表于 05-08 10:55 684次阅读
    一个典型的<b class='flag-5'>流水线</b>设计