使用 Kafka Connect,可以把各种不同来源的数据直接放到 Kafka 里,也可以把 Kafka 里的数据放到各种不同的地方。比如我们可以直接把各种数据仓库里的数据放到 Kafka 里,也可以把 Kafka 里的数据直接放到不同的数据仓库里。Kafka 提供了不同类型的 Connector 可以连接不同的数据来源,也就是你想让 Kafka 连接什么东西,就可以去找一个对应的 Connector。
下面我们可以试一下,用一个文件作为 Kafka 的数据来源,文件里的每一行文字都会放到 Kafka 的一个 Topic 里面。然后我们可以再把这个 Topic 里的东西放到另外一个文件里。
Kafka 自带了一个演示可以参考一下。在 config 目录里面,先打开 connect-standalone.properties 这个文件,一会创建 Kafka Connect 服务的时候会用到这个文件里的配置。
然后打开 connect-file-source.properties,这个文件里配置的是一个文件来源连接器,connector.class 用的就是这个 FileStreamSource ,file 设置的是文件的名字,默认是 test.txt。 文件里的每一行内容都会放到 connect-test 这个 Topic 里面。
再打开 connect-file-sink.properties,这个 Connector 设置的是 Kafka 输出的数据用的一个连接器。用的 connector.class 是这个 FileStreamSink,这个 class 就是别人事先写好的一些代码,它的作用就是把 kafka 里的数据写入到一个叫 demo.sink.txt 的文件里。数据的来源是 conenct-test 这个 topic 。
在 kafka 这个目录的下面,创建一个文件,名字叫 test.txt ,在这个文件里输入一行内容,比如 ninghao.net ,注意文件的结尾要加上一个空行。
回到终端,在 kafka 目录的下面,执行一下 connect-standalone.sh config/connect-standalone.properties 再加上 config/connect-file-source.properties,还需要一个 config/connect-file-sink.properties ,执行一下,成功以后,观察一下 kafka 目录,你会发现这里会出现一个 test.sink.txt。
文件里的内容就是 test.txt 里的东西。 把这个 test.sink.txt 文件放在编辑器的底部。然后回到 test.txt 这个文本文件,在里面输入一行新的内容。
这样这行内容就会放到 kafka 里的 connect-test 这个 Topic 里面,这个 Topic 里的东西又会输出到 test.sink.txt 文件里。 所以在这个文件里你会发现刚才在 test.txt 文件里输入的这行新的内容。
回到终端,新建一个终端标签,这里我们可以开一个新的 Consumer,执行 kafka-console-consumer --bootstrap-server 是 localhost:9092 --topic 是 connect-test 。
再回到这个 test.txt 文件,输入一行新的内容,保存一下文件。
在终端可以观察一下,你会发现输出了一个 JSON 格式的数据,这里的 payload 里的东西就是刚才我们在 test.txt 文件里输入的新的内容。
这行新的内容也会出现在 test.sink.txt 这个文件里。