Kafka的消息key有啥特殊的?

今天看到一篇优质的kafka分享《Kafka 万亿级消息实践之资源组流量掉零故障排查分析 》,里面有个比较有意思的场景:producer在生产端的消息指定了key属性,在broker节点磁盘故障之后,某topic数据无法正常发送。不仔细深究,第一印象肯定会疑问,不是应该故障后转移吗?咋会一直受影响呢?

带着这个疑问我们先来看下kafka源码ProducerRecord.java类的注解对消息key的作用说明:

A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.

要发送到 Kafka 的键/值对。这包括被发送记录的主题名称可选的分区号以及可选的k/v

If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.

如果指定了有效的分区(partition)号,则在发送记录时将使用该分区。如果没有指定分区但存在key属性声明,那么将使用 key 的 hash 值选择分区。如果 key 和 partition 都没有被声明,那么将以轮巡的方式分配分区(partition)。

The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the topic.

记录(record)还有一个关联的时间戳属性。如果用户未提供时间戳,生产者将使用当前时间戳标记该记录。 也就是说,时间戳最终会被kafka应用,其依赖时间戳类型来配置主题(topic)。

  • If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}, the timestamp in the producer record will be used by the broker.

  • 如果主题配置为使用 CREATE_TIME,producer 记录中的时间戳将会被broker应用。

  • If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime}, the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the message to its log.

  • 如果主题配置为使用 LOG_APPEND_TIME,producer记录中的时间戳将会被broker本地时间覆盖掉,覆盖成以broker本地的时间为准。

In either of the cases above, the timestamp that has actually been used will be returned to user in {@link RecordMetadata}

对于以上两种情况之一,确实已被应用的时间戳将会在RecordMetadata中返回给用户。

问题到这里基本上清晰了,如果producer消息指定了key,key的hash值将与topic的分区数进行模运算,得出消息路由的分区号。因为该分区所处磁盘故障,消息发送持续等待超时,当批次消息等待超时后,下次进行消息路由时依然会出现路由到故障节点上的情况,且每次等待超时时间后才释放占用的资源。所以若非必要发送消息时不要指定key,否则可能会出现topic所有分区雪崩效应。

这里我们提供几种Python版本的send功能实现方式:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['broker1:9091'])

producer.send('my-topic', b'raw_bytes')
producer.send('my-topic', key=b'foo', value=b'bar')
producer.send('my-topic', value=data, partition=0, timestamp_ms='')

最后还有一个疑问:什么时候需要设定这个消息key呢?

在一些特殊场景下发送消息的时候指定 key,我们可以用来确定消息写入哪一个分区。你可以使用有明确业务含义的字段作为key,比如用户号,这样就可以保证同一个用户号进入同一个分区。