Fork me on GitHub
Suzf  Blog

Tag kafka

Hello Kafka

Kafka Setup

Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。

本教程主要介绍Kafka 在Centos 6上的安装和使用,包括功能验证和集群的简单配置。

kafka docs & download
-- http://kafka.apache.org/documentation.html
-- http://kafka.apache.org/downloads.html

1. 安装 zk & java
请参考 [译] zookeeper 入门教程

2. 运行zk
bin/zkServer.sh  start

3. 安装 kafka
下载 kafka 并配置

wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
tar xf kafka_2.11-0.9.0.1.tgz
mv kafka_2.11-0.9.0.1 /usr/loca/kafka

配置 server.properties 文件中的 zookeeper.connect,设置为 2 中的IP 和端口
创建多 broker

@[email protected][11:59:56][[email protected] kafka]#diff -ruN config/server.properties  config/server-1.properties |grep "^[[email protected]]"
--- config/server.properties  2016-02-12 08:37:25.000000000 +0800
+++ config/server-1.properties  2016-04-18 11:54:34.011797645 +0800
@@ -17,14 +17,14 @@
-broker.id=0
+broker.id=1
-#port=9092
+port=9093
@@ -57,7 +57,7 @@
-log.dirs=/tmp/kafka-logs
+log.dirs=/tmp/kafka-logs-1

4. 运行 kafka

bin/kafka-server-start.sh  -daemon config/server.properties
bin/kafka-server-start.sh  -daemon config/server-1.properties

停止

bin/kafka-server-stop.sh
pkill -9 -f config/server.properties

5. 创建Topic
创建一个名为“test”只有一个分区,只有一个副本的Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic lucy
Created topic "lucy".

如果你用的是Kafka 8.x之前版本,请用以下的命令创建topic

bin/kafka-create-topic.sh --zookeeper  localhost:2181 \
--replica 1 --partition 1 --topic lucy

运行list topic命令,可以看到Topic列表

bin/kafka-topics.sh --describe  --zookeeper localhost:2181
Topic:lucy  PartitionCount:1  ReplicationFactor:1 Configs:
Topic: lucy Partition: 0  Leader: 1 Replicas: 1 Isr: 1

6. 发送消息
kafka自带的一个命令行客户端,运行后可以输入消息,kafka会将其发送到kafka进群进行消息消费。默认情况下,每一行数据被作为一个消息进行发送。
接下来我们运行producer试试

^_^[12:02:21][[email protected] kafka]#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic lucy
Hello Kafka
My name is jeffrey
This is test on lab.suzf.net

7. 启动消费者(consumer)
上面我们通过kafka自带的命令行输入了消息,那么我们现在启动消费者看看是否会接收到。

@[email protected][12:03:22][[email protected] kafka]#bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic lucy --from-beginning
Hello Kafka
My name is jeffrey
This is test on lab.suzf.net
^CProcessed a total of 3 messages

可以看到消费者已经对我们上面输入的数据进行处理了.

8.  删除无用的topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is already marked for deletion.

bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion

并没有真正删除,如果要真正删除,配置 delete.topic.enable=true.

配置文件在kafka/config目录

vim config/server.properties
# Whether topic deletion should be allowed. Requires kafka >= 0.8.2
delete.topic.enable=true

# bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test2
Created topic "test2".

# bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion
test2

# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test2
Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
^_^[13:55:46][[email protected] kafka]#  bin/kafka-topics.sh --list --zookeeper localhost:2181
lucy
test - marked for deletion