<返回更多

流处理引擎:ksql

2020-08-07    
加入收藏

在阅读该文之前,我已经假设你已经对kafka的broker、topic、partition、consumer等概念已经有了一定的了解。

流处理

流数据是一组顺序、大量、快速、连续到达的数据序列,一般情况下,数据流可被视为一个随时间延续而无限增长的动态数据集合。

Confluent KSQL

Confluent KSQL是一个基于kafka的实时数据流处理工具引擎,提供了强大且易用的sql交互方式来对kafka数据流进行处理,而无需编写代码。ksql具备高扩展、高弹性、容错式等优良特性,并且它提供了大范围的流式处理操作,比如数据过滤、转化、聚合、连接join、窗口化和 Sessionization (即捕获单一会话期间的所有的流事件)等。

概念

架构

流处理引擎:ksql

 

stream和table

流(stream)表示的从开始至今的完整的历史,它代表了过去产生的数据(事件、日志等)及其相应的时间。新的数据只能被不断地添加到流中,无法被删除和修改,它们是既定的事实。从某种角度而言,流是对事实的建模。

表(table)表示的是基于数据流进行了某种操作之后的数据,它是对历史数据的某种状态的快照。表的这个概念,是源自于已经发展了数十年的RDBMS,因此,基本可以用相同的理解去使用table。

其实,RDBMS中也有数据流,如binlog本身就是一种流式数据。KSQL将stream作为基础对象,而RDBMS的基础对象是table。KSQL和RDBMS都有将stream和table互相转化的功能,只是二者的侧重点不同而已。

query的生命周期

基本流程和一般DBMS相同。

使用

最简单的体验方式: 使用Docker。这种方式默认下将zookeeper、kafka、ksql在一个compose(一共9个service)下启动。最低配置8G内存,尝试请谨慎。

git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images
git checkout 5.2.1-post
cd examples/cp-all-in-one/
docker-compose up -d --build
# 新建topic: user
docker-compose exec broker kafka-topics --create --zookeeper 
zookeeper:2181 --replication-factor 1 --partitions 1 --topic users
# 新建topic: pageview
docker-compose exec broker kafka-topics --create --zookeeper 
zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews

样例里面会自动生成两个topic:pageview和user,表示用户对某个页面的访问日志。

现在我们kafka和ksql都已经有了,还创建了两个topic。现在我们使用一个脚本来往这两个topic写入一些数据(这个脚本写入的数据为avro)

wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
curl -X POST -H "Content-Type: Application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors

启动KSQL终端

docker-compose exec ksql-cli ksql http://ksql-server:8088

DDL

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) 
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,  
userid VARCHAR) 
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');

SHOW STREAMS;
# 设置query语句读取最开始的数据
SET 'auto.offset.reset'='earliest';
SELECT pageid FROM pageviews LIMIT 3;

你会发现这条query会从pageviews流中获取每条记录的pageid。你也可以加上一些where条件尝试一下。

CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, 
regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid 
WHERE gender = 'FEMALE';

这条DDL会对pageviews和users中的数据进行左连接操作,并把连接结果作为新stream pageviews_femails的数据。这个stream的数据会写到一个新的kafka topic:PAGEVIEWS_FEMALE。

即:我们可以完全基于一个现有的topic新建一个stream;也可以基于现有的stream新建一个stream,这建立方法所得到的数据会存储在一个和stream名相同的topic中。

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', 
value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
CREATE TABLE pageviews_regions AS SELECT gender, regionid , 
COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) 
GROUP BY gender, regionid HAVING COUNT(*) > 1;
# 类似于MySQL的desc
DESCRIBE EXTENDED pageviews_female_like_89;

和外部系统的连接

ksql可以使用 ksql connectors 和外部系统如:mysql、s3、hdfs等进行通信、操作。

优缺点

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>