Kafka Setup
Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
本教程主要介绍Kafka 在Centos 6上的安装和使用,包括功能验证和集群的简单配置。
kafka docs & download
-- http://kafka.apache.org/documentation.html
-- http://kafka.apache.org/downloads.html
1. 安装 zk & java
请参考 [译] zookeeper 入门教程
2. 运行zk
bin/zkServer.sh start
3. 安装 kafka
下载 kafka 并配置
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
tar xf kafka_2.11-0.9.0.1.tgz
mv kafka_2.11-0.9.0.1 /usr/loca/kafka
配置 server.properties 文件中的 zookeeper.connect,设置为 2 中的IP 和端口
创建多 broker
@_@[11:59:56][[email protected] kafka]#diff -ruN config/server.properties config/server-1.properties |grep "^[-+@]"
--- config/server.properties 2016-02-12 08:37:25.000000000 +0800
+++ config/server-1.properties 2016-04-18 11:54:34.011797645 +0800
@@ -17,14 +17,14 @@
-broker.id=0
+broker.id=1
-#port=9092
+port=9093
@@ -57,7 +57,7 @@
-log.dirs=/tmp/kafka-logs
+log.dirs=/tmp/kafka-logs-1
4. 运行 kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server-1.properties
停止
bin/kafka-server-stop.sh
pkill -9 -f config/server.properties
5. 创建Topic
创建一个名为“test”只有一个分区,只有一个副本的Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic lucy
Created topic "lucy".
如果你用的是Kafka 8.x之前版本,请用以下的命令创建topic
bin/kafka-create-topic.sh --zookeeper localhost:2181 \
--replica 1 --partition 1 --topic lucy
运行list topic命令,可以看到Topic列表
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:lucy PartitionCount:1 ReplicationFactor:1 Configs:
Topic: lucy Partition: 0 Leader: 1 Replicas: 1 Isr: 1
6. 发送消息
kafka自带的一个命令行客户端,运行后可以输入消息,kafka会将其发送到kafka进群进行消息消费。默认情况下,每一行数据被作为一个消息进行发送。
接下来我们运行producer试试
^_^[12:02:21][[email protected] kafka]#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic lucy
Hello Kafka
My name is jeffrey
This is test on lab.suzf.net
7. 启动消费者(consumer)
上面我们通过kafka自带的命令行输入了消息,那么我们现在启动消费者看看是否会接收到。
@_@[12:03:22][[email protected] kafka]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic lucy --from-beginning
Hello Kafka
My name is jeffrey
This is test on lab.suzf.net
^CProcessed a total of 3 messages
可以看到消费者已经对我们上面输入的数据进行处理了.
8. 删除无用的topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is already marked for deletion.
bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion
并没有真正删除,如果要真正删除,配置 delete.topic.enable=true.
配置文件在kafka/config目录
vim config/server.properties
# Whether topic deletion should be allowed. Requires kafka >= 0.8.2
delete.topic.enable=true
# bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test2
Created topic "test2".
# bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion
test2
# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test2
Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
^_^[13:55:46][[email protected] kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion