Kafka Learning
Kafka概览
Kafaka是分布式消息队列处理架构,它的每一个broker存放在集群中的不同节点上。
一个主题由各个分区组成,各个分区分布式地存储在不同地broker上。
主题1:分区1_1(leader),分区1_2(follower),分区1_3(follower)
主题2:分区2_1(follower),分区2_2(leader),分区2_3(follower)
一个分区的所有副本统称为AR(Assigned Replicas),AR包括leader和所有的follower。能与leader副本保持同步的所有副本(包括leader本身)称为ISR(In-Sync Replicas);不能与leader副本保持同步的所有副本称为OSR(Out-of-Sync Replicas)。综上,有\[\text{AR}=\text{ISR}+\text{OSR}\]
ZooKeeper安装与配置
安装Zookeeper
安装Zookeeper-3.5.7,安装这个比较低的版本是为了于Kafka-Eagle可能的兼容。更高版本的ZooKeeper可能无法被Kafak-Eagle完全访问。
配置ZooKeeper
添加环境变量
在/etc/profile.d/my_env.sh
中添加下述环境变量。
修改zoo.cfg
内容修改如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/zookeeper/data
dataLogDir=/usr/zookeeper/log
clientPort=2181
server.102=192.168.0.45:2888:3888
server.103=192.168.11.210:2888:3888
server.104=192.168.0.5:2888:3888
分发该配置:/root/bin/xsync $ZOOKEEPER_HOME
102
,103
等代表服务器编号。2888
表示服务器与集群中得leader服务器交换信息的端口。3888
表示选举时服务器相互通信的端口。
执行下述代码,102代表服务器编号。对于第二台服务器编号要设置得不同,比如在hadoop103上103为编号,再hadoop104上设置104为编号。
启动、终止ZooKeeper服务
在上述ZooKeeper集群配置好了以后就可以执行下述命令在每一台服务器上必须都启动zookeeper,然后执行zkServer.sh status
可以查看zookeeper各个服务器得zookeeper的状态。
下面是启动/停止的ZooKeeper的shell脚本:
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo "====== 启动zookeeper集群 ======"
echo "------ zookeeper on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh start" &
echo "------ zookeeper on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh start" &
echo "------ zookeeper on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh start" &
wait # wait for all processes to complete
;;
"stop")
echo "====== 关闭zookeeper集群 ======"
echo "------ zookeeper on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh stop" &
echo "------ zookeeper on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh stop" &
echo "------ zookeeper on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $ZOOKEEPER_HOME/bin/zkServer.sh stop" &
wait # wait for all processes to complete
;;
*)
echo "Input Args Error..."
;;
esac
ZooKeeper客户端
登录zookeeper客户端
列出zookeeper管理的所有子节点:ls /
查看正常工作的所有kafka节点的编号:ls /kafka/brokers/ids
查看mytopic主题0号分区的leader和ISR:ls /kafka/brokers/topics/mytopic/partitions/0/state
(在实操过程中这条命令显示了[],很奇怪,KIMI说是kafka版本过高的问题)
查看辅助leader选举的controller位于哪个节点:ls /kafka/controller
(在实操过程中这条命令显示了[],异常)
Kafka配置
配置环境变量
修改配置文件
修改$KAFKA_HOME/config/server.properties
下述几个对应的参数:
# 每个服务器上的broker.id要设置得不同
broker.id=2
listeners=PLAINTEXT://192.168.0.45:9092
advertised.listeners=PLAINTEXT://113.45.129.200:9092
log.dirs=/usr/kafaka-logs
zookeeper.connect=192.168.0.45:2181,192.168.11.210:2181,192.168.0.5:2181/kafka
分发配置。
listeners
设置为当前节点的私网IP;advertised.listeners
设置为当前节点的公网IP。PLAINTEXT
是一种protocol协议类型。
另外,由于我的每个节点的内存比较小,我需要把Kafka的Java运行堆大小调小,否则kafka进程启动后会逐渐递增消耗节点运行内存直至耗尽。
修改为如下内容:
启动Kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh config/server.properties
# 下属为后台启动kafka的命令
bin/kafka-server-start.sh -daemon config/server.properties
若上述命令报错,可su root
进入root用户然后执行该命令。
Kafka集群启动脚本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo "====== 启动kafka集群 ======"
echo "------ kafka on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties" &
echo "------ kafka on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties" &
echo "------ kafka on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties" &
wait # wait for all processes to complete
;;
"stop")
echo "====== 关闭kafka集群 ======"
echo "------ kafka on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
echo "------ kafka on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
echo "------ kafka on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
wait # wait for all processes to complete
;;
*)
echo "Input Args Error..."
;;
esac
安全模式Kafka集群启动脚本
Kerberos安全模式参见:Kerberos安全模式
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo "====== 启动kafka集群 ======"
echo "------ kafka on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start-sasl.sh -daemon $KAFKA_HOME/config/server-sasl.properties" &
echo "------ kafka on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start-sasl.sh -daemon $KAFKA_HOME/config/server-sasl.properties" &
echo "------ kafka on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-start-sasl.sh -daemon $KAFKA_HOME/config/server-sasl.properties" &
wait # wait for all processes to complete
;;
"stop")
echo "====== 关闭kafka集群 ======"
echo "------ kafka on hadoop102"
ssh hadoop102 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
echo "------ kafka on hadoop103"
ssh hadoop103 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
echo "------ kafka on hadoop104"
ssh hadoop104 "source /etc/profile.d/my_env.sh; $KAFKA_HOME/bin/kafka-server-stop.sh" &
wait # wait for all processes to complete
;;
*)
echo "Input Args Error..."
;;
esac
创建Kafka主题
使用下述命令设置Kafka主题、分区、副本因子。
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --replication-factor 3 --partitions 4 --topic topicDB
其中mytopic为主题名称。
使用下述命令查看Kafka主题、分区、副本因子。
bin/kafka-topics.sh --describe --topic topicDB --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092
删除kafka主题:
测试Kafka消息的生产和消费
首先在一个客户端窗口中运行下述kafka消费者命令
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --topic mytopic --bootstrap-server 192.168.0.45:9092,192.168.11.210:9092,192.168.0.5:9092
之后在另一个客户端窗口中运行下述kafka生产者命令
cd $KAFKA_HOME
bin/kafka-console-producer.sh --broker-list 192.168.0.45:9092,192.168.11.210:9092,192.168.0.5:9092 --topic mytopic
在启动了kafka生产者的窗口中输入文本,kafka消费者窗口能够顺利显示。
生产者
生产者数据流
- InFlightRequests:保存对象的具体形式为:
Map<NodeId, Deque<Request>>
,它的主要作用是缓存已经发送出去但还没有收到响应的请求,默认缓存的请求数最多为max.in.flight.requests.per.connection=5
。
Kafka生产者客户端是线程安全的,可以在多线程环境中被复用。而Kafka消费者客户端不是线程安全的。
拦截器(Interceptor)
序列化器(Serializer)
分区器(Partitioner)
Partitioner把数据积累到RecordAccumulator中。RecordAccumulator与broker上的一个主题对应,它可以被视作是一个“暂存的主题”,它也有与主题对应的分区,它的每个分区是双向队列。RecordAccumulator的总大小是\(32\text{MB}\)。
在RecordAccumulator的分区中,数据是按照ProducerBatch(默认大小\(16\text{KB}\))的形式存储的。
sender线程
sender线程从RecordAccumulator读取消息发送到Kafka集群中。
batch.size:当数据积累到batch.size后,sender线程才会发送数据。默认大小\(16\text{KB}\)。
linger.ms:sender线程等待linger.ms时间就会发送数据,不需要等数据积累到batch.size。单位\(\text{ms}\),默认\(0\text{ms}\)。
sender线程通过NetworkClient最多向Kafka集群中的每个节点缓存5个请求,用以处理Kafka集群没有及时应答的情况。
sender线程使用Selector实现向集群发送数据和接收集群应答。如果应答成功,则清除NetworkClient中的请求,同时清理RecordAccumulator中的缓存批次数据;如果应答失败,则重试\(2^{32}-1\)次发送。
应答为\(0\):生产者发送过来的数据,不需要等待数据落盘应答。
应答为\(1\):生产者发送过来的数据,Leader收到数据后应答。
应答为\(-1\):生产者发送过来的数据,Leader和ISR队列里面的所有节点都收齐数据后应答。
生产者代码模板
KafkaProducerAnalysis
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerAnalysis {
public static final String brokerList = "113.45.129.200:9092,113.45.129.21:9092,110.41.159.123:9092";
public static final String topic = "mytopic";
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "my-kafka-producer-client");
// 关联自定义分区规则
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.zt.test1.MyPartitioner");
// 设置缓冲区大小(默认为32MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 设置批次大小(默认为16KB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 设置linger.ms(默设置为1ms)
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 设置压缩类型(默认为none,可以配置为gzip, snappy, lz4, zstd)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, Kafka!");
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf("Send message to topic %s, partition %d with offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
异步消息发送producer.send()
异步消息发送不需要等待服务器响应。producer.send()
通常替换为下述使用Callback回调函数的格式。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf("Send message to topic %s, partition %d with offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
}
});
异步发送
在Kafka的消息发送机制中,main线程将消息发送到RecordAccumulator中,而不管后续的sender线程处理发送消息的过程。
只要不调用.get()
方法,消息发送机制就是异步的,不过通常还需要使用CallBack()回调函数以显示每条消息发送的具体情况。
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("Topic: " + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset());
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
同步发送
或
调用.get()方法后会阻塞当前Java程序进程,然后使被暂存区RecordAccumulator中的record消息被发送到kafka集群中。producer.close()
也有这个作用。
同步发送的效率较低,因为它每次确保一条消息的发送有返回结果(返回metadata或报错)后才能继续发送下一条消息。
分区器
ProducerRecord(String topic, V value)
:默认采用黏性分区,随机选择一个分区并尽可能一直使用该分区。ProducerRecord(String topic, K key, V value)
:对key进行哈希取余,然后选择分区。ProducerRecord(String topic, Integer partition, K key, V value)
:例如partition=0,则数据写入分区0。
自定义分区规则
根据要发送的消息的文本内容指定发送分区的规则。新建KafkaPartitioner.java
如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class KafkaPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 形参cluster表示集群的原数据信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
String msgValues = value.toString();
int partition;
// 若value里包含TKL则发往0号分区,若包含zt则发往1号分区,其他情况发往2号分区
if (msgValues.contains("TKL")) {
partition = 0;
} else if (msgValues.contains("zt")) {
partition = 1;
} else {
partition = 2;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
之后在KafkaProducerAnalysis
类中的Properties initConfig()
中新增
拦截器
onSend()
:拦截器的onSend()
方法可以对消息进行定制化操作,为ProducerRecord增加内容或者过滤一些内容。onAcknowledgement()
:在消息被应答或消息发送失败时,优先于CallBack被执行。比如可以用来统计消息发送的成功失败率。
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.example.kafkaSerializer.UserQuery;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaProducerInterceptor implements ProducerInterceptor<String, UserQuery> {
private final AtomicLong sendSuccess = new AtomicLong(0);;
private final AtomicLong sendFailure = new AtomicLong(0);;
@Override
public ProducerRecord<String, UserQuery> onSend(ProducerRecord<String, UserQuery> record) {
System.out.println(record.value());
System.out.println(record.value().getUserName() + record.value().getUserQuery());
return new ProducerRecord<>(record.topic(), record.partition(),
record.timestamp(), record.key(), record.value(), record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess.incrementAndGet();
} else {
sendFailure.incrementAndGet();
}
}
@Override
public void close() {
System.out.println("[INFO] 发送成功数:" + sendSuccess.get() + ";发送失败数:" + sendFailure.get());
}
@Override
public void configure(Map<String, ?> map) {}
}
之后在properties中新增下述内容:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.example.kafkaInterceptor.KafkaProducerInterceptor");
如果有多个拦截器形成的拦截器链,那么可以按照下述格式对properties进行新增。
生产者重要参数
应答可靠性acks
应答acks为0,生产者发送消息后,不需要等待服务端响应。消息生产效率最高,但可靠性最低,一般很少用。
应答acks为1(默认值),生产者发送消息后,leader副本成功写入消息后返回成功响应。缺点1:若发送消息时,leader副本崩溃且新的leader副本未被选举出来,则消息发送失败。缺点2:若发送消息时,leader副本正常,此时返回消息发送成功的响应,但leader副本随即崩溃且消息还未被其他副本同步,则此时实际上是消息发送失败的。因此一般用于传输日志数据,允许个别数据的丢失。
应答acks为-1或all,生产者发送消息后,等待ISR中所有副本都成功写入消息后,返回消息发送成功的响应,但是要注意ISR中不能只有leader副本。该方法可靠性最高,一般用于传输跟钱相关的数据。
之后在KafkaProducerAnalysis
类中的Properties initConfig()
中新增
消息发送重试retries
retries:配置生产者发送消息的重试次数,默认为0次。有时一些暂时性地异常如“网络波动”、“leader副本选举”可能导致消息发送失败,设置重试次数有助于提升生产者的稳定性。
retry.backoff.ms:默认值为100毫秒,用来设置两次重试之间的时间间隔。
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 最多重试3次
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 重试间隔1秒
对于要保证消息的顺序性要求,retries>0和max.in.flight.requests.per.connection>1可能会造成消息顺序的错位。例如,生产者发送了“消息1”和“消息2”,这两个消息都存在于InFlightRequests等待服务端响应,然后“消息1”发送失败重试发送,“消息2”发送成功,这会导致“消息2”比“消息1”先到达Broker。因此在对顺序性有要求的场景下应该将max.in.flight.requests.per.connection设置为1。
消息压缩compression.type
- compression.type:消息压缩以减少网络传输量、降低网络I/O,以时间换空间。可选值为:gzip, snappy, lz4,默认为不压缩消息。
数据重复
幂等性
数据重复的情况发生概率很低。比如在ISR所有队列都收集了数据并且落盘后,在向sender线程发送acks响应时出现了leader broker挂掉的情况,retry机制会让消息再次发送一次,于是又会有重复的数据在ISR队列收集并且落盘。
数据落盘至少一次
:1. \(\text{acks}=1\);2. \(\text{ISR}\geq2\)。
数据落盘最多一次
:1. \(\text{acks}=0\);
数据落盘至少一次
可能造成数据重复。
(单会话内)精确一次
:幂等性
+
数据落盘至少一次
其中的幂等性
为(PID, Partition,
SeqNumber)不相同,则满足幂等性不重复的要求。PID为kafka会话的进程号(也有说是Producer
ID),Partition为分区号,SeqNumber是所发送消息的序列编号(递增的)。
因为新建kafka会话会有新的PID,因此使用幂等性只能保证(单会话内)的精确一次。
开启幂等性:enable.idempotence=true
(默认);关闭幂等性:enable.idempotence=false
。
生产者事务
事务提供了一种机制保持消息的一致性。能将所有消息都持久化到磁盘,同时确保消息的精准一次性。(具体原理我后续再从书中深研)
开启事务之前必须开启幂等性。kafka有专门的事务主题(默认50个分区)和事务协调器(Transaction
Coordinator),用于将事务持久化到磁盘上。生产者在使用事务功能之前一定要指定一个唯一的transactional.id
生产者向事务协调器请求PID,之后发送事务数据到事务主题中,最后持久化到磁盘当中。
...
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id-01");
...
producer.initTransactions();
producer.beginTransaction();
try {
producer.send(record);
// 如果代码在producer.commitTransaction之前发生了错误,那么消息是不会被送到broker上的。
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
} finally {
producer.close();
}
数据乱序
乱序出现的原因:sender线程向broker发送请求时可以至多缓存5个请求:5 4 3 2 1。如果请求3失败一次需要重试,那么最终请求发送成功地顺序为:3 5 4 2 1。
解决方案:在开启幂等性时,将max.in.flight.requests.per.connection
设置得小于等于5;在未开启幂等性时将max.in.flight.requests.per.connection
设置为1。如果检查到了
至于为什么要将max.in.flight.requests.per.connection设置得小于等于5,以及这个参数是什么意思,后续再通过看书了解。
Broker
分区leader选举规则
kafka主题的分区的leader由Broker上的Controller进行选举。但是所有Broker上的Controller中只有一个能够在zookeeper上注册成功,并且之后的leader选举都由它来完成。
leader的选举规则:按照broker ID 在AR中的顺序进行选举,且要保证被选举的新leader要在ISR中。
节点的服役和退役
###节点的服役 直接按照zookeeper和kafka的安装配置流程再来一遍就可以了。新服役的节点可以在原来kafka集群正在运行的基础之上直接服役。
但是旧有主题的分区和副本因子并不会直接出现在新服役的broker上,为解决此问题需要使用负载均衡。
创建一个topics-to-move.json
文件,指明要负载均衡的主题:
形成kafka负载均衡计划的命令:
cd $KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "102,103,104,105" --generate
将上述命令所生成的计划复制粘贴到一个新的increase-replication-factor.json
文件里面,之后执行下述命令来执行计划:
cd $KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划是否完成:
cd KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
节点的退役
如果直接退役一台broker,会有数据丢失的风险。因此在退役一个broker之前需要把数据导入到其他现役broker上。
cd $KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "102,103,104" --generate
将上述命令所生成的计划复制粘贴到一个新的increase-replication-factor.json
文件里面,之后执行下述命令来执行计划:
cd $KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划是否完成:
cd KAFKA_HOME
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
之后可以在要退役的broker节点上执行bin/kafka-server-stop.sh
停掉kafka。
消费者
基本原理
不同消费者组之间相互独立;属同一个消费者组的消费者各分配不同的分区,且必须分配完所有的分区。
客户端模板代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaConsumerAnalysis {
// public static final String brokerList = "192.168.0.45:9092,192.168.11.210:9092,192.168.0.5:9092";
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "consumer-group-0 ";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-client-0");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题以及对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition(topic, 0));
consumer.assign(topicPartitions);
// 仅订阅某个主题(分区自动分配)
// consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
分区分配策略和再平衡
Kafka有四种分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky,默认为:Range+CooperativeSticky。通过配置参数partition.assignment.strategy
来实现。
每个消费者在初始化时都要向Broker中的leader corrdinator发送JoinGroup请求。
leader corrdinator会在所有同属一个消费者组的消费者中选取一个leader consumer。
leader consumer根据分区分配策略制定消费方案。
leader consumer把消费方案发送给leader corrdinator。
leader consumer把消费方案下发给各个consumer。
每个消费者会和leader
corrdinator保持心跳(3秒),一旦消费者的响应时间超过session.timeout.ms=45s
或消费者处理消息的时间长于max.poll.interval.ms=300s
,该消费者会被移除,再平衡触发
Range策略
通过\(\text{partitions数量}/\text{consumer数量}\)来决定每个消费者应该消费多少个分区,若除不尽则排在前面的消费者多消费几个分区。
缺点:若一个消费者组订阅了很多主题,那么除不尽的分区都会被放在前面的消费者上,导致前面的消费者负载过大(数据倾斜)。
执行下述命令可以将已存在的主题的分区数增加为\(7\)(分区数只能被增大,不能被减小)
cd $KAFKA_HOME
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic mytopic --partitions 7
测试:开启三个consumer客户端,然后启动生产者生产数据。之后关闭掉一个consumer客户端,再启动生产者生产数据,\(45\)秒之后会触发再平衡,由由一个存活的消费者消费所有已经挂掉的消费者的数据。
offset(消费位移)
该部分的offset指的是消费者消费到的位置——消费位移,区别于Kafka主题分区架构中的offset。
查看offset值
offset(消费位移)被持久化在系统主题**__consumer_offsets**中,按照键值对[group.id, topic, 分区号]: offset metadata
的形式存储(group.id为消费者组id)。
消费位移是当前所消费消息的下一位,如下图所示。position是本地处理位移,保存在消费者客户端的本地缓存中。消费位移的提交指position在Kafka服务器上的持久化。每次consumer.pull()
方法调用时,它会根据本地处理位移position来从Kafka服务器上对应的分区位移处拉取消息。
查看offset值:
在config/consumer.properties中修改
exclude.internal.topics=true
以开启消费系统主题的权限。(记得分发)创建一个新的topic:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic mytopic2 --partitions 2 --replication-factor 2
- 生产者生产数据
- 消费者消费数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic mytopic2 --group consumer_group0
- 消费系统主题以查看offset值的变化
自动提交offset
自动提交offset值到系统主题默认开启enable.auto.commit=true
,设置提交时间间隔auto.commit.interval.ms=5s
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
采用自动提交消费位移的方法可能会导致“重复消费”或“消息丢失”。例如在处理所拉取的一批消息时消费者客户端奔溃,因为没有完成本批次的提交(即本地处理位移position还没有被提交到Kafka服务器),那么消费者客户端重启会从Kafka服务器的旧的消费位移开始消费,导致会拉取到同一批数据,导致“重复消费”。
同步提交offset(消费位移)
简单同步提交
处理完一批所拉取的消息后,阻塞当前进程,直至提交消费位移完毕。可能会出现重复消费的情况。
批量处理、批量提交
在简单同步提交的基础上,可以对所拉取的消息不断积累直至能形成一个足够大的批量batchSize。然后每处理完一个批量再同步提交消费位移。可能会出现重复消费的情况。
final int batchSize=2;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= batchSize) {
// do processing
consumer.commitSync();
buffer.clear();
}
}
最细粒度同步提交
在一次pull()
所拉取的消息中,每处理完一条消息就提交一次。这里需要使用到带参数的commitSync()
方法。不会出现重复消费的情况,但是非常消耗性能。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do processing
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
按分区粒度同步提交消费位移
在一次poll()
所拉取的消息中,按照消息的所属分区来区分这些消息,然后每处理完一个分区的消息就进行一次同步消费位移提交。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// do processing
}
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}
异步提交offset(消费位移)
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do processing
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets {}", offsets, e);
}
}
});
}
订阅主题与分区
- 按照集合的方式定于主题:
- 按照正则表达式订阅主题:好处是按照正则规则可以订阅到后期新增的主题。
- 订阅某主题下标号为0的分区:
- 订阅某主题下的所有分区(partitionFor()方法返回订阅主题的所有分区信息):
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionFor(this.topic);
if (partitionInfos != null) {
for (PartitionInfo tpInfo : partitionInfos) {
partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
}
}
consumer.assign(partitions);
- 取消订阅
ConsumerRecord<K, V>
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
// ...
}
Kafka-Eagle安装
下载Kafka-Eagle
解压Kafka-Eagle(有两层压缩包,需要解压两次),
添加环境变量:
配置MySQL
先在hadoop102上安装MySQL。参见:SQLLearning
- 创建Kafka-Eagle数据库ke,以及对于该数据库的用户ke_user
CREATE USER 'ke_user'@'%' IDENTIFIED BY '000000';
GRANT ALL PRIVILEGES ON ke.* TO 'ke_user'@'%';
FLUSH PRIVILEGES;
- 初始化Kafka-Eagle所需的表
在$KE_HOME/db
中新建ke.sql
文件并写入下述内容:
-- 创建表结构
CREATE TABLE alarm_config (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
group_name VARCHAR(64) NOT NULL,
alarm_name VARCHAR(128) NOT NULL,
alarm_status INT NOT NULL DEFAULT 1,
alarm_type VARCHAR(32) NOT NULL,
alarm_level VARCHAR(32) NOT NULL,
alarm_condition VARCHAR(512),
alarm_contact_group VARCHAR(512),
alarm_create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
alarm_modify_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE kafka_offsets (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
cluster_name VARCHAR(64) NOT NULL,
group_name VARCHAR(128) NOT NULL,
topic_name VARCHAR(128) NOT NULL,
partition_id INT NOT NULL,
log_size BIGINT NOT NULL,
offsets BIGINT NOT NULL,
`lag` BIGINT NOT NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE user (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL,
password VARCHAR(64) NOT NULL,
email VARCHAR(128),
role VARCHAR(32) NOT NULL DEFAULT 'admin',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
使用ke_user登录MySQL数据库并且使用上述ke.sql
文件建表。
配置Kafka-Eagle
修改如下内容:
建立于zookeeper的连接。
Kerberos安全模式,配置keytab和principal。
建立Kafka-Eagle于MySQL数据库的连接以存储元数据。
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
cluster1.efak.sasl.enable=true
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=GASSAPI
cluster1.efak.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafkauserService.keytab" principal="kafka_user@EXAMPLE.COM";
#cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=ke_user
efak.password=123456
使用Kafka-Eagle
# 启动Kafka-Eagle
$KE_HOME/bin/ke.sh start
# 停止Kafka-Eagle
$KE_HOME/bin/ke.sh stop
# 查看Kafka-Eagle状态
$KE_HOME/bin/ke.sh status
# 重启Kafka-Eagle
$KE_HOME/bin/ke.sh restart
在本地Windows电脑通过在PowerShell输入下述命令建立与云服务器的连接,然后在浏览器访问localhost:8048即可打开Efka界面。
或,直接在电脑浏览器上访问http://公网IP:8048
console提供的用户名和密码登录也可以直接访问Kafka-Eagle。
目前在Kerberos安全模式的Kafak使用Kafka-Eagle来监测会出现以下小问题。例如:无法显示zookeeper的数目、无法在web UI上直接创建主题和查看主题。但是在非Kerberos安全模式下Kafka-Eagle是能完全正常运行的。我感觉这里的主要问题和无法通过在本地电脑的HDFS Web UI操作都属于一个问题,应该是本地电脑无法完成Kerberos认证的原因。
调优
Kafka硬件选择
场景:100万日活用户,每个用户100条日志数据,共1亿条日数据量。系统平均每秒钟需要处理1150条数据。一条日志的大小在0.5KB-2KB(取1KB)之间。因此,系统需要以1MB/s的平均速度处理日志数据,设最高峰值为20MB/s
- 节点数选择:
经验公式:\[\text{服务器台数} = 2\times (\text{生产者峰值MB/s} \times \text{副本数} / 100) +1\]
- 磁盘选择:
Kafka为顺序读写,优先选择机械硬盘。(固态硬盘适合随机读写)。使用命令lsblk -d -o name,rota
如果显示为0则是固态硬盘,如果显示为1则是机械硬盘。
\[\text{1亿条}\times 1\text{KB} \times \text{2个副本} \times \text{3天} / 0.7 \approx \text{1T}\]
- 内存选择:
\[\text{Kafka内存}=\text{堆内存(Kafka内部设置)} + \text{页缓存}\]
查看Kafka进程使用的堆大小jmap -heap 6437
- CPU 选择
num.replica/fetchers=1
,副本拉取线程数,这个参数占总核心数的50%的1/3
num.network.threads=3
,数据传输线程数,这个参数占总核心数的50%的2/3
建议32个CPU核心。如果有了32个核心的CPU,建议设置num.io.threads=12
,num.replica/fetchers=4
,num.network.threads=8
- 网络 \[100\text{Mbps}/8=12.5\text{MB/s}\]
建议1000兆带宽。
生产者调优
(后面作图详细指明各个参数发挥的作用)
压力测试
Kerberos安全模式
创建Kerberos的Kafka服务主体和用户主体
- 创建Kafka Linux用户主体:
# 在Kerberos服务端执行下面这一行命令创建Kafka用户主体并设置好密码
kadmin.local -q "add_principal kafka_user"
# 另外再为kafka_user创建一个keytab(暂且创建在hadoop102上)
kadmin.local -q "ktadd -kt /etc/security/keytabs/kafkauserService.keytab kafka_user@EXAMPLE.COM"
chown kafka_user:hadoop /etc/security/keytabs/kafkauserService.keytab
cp /etc/security/keytabs/kafkauserService.keytab $KAFKA_HOME/config/kerberos
# 在每个节点上执行下面一行命令,并且设置不同节点的kafka_user用户两两之间ssh免密
useradd -g hadoop -m kafka_user
- 创建Kafka服务主体:
kadmin.local -q "add_principal -randkey kafka/hadoop102@EXAMPLE.COM"
kadmin.local -q "add_principal -randkey kafka/hadoop103@EXAMPLE.COM"
kadmin.local -q "add_principal -randkey kafka/hadoop104@EXAMPLE.COM"
ssh hadoop102
kadmin -p zt/admin@EXAMPLE.COM -w123456 -q "ktadd -kt /etc/security/keytabs/kafkaService.keytab kafka/hadoop102@EXAMPLE.COM"
chown kafka_user:hadoop /etc/security/keytabs/kafkaService.keytab
ssh hadoop103
kadmin -p zt/admin@EXAMPLE.COM -w123456 -q "ktadd -kt /etc/security/keytabs/kafkaService.keytab kafka/hadoop103@EXAMPLE.COM"
chown kafka_user:hadoop /etc/security/keytabs/kafkaService.keytab
ssh hadoop104
kadmin -p zt/admin@EXAMPLE.COM -w123456 -q "ktadd -kt /etc/security/keytabs/kafkaService.keytab kafka/hadoop104@EXAMPLE.COM"
chown kafka_user:hadoop /etc/security/keytabs/kafkaService.keytab
修改Kafka配置
- 相关配置文件准备
在每个节点上执行下述内容:
mkdir $KAFKA_HOME/config/kerberos
cp /etc/security/keytabs/kafkaService.keytab $KAFKA_HOME/config/kerberos
cp /etc/krb5.conf $KAFKA_HOME/config/kerberos
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-sasl.properties
- 配置
server-sasl.properties
在每个节点上执行下述类似的操作:
修改下述内容:
# 0.0.0.0:19092意味着kafka broker在本地网卡(127.0.0.1)、私网网卡(私网IP)、公网网卡(公网IP)共三张网卡的端口19092上暴露kafka服务。
listeners=SASL_PLAINTEXT://0.0.0.0:19092
# hostname必须配置,因为kerberos模式的kafka需要hostname来找到principal。
advertised.listeners=SASL_PLAINTEXT://hadoop102:19092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
如果上述文件中的hadoop102被配置成了公网IP会导致service.name的kafka无法匹配到principal:kafka/hadoop102@EXAMPLE.COM。
- 配置
kafka-server-jaas.conf
文件
在每个节点上执行下述类似操作:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/Kafka380/kafka_2.12-3.8.0/config/kerberos/kafkaService.keytab"
storeKey=true
principal="kafka/hadoop102@EXAMPLE.COM";
};
- 配置
kafka-server-start-sasl.sh
文件
cp $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/bin/kafka-server-start-sasl.sh
vim $KAFKA_HOME/bin/kafka-server-start-sasl.sh
在文件的倒数第二行增加下述内容:
export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/opt/Kafka380/kafka_2.12-3.8.0/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/Kafka380/kafka_2.12-3.8.0/config/kerberos/kafka-server-jaas.conf"
export KAFKA_OPTS="$KAFKA_OPTS -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.rmi.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
分发上面这个文件。
如果不配置第二个export,会导致Kafka-Eagle无法通过JMX端口发现kafak集群。
- 配置
client.properties
文件
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
分发上面这个文件。
- 配置
kafka-client-jaas.conf
在hadoop102(我只在hadoop102上对主题进行创建和消费等操作,故hadoop102为我的kafka客户端)上执行下述操作:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/Kafka380/kafka_2.12-3.8.0/config/kerberos/kafkauserService.keytab"
storeKey=true
useTicketCache=false
principal="kafka_user@EXAMPLE.COM";
};
- 配置
kafka-topics-sasl.sh
,kafka-console-producer-sasl.sh
,kafka-console-consumer-sasl.sh
,kafka-consumer-groups-sasl.sh
文件
在hadoop102上执行下述内容。
cp $KAFKA_HOME/bin/kafka-topics.sh $KAFKA_HOME/bin/kafka-topics-sasl.sh
cp $KAFKA_HOME/bin/kafka-console-producer.sh $KAFKA_HOME/bin/kafka-console-producer-sasl.sh
cp $KAFKA_HOME/bin/kafka-console-consumer.sh $KAFKA_HOME/bin/kafka-console-consumer-sasl.sh
cp $KAFKA_HOME/bin/kafka-consumer-groups.sh $KAFKA_HOME/bin/kafka-consumer-groups-sasl.sh
在上述四个-sasl
文件中的倒数第二行都新增:
测试Kafka安全模式
# 启动Kerberos安全模式下的Kafka。(-daemon 后台启动)
cd $KAFKA_HOME
bin/kafka-server-start-sasl.sh -daemon config/server-sasl.properties
# 创建topic
bin/kafka-topics-sasl.sh --create --topic topicDB --bootstrap-server hadoop102:19092,hadoop103:19092,hadoop104:19092 --partitions 4 --replication-factor 3 --config retention.ms=259200000 --command-config config/kerberos/client.properties
# 查看topic
bin/kafka-topics-sasl.sh --bootstrap-server hadoop102:19092,hadoop103:19092,hadoop104:19092 --describe --topic topicDB --command-config config/kerberos/client.properties
# 删除topic
bin/kafka-topics-sasl.sh --delete --topic topicDB --bootstrap-server hadoop102:19092,hadoop103:19092,hadoop104:19092 --command-config config/kerberos/client.properties
# 启动消费者
bin/kafka-console-consumer-sasl.sh --bootstrap-server hadoop102:19092,hadoop103:19092,hadoop104:19092 --topic topicDB --from-beginning --consumer.config config/kerberos/client.properties
在Windows上验证Kerberos
- 把服务器上的
/etc/krb5.conf
下载到Windows本地krb5.ini
,同时修改下述内容:
把服务器上的
/etc/security/keytabs/kafkauserService.keytab
下载到Windows本地。在Java客户端的Properties中新增下述内容:
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.jaas.config",
"com.sun.security.auth.module.Krb5LoginModule required " +
"useKeyTab=true " +
"keyTab=\"D:/path/to/kafkauserService.keytab\" " +
"storeKey=true " +
"useTicketCache=false " +
"principal=\"kafka_user@EXAMPLE.COM\";");
- 在Kafka生产者的
main()
函数中添加下述内容:
最后记得将brokerList中的端口修改为Kerberos安全模式下Kafka暴露的端口。