kafka集群部署
Kafka最初是由领英(LinkedIn)开发,并随后于2011年初开源,并于2012年10月23日由Apache Incubator孵化出站。它是为处理实时数据提供一个统一、高吞吐、低延迟的分布式发布订阅消息系统。Kafka由Java和Scala编写。根据2014年Quora的帖子,Jay Kreps似乎已经将它以作家弗朗茨·卡夫卡命名。Kreps选择将该系统以一个作家命名是因为,它是“一个用于优化写作的系统”,而且他很喜欢卡夫卡的作品。Kafka架构的主要术语包括Topic、Record和Broker。Topic由Record组成,Record持有不同的信息,而Broker则负责复制消息。Kafka有四个主要API:
- 生产者API:支持应用程序发布Record流。
- 消费者API:支持应用程序订阅Topic和处理Record流。
- Stream API:将输入流转换为输出流,并产生结果。
- Connector API:执行可重用的生产者和消费者API,可将Topic链接到现有应用程序。
下面进行Kafka集群部署工作。这里以3台服务器为例,假设IP分别是1.1.1.0, 1.1.1.1, 1.1.1.2,默认在1.1.1.0上演示。这里假设Java和Zookeeper已经部署成功,如果没有部署可以参考我之前的文章。
下载Kafka
从官网下载Kafka ,这里下载2.4.0版本
1 | wget http://mirrors.ocf.berkeley.edu/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz |
安装Kafka
安装Kafka非常简单,直接解压即可
1 | tar -xzf kafka_2.11-2.4.0.tgz |
也可以指定解压目录,如Documents
1 | tar -xzf kafka_2.11-2.4.0.tgz -C ./Documents/. |
配置Kafka
进入Kafka安装目录
1 | cd kafka_2.11-2.4.0/ |
创建日志文件夹,
1 | mkdir log |
修改如下信息
1 | broker.id=0 |
把文件夹kafka_2.11-2.4.0/ 拷贝到另外两台服务器上
1 | scp -r kafka_2.11-2.4.0/ 1.1.1.1:/home/jinzhongxu/. |
并分别修改( broker.id
属性是集群中服务器的永久的固定的名字)
1 | broker.id=1 |
和
1 | broker.id=2 |
启动Kafka
进入Kafka安装目录
1 | ./bin/kafka-server-start.sh config/server.properties |
运行命令
1 | jps |
查看Kafka是否启动成功。并在其他服务器上也分别启动Kafka。
测试Kafka
创建topic
1 | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test |
查看topic
1 | bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
创建producer
1 | bin/kafka-console-producer.sh --broker-list localhost:909-topic test |
创建consumer
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
删除Topic
首先,在config/server.properties中增加
1 | delete.topic.enable=true |
否则,并不能立刻删除该topic,而是标记为待删除(marked for deletion),当重启Kafka时才会删除。
然后,查看该Kafka集群有哪些topic
1 | ./bin/kafka-topics.sh --list --zookeeper localhost:2181 |
最后,使用命令删除topic
1 | ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test |
再次查看Kafka的topic发现已经没有了。