用户登录

使用 Kafka Connect,可以把各种不同来源的数据直接放到 Kafka 里,也可以把 Kafka 里的数据放到各种不同的地方。比如我们可以直接把各种数据仓库里的数据放到 Kafka 里,也可以把 Kafka 里的数据直接放到不同的数据仓库里。Kafka 提供了不同类型的 Connector 可以连接不同的数据来源,也就是你想让 Kafka 连接什么东西,就可以去找一个对应的 Connector。

下面我们可以试一下,用一个文件作为 Kafka 的数据来源,文件里的每一行文字都会放到 Kafka 的一个 Topic 里面。然后我们可以再把这个 Topic 里的东西放到另外一个文件里。

Kafka 自带了一个演示可以参考一下。在 config 目录里面,先打开 connect-standalone.properties 这个文件,一会创建 Kafka Connect 服务的时候会用到这个文件里的配置。

然后打开 connect-file-source.properties,这个文件里配置的是一个文件来源连接器,connector.class 用的就是这个 FileStreamSource ,file 设置的是文件的名字,默认是 test.txt。 文件里的每一行内容都会放到 connect-test 这个 Topic 里面。

再打开 connect-file-sink.properties,这个 Connector 设置的是 Kafka 输出的数据用的一个连接器。用的 connector.class 是这个 FileStreamSink,这个 class 就是别人事先写好的一些代码,它的作用就是把 kafka 里的数据写入到一个叫 demo.sink.txt 的文件里。数据的来源是 conenct-test 这个 topic 。

在 kafka 这个目录的下面,创建一个文件,名字叫 test.txt ,在这个文件里输入一行内容,比如 ninghao.net ,注意文件的结尾要加上一个空行。

回到终端,在 kafka 目录的下面,执行一下 connect-standalone.sh config/connect-standalone.properties 再加上 config/connect-file-source.properties,还需要一个 config/connect-file-sink.properties ,执行一下,成功以后,观察一下 kafka 目录,你会发现这里会出现一个 test.sink.txt。

文件里的内容就是 test.txt 里的东西。 把这个 test.sink.txt 文件放在编辑器的底部。然后回到 test.txt 这个文本文件,在里面输入一行新的内容。

这样这行内容就会放到 kafka 里的 connect-test 这个 Topic 里面,这个 Topic 里的东西又会输出到 test.sink.txt 文件里。 所以在这个文件里你会发现刚才在 test.txt 文件里输入的这行新的内容。

回到终端,新建一个终端标签,这里我们可以开一个新的 Consumer,执行 kafka-console-consumer --bootstrap-server 是 localhost:9092 --topic 是 connect-test 。

再回到这个 test.txt 文件,输入一行新的内容,保存一下文件。

在终端可以观察一下,你会发现输出了一个 JSON 格式的数据,这里的 payload 里的东西就是刚才我们在 test.txt 文件里输入的新的内容。

这行新的内容也会出现在 test.sink.txt 这个文件里。

Kafka Connect:数据输入与输出《 Kafka:起步 》

统计

14696
分钟
0
你学会了
0%
完成

社会化网络

关于

微信订阅号

扫描微信二维码关注宁皓网,每天进步一点