<返回更多

超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

2021-09-09    Python阿执
加入收藏
超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

  • 基于非常简单的Python/ target=_blank class=infotextkey>Python代码就能完成流水线开发。
  • 使用 PostgreSQL 作为数据处理引擎。
  • 在Web界面可视化分析流水线执行过程。
  • 基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。
  • 基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

1.安装

 

由于使用了大量的依赖,Mara-pipelines 并不适用于 windows,如果你需要在 Windows 上使用 Mara-pipelines,请使用 Docker 或者 Windows 下的 linux 子系统。

使用pip安装Mara-pipelines:

pip install mara-pipelines

 

或者:

pip install git+https://github.com/mara/mara-pipelines.git

 

2.使用示例

 

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
from
mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')
pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))
sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')
for host in ['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])
pipeline.add(sub_pipeline, ['ping_localhost'])
pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])

 

可以看到,Task包含了多个commands,这些 command s会用于真正地执行动作。

 

而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

 

pipeline.add(sub_pipeline, ['ping_localhost'])

 

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

import mara_db.auto_migration
import mara_db.config
import mara_db.dbs
mara_db.config.databases
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}
mara_db.auto_migration.auto_discover_models_and_migrate()

 

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (
node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables

 

为了运行这个流水线,你需要:

from mara_pipelines.ui.cli import run_pipeline
run_pipeline(pipeline)

超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

这将运行单个流水线节点及其 (sub_pipeline ) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

 

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

  • 所有子节点的图以及它们之间的依赖关系
  • 流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)
  • 所有流水线节点及其平均运行时间和由此产生的排队优先级的表
  • 流水线最后一次运行的输出和时间线
超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

对于每个任务,都有一个页面显示

  • 流水线中任务的上游和下游
  • 最近 30 天内任务的运行时间
  • 任务的所有命令
  • 任务最后运行的输出
超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点:

超级方便的轻量级Python流水线工具,还有漂亮的可视化界面

 

我们的文章到此就结束啦

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>