<返回更多

Kafka实时API探秘

2020-11-30    
加入收藏

本文要点

在构建应用程序和系统时,我们一直面临的一个挑战是如何有效地在它们之间交换信息,同时保持接口修改的灵活性,而不会对其他地方产生不恰当的影响。接口越是具体和简单化,在做出变更时就越有可能需要进行彻底的重写。反过来也是成立的,通用的集成模式可以适用,并得到广泛支持,但这是以性能作为代价。

事件提供了一种“金发姑娘式”的方法,实时 API 可以作为应用程序的基础,既灵活又高性能,既松耦合又高效。

事件可以被视为其他大多数数据结构的构建块。一般来说,它们记录某件事情发生的事实和发生的时间点。一个事件可以捕获不同级别的信息:从一个简单的通知到一个可以描述所发生事情的完整状态的事件。

我们可以通过聚合事件来创建状态。除了作为状态的基础,事件还可以用于在发生事件时异步触发其他地方的动作——这是事件驱动架构的基础。通过这种方式,我们可以构建事件消费者来满足我们的需求——包括无状态的和有状态的。事件生产者可以选择维护状态,但没有必要这样做,因为事件消费者可以从接收到的事件中重新构建状态。

在你所工作的业务领域,你可能会想到很多事件示例。它们可以是人类之间产生的互动,也可以是机器之间产生的交互。它们可能包含一个丰富的有效负载,或者它们本质上只是一个通知。例如:

这些事件可以用来直接触发其他地方的动作(如处理订单的服务),也可以通过聚合来提供信息(如当前停车场已经被占用的数量,就可以知道还有多少空位)。

所以,如果事件是我们构建应用程序和服务的基石,那么我们需要一种技术来支持我们——这就是 Apache Kafka 的切入点。Kafka 是一个可伸缩的事件流平台,它提供了:

Kafka 采用了分布式日志的概念。通过这个简单但功能强大的分布式、不可变、仅追加的日志的概念,我们可以以一种可伸缩和高效的方式捕获和存储业务和系统产生的事件。这些事件可以供多个用户使用,也可以进行进一步的处理和聚合,既可以直接使用,也可以存储在 RDBMS、数据湖和 NoSQL 等存储系统中。

在本文的其余部分中,我将探索 Apache Kafka 提供的 API,并演示如何在应用程序中使用它们。

生产者和消费者 API

像 Kafka 这样的系统,它的伟大之处在于生产者和消费者是解耦的,这意味着我们可以在不需要消费者的情况下生产数据(由于是解耦的,我们可以大规模地这样做)。一个事件发生了,我们把它发送到 kafka,就这么简单。我们所需要知道的就是 Kafka 集群的细节,以及我们想要发送事件到的主题(Kafka 组织数据的一种方式,有点像 RDBMS 中的表)。

Kafka 有很多不同语言的客户端。这里有一个使用 Go 产生事件到 Kafka 的例子:

package mainimport (    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() {    topic := "test_topic"    p, _ := kafka.NewProducer(&kafka.ConfigMap{        "bootstrap.servers": "localhost:9092"})    defer p.Close()    p.Produce(&kafka.Message{        TopicPartition: kafka.TopicPartition{Topic: &topic,            Partition: 0},        Value: []byte("Hello world")}, nil)}

因为 Kafka 是持久地存储事件的,所以当我们想要使用事件时,它们是可用的,直到我们使用完后才过期(这个可以根据主题来配置)。

事件被写入 Kafka 主题后,就可供一个或多个消费者读取。消费者可以采用传统的发布/订阅方式,并在新事件到达时接收它们,也可以根据应用程序的需要选择重新消费之前某个时间点的事件。Kafka 的这种回放功能要归功于它的持久和可伸缩的存储层,这给很多重要的实际应用场景提供了巨大优势,如机器学习和 A/B 测试,这些场景同时需要历史数据和实时数据。在受监管的行业中,数据必须保留多年才符合法律规定。传统的消息传递系统如 RabbitMQ、ActiveMQ 无法支持这样的要求。

package mainimport (    "fmt"    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() {    topic := "test_topic"    cm := kafka.ConfigMap{        "bootstrap.servers":        "localhost:9092",        "go.events.channel.enable": true,        "group.id":                 "rmoff_01"}    c, _ := kafka.NewConsumer(&cm)    defer c.Close()    c.Subscribe(topic, nil)    for {        select {        case ev := <-c.Events():            switch ev.(type) {            case *kafka.Message:                km := ev.(*kafka.Message)                fmt.Printf("✅ Message '%v' received from topic '%v'n", string(km.Value), string(*km.TopicPartition.Topic))            }        }    }}

当一个消费者连接到 Kafka 时,它会提供一个消费者群组标识符。消费者群组支持两种功能。首先,Kafka 用它跟踪消费者读取主题的偏移量,当消费者重新连接时,可以从之前的位置继续读取。第二,消费者应用程序可能希望使用多个消费者读取数据,形成一个消费者群组,从而并行处理数据。Kafka 将事件分配给群组内的每一个消费者,如果随后有成员离开或加入(例如当一个消费者实例发生崩溃时),会主动管理好群组。

这意味着多个服务可以使用相同的数据,而它们之间没有任何相互依赖关系。同样的数据也可以使用 Kafka Connect API 路由到其他数据存储中。

Kafka 提供了 JAVA、C/C++、Go、Python 和 Node.js 等语言的生产者和消费者 API。不过,如果你的应用程序想要使用 HTTP 而不是原生的 Kafka 协议呢?这个时候可以使用 REST 代理。

在 Kafka 中使用 REST API

假设我们正在为智能停车场的设备开发一个应用程序。记录汽车停车位的事件的有效载荷可能看起来像这样:

{    "name": "NCP Sheffield",    "space": "A42",    "occupied": true}

我们可以把这个事件发送到 Kafka 主题上,它也会将记录事件的时间作为事件元数据的一部分。使用Confluent REST Proxy向 Kafka 生成数据只需要进行一个简单的 REST 调用:

curl -X POST      -H "Content-Type: Application/vnd.kafka.json.v2+json"      -H "Accept: application/vnd.kafka.v2+json"      --data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}'      "http://localhost:8082/topics/carpark"

任何一个应用程序都可以使用之前介绍的原生消费者 API 或使用 REST API 来消费这个主题。与原生消费者 API 一样,使用 REST API 的消费者也是消费者群组的成员,这个时候叫作订阅。因此,对于 REST API,必须首先声明消费者和订阅:

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json"       --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}'       http://localhost:8082/consumers/rmoff_consumercurl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}'  http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription

完成这些之后,就可以读取事件了:

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"       http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records[    {        "topic": "carpark",        "key": null,        "value": {            "name": "Sheffield NCP",            "space": "A42",            "occupied": true        },        "partition": 0,        "offset": 0    }]

如果有多个事件要接收,可以通过批次获取。如果想要检查新事件,需要再次进行 REST 调用。

我们已经介绍了如何向 Kafka 写入数据和从 Kafka 主题获取数据。但是,很多时候,我们想做的不只是简单的 Pub/Sub。我们想基于事件流看到更大的图景——所有汽车来来去去的情况、现在有多少空车位或者某个特定停车场的更新流。

条件通知、流式处理和物化视图

如果你认为 Kafka 只是一个提供 Pub/Sub 功能的系统,就跟认为 iphone 只是一个用来拨打和接收电话的设备一样。我的意思是,如果把 Pub/Sub 看成 Kafka 提供的众多能力当中的一个,这是没有错的……它的作用确实远远不止于此。Kafka 通过 Kafka Streams API 提供了流式处理能力。这是一个功能丰富的 Java 客户端库,用于在 Kafka 中大规模和跨多台机器对数据进行有状态的流式处理。Kafka Streams 被沃尔玛、Ticketmaster 和 Bloomberg 等公司广泛应用,它还是 ksqlDB 的基础。

ksqlDB是一个专门为流式处理应用程序构建的事件流数据库。它提供了一个基于 SQL 的 API 来查询和处理 Kafka 中的数据。ksqlDB 的特性包括过滤、转换和连接来自流和表的数据,通过聚合事件创建物化视图,等等。

要使用 ksqlDB 中的数据,我们首先需要声明一个 schema:

CREATE STREAM CARPARK_EVENTS (NAME     VARCHAR,                              SPACE    VARCHAR,                              OCCUPIED BOOLEAN)                        WITH (KAFKA_TOPIC='carpark',                              VALUE_FORMAT='JSON');

ksqlDB 是一个集群应用程序,这个初始声明工作可以在启动时完成,也可以根据需要由客户端来完成。完成这些之后,客户端就可以订阅来自原始主题的变更流(带有过滤器)。例如,想要获得一个停车场有空位的通知,可以运行以下命令:

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,       SPACE  FROM CARPARK_EVENTS WHERE NAME='Sheffield NCP'   AND OCCUPIED=false  EMIT CHANGES;

与 SQL 查询不同,这个查询是一种持续查询(使用 EMIT CHANGES 子句指定)。持续查询,即推送(Push)查询,将在事件发生时(现在和将来)持续地返回新的匹配项,直到事件终止为止。ksqlDB 还支拉取(Pull)查询(我们将在下面探讨),这些查询的行为与常规 RDBMS 的查询差不多,返回某个时间点的值。因此,ksqlDB 既支持流也支持静态状态,在实际当中,大多数应用程序需要根据正在执行的操作来选择这两种方式。

ksqlDB 提供了一个全面的 REST API,通过 curl 进行上面的 SQL 调用看起来像这样:

curl --http2 'http://localhost:8088/query-stream'      --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'''yyyy-MM-dd HH:mm:ss''') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='''Sheffield NCP''' and OCCUPIED=false EMIT CHANGES;"}'

这个调用产生一个来自服务器端的响应流(带有头部信息),然后源主题有匹配的事件时,这些事件被发送到客户端:

{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}…["2020-08-05 16:02:33","A42"]………["2020-08-05 16:07:31","D72"]…

我们也可以使用 ksqlDB 来定义和填充新的数据流。在 SELECT 语句前加上 CREATE STREAM streamname AS,就可以将持续查询的输出路由到 Kafka 主题。因此,我们可以使用 ksqlDB 转换、连接、过滤发送给 Kafka 的事件。ksqlDB 支持将表作为一等对象类型,我们可以用它来增强接收到的有关停车场信息的事件:

CREATE STREAM CARPARKS AS    SELECT E.NAME AS NAME, E.SPACE,           R.LOCATION, R.CAPACITY,           E.OCCUPIED,           CASE               WHEN OCCUPIED=TRUE THEN 1               ELSE -1           END AS OCCUPIED_IND    FROM   CARPARK_EVENTS E           INNER JOIN           CARPARK_REFERENCE R           ON E.NAME = R.NAME;

我们仍然使用 CASE 语句来创建可用车位的计数。上面的 CREATE STREAM 填充了一个 Kafka 主题,看起来像这样:

Kafka实时API探秘

 

最后,让我们看看如何在 ksqlDB 中创建有状态聚合并在客户端查询。要创建物化视图,需要运行包含聚合函数的 SQL:

CREATE TABLE CARPARK_SPACES AS    SELECT NAME,           SUM(OCCUPIED_IND) AS OCCUPIED_SPACES        FROM CARPARKS        GROUP BY NAME;

这个状态是在分布式 ksqlDB 节点中维护的,可以使用 REST API 查询:

curl --http2 'http://localhost:8088/query-stream'      --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'

与上面看到的响应流不同,针对状态的查询(称为“拉取查询”,而不是“推送查询”)是立即返回的,然后退出:

{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[30]

如果应用程序希望获得最新的数字,它们可以重新发出查询,结果可能会发生变化,也可能不会:

curl --http2 'http://localhost:8088/query-stream'      --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[29]

ksqlDB 官方提供了一个Java客户端,社区提供了Python和Go客户端。

与其他系统集成

将 Kafka 作为异步消息传递的高可伸缩性和持久性代理的一个好处是在应用程序之间交换的数据也可以用于驱动流式处理(如上所述),或直接送入依赖的系统。

继续以停车场应用程序为例,我们很可能想要在其他地方使用这些停车或离场事件,例如:

你可以使用 Apache Kafka 的 Connect API 来定义 Kafka 内外系统的流式集成。例如,从 Kafka 实时流数据到 S3,你可以这样:

curl -i -X PUT -H "Accept:application/json"     -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config     -d ' {        "connector.class": "io.confluent.connect.s3.S3SinkConnector",        "topics": "carpark",        "s3.bucket.name": "rmoff-carparks",        "s3.region": "us-west-2",        "flush.size": "1024",        "storage.class": "io.confluent.connect.s3.storage.S3Storage",        "format.class": "io.confluent.connect.s3.format.json.JsonFormat"        }'

现在,用于驱动应用程序通知和构建可以直接查询状态的应用程序的数据也流到了 S3。这些应用场景之间都是解耦的。如果我们随后想要将数据流到另一个地方,比如 Snowflake,只需要添加另一个 Kafka 连接配置,其他消费者完全不受影响。Kafka Connect 也可以将数据流到 Kafka。例如,我们可以使用变更数据捕获(CDC)对 ksqlDB 中的 CARPARK_REFERENCE 表进行流化。

结论

Kafka 提供了一个可伸缩的事件流平台,你可以用它来构建强大的基于事件的应用程序。将事件作为连接应用程序和服务的基础,你可以从多方面受益,包括松散耦合、服务自治、弹性、灵活演化和弹性。

你可以使用 Kafka API 及其周边生态系统(包括 ksqlDB)来进行基于订阅的消费和查询物化视图,而不需要额外的数据存储。在 API 方面,既有原生客户端 API,也有 REST API。

要了解更多关于 Kafka 的信息,请访问 developer.confluent.io。Confluent Platform 是 Apache Kafka 的一个发行版,包含了本文讨论的所有组件。它可以在本地使用,也可以作为托管服务使用(Confluent Cloud)。你可以在GitHub上找到本文的代码示例和用于运行示例的 Docker Compose 文件。如果你想了解更多有关如何使用 Kafka 构建事件驱动系统的知识,那么一定要阅读 Ben Stopford 的优秀著作《设计事件驱动系统》。

作者简介

Robin Moffatt 是 Confluent 的高级开发者布道师。Confluent 是由 Apache Kafka 原作者(同时也是 Oracle ACE 董事)创立的。自 2009 年以来,他一直在各种技术大会(包括 QCon、Devoxx、Strata、Kafka 峰会和Øredev)上演讲。读者可以在网上找到他的演讲,订阅他的YouTube频道,阅读他的博文。工作之余,Robin 喜欢跑步,喝啤酒,吃油炸早餐。

原文链接

Real Time APIs in the Context of Apache Kafka

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>