0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Mara-pipelines:轻量级的数据转换框架

科技绿洲 来源:Python实用宝典 作者:Python实用宝典 2023-10-30 10:47 次阅读

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

  • 基于非常简单的Python代码就能完成流水线开发。
  • 使用 PostgreSQL 作为数据处理引擎。
  • 有Web界面可视化分析流水线执行过程。
  • 基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。
  • 基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

1.安装

由于使用了大量的依赖,Mara-pipelines 并不适用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,请使用 Docker 或者 Windows 下的 linux 子系统。

使用pip安装Mara-pipelines:

pip install mara-pipelines

或者:

pip install git+https://github.com/mara/mara-pipelines.git

2.使用示例

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively

pipeline = Pipeline(
    id='demo',
    description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
                  commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

for host in ['google', 'amazon', 'facebook']:
    sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
                          commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
                      commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
                  commands=[RunBash('sleep 2')]), ['sub_pipeline'])

可以看到,Task包含了多个commands,这些 commands会用于真正地执行动作。

而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

pipeline.add(sub_pipeline, ['ping_localhost'])

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

import mara_db.auto_migration
import mara_db.config
import mara_db.dbs

mara_db.config.databases 
    = lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL,
    dependency_type VARCHAR NOT NULL,
    hash VARCHAR,
    timestamp TIMESTAMP WITHOUT TIME ZONE,
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

为了运行这个流水线,你需要:

from mara_pipelines.ui.cli import run_pipeline

run_pipeline(pipeline)

图片

这将运行单个流水线节点及其 ( **sub_pipeline ** ) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

  • 所有子节点的图以及它们之间的依赖关系
  • 流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)
  • 所有流水线节点及其平均运行时间和由此产生的排队优先级的表
  • 流水线最后一次运行的输出和时间线

图片

对于每个任务,都有一个页面显示

  • 流水线中任务的上游和下游
  • 最近 30 天内任务的运行时间
  • 任务的所有命令
  • 任务最后运行的输出

图片

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点:

图片

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据转换
    +关注

    关注

    0

    文章

    76

    浏览量

    17890
  • 代码
    +关注

    关注

    30

    文章

    4555

    浏览量

    66771
  • python
    +关注

    关注

    51

    文章

    4675

    浏览量

    83467
收藏 人收藏

    评论

    相关推荐

    一个面向嵌入式系统的轻量级框架

    mr-library 是一个面向嵌入式系统的轻量级框架,提供统一的底层驱动设备模型以及基础服务功能,具有模块化设计、可配置性和扩展性的特点, 可帮助开发者快速构建嵌入式应用程序。
    发表于 09-01 12:22 306次阅读

    10个轻量级框架

    这些轻量级框架使用HTML5和CSS3标准来帮助您快速开发跨平台的Web移动应用和网站。
    发表于 07-17 08:25

    轻量级的ui框架如何去制作

    原创分享:自制轻量级单片机UI框架框架元素用户接口代码开源平时常看csdn,但是从来没有自己写过。正好这几天需要用单片机做一个简易的ui界面,于是自己写了一个轻量级的ui
    发表于 07-14 07:39

    Dllite_micro (轻量级的 AI 推理框架

    DLLite-Micro 是一个轻量级的 AI 推理框架,可以为 OpenHarmony OS 的轻量设备和小型设备提供深度模型的推理能力DLLite-Micro 向开发者提供清晰、易上手的北向接口
    发表于 08-05 11:40

    如何自制轻量级单片机UI框架

    如何自制轻量级单片机UI框架
    发表于 10-14 06:13

    一种超轻量级的flashKV数据存储方案分享

    tinyFlash一种超轻量级的flash KV数据存储方案Github 地址:https://github.com/ospanic/tinyFlash设计原理本方案采用两个扇区轮流使用的方法存储
    发表于 12-20 06:08

    原创分享:自制轻量级单片机UI框架

    原创分享:自制轻量级单片机UI框架框架元素用户接口代码开源平时常看csdn,但是从来没有自己写过。正好这几天需要用单片机做一个简易的ui界面,于是自己写了一个轻量级的ui
    发表于 11-05 15:20 29次下载
    原创分享:自制<b class='flag-5'>轻量级</b>单片机UI<b class='flag-5'>框架</b>

    基于非常简单的Python代码就能完成流水线开发

    Mara-pipelines 是一个轻量级数据转换框架,具有透明和低复杂性的特点。其他特点如下: 基于非常简单的Python代码就能完成流
    的头像 发表于 11-16 18:20 2621次阅读

    TinyDB轻量级数据库有哪些特点呢

    TinyDB 是一个纯 Python 编写的轻量级数据库,一共只有1800行代码,没有外部依赖项。
    的头像 发表于 10-28 14:07 1039次阅读

    一个纯Python编写的轻量级数据

    TinyDB 是一个纯 Python 编写的轻量级数据库,一共只有1800行代码,没有外部依赖项。
    的头像 发表于 02-24 10:32 539次阅读

    测评分享 | 如何在先楫HPM6750上运行轻量级AI推理框架TinyMaix

    本期内容由先楫开发者社区大咖@xusiwei1236分享基于先楫HPM6750的轻量级AI推理框架,赶紧来瞧瞧~一、TinyMaix是什么?TinyMaix是国内sipeed团队开发一个轻量级AI
    的头像 发表于 12-12 17:57 977次阅读
    测评分享 | 如何在先楫HPM6750上运行<b class='flag-5'>轻量级</b>AI推理<b class='flag-5'>框架</b>TinyMaix

    轻量级数据库有哪些

    轻量级数据库有哪些 随着互联网和物联网等新一代信息技术的广泛应用,数据库系统也变得越来越重要。人们对于数据数据库的可靠性、安全性和性能等要求也越来越高。与传统的关系型
    的头像 发表于 08-28 16:41 4289次阅读

    超级方便的轻量级Python流水线工具

    Mara-pipelines 是一个轻量级数据转换框架,具有透明和低复杂性的特点。其他特点如下: 基于非常简单的Python代码就能完成流
    的头像 发表于 10-31 11:26 355次阅读
    超级方便的<b class='flag-5'>轻量级</b>Python流水线工具

    基于Python 轻量级ORM框架

    ORM框架使用最广泛的就是SQLAlchemy和Django自带的ORM框架,但是SQLAlchemy的语法显然相对Django的ORM框架麻烦一点。 而Django本身是一个web框架
    的头像 发表于 11-01 11:17 344次阅读
    基于Python <b class='flag-5'>轻量级</b>ORM<b class='flag-5'>框架</b>

    轻量级数据库有哪些类型

    轻量级数据库是指具有小巧、灵活、高效的特点,适用于小规模项目和嵌入式设备的数据库管理系统。下面是对轻量级数据库类型的详细介绍,包括关系型数据库、非关系型
    的头像 发表于 12-20 11:29 598次阅读