淘先锋技术网

首页 1 2 3 4 5 6 7

 Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
 为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获得原来的 PID。
 为了管理 Transaction,Kafka 引入了一个新的组件:Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有状态写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务也可以得到恢复,从而继续进行。
 上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
 Kafka 事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。Kafka的事务主要用于以下两种场景:
  ①生产者发送多条消息可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败;
  ②read-process-write模式:将消息的消费和生产(可以是另一个新消息,也可以是原来的消息)封装在一个事务中,形成一个原子操作。 在一个流式处理的应用中,常常一个服务需要从上游接收消息,经过处理后再送达到下游(发送到下游的消息可能是新消息也可能是原来的消息,具体看业务场景),这就对应着消息的消费和生成。
 当事务中仅仅存在Consumer消费消息的操作时,它和Consumer手动提交Offset并没有区别。因此单纯的消费消息并不是Kafka引入事务机制的原因,单纯的消费消息也没有必要存在于一个事务中。

一、Kafka中Producer的事务API
 Kafka Producer API提供了以下接口用于事务操作:

/**
 * 初始化事务
 */
public void initTransactions();
/**
 * 开启事务
 */
public void beginTransaction() throws ProducerFencedException;
/**
 * 在事务内提交已经消费的偏移量
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
/**
 * 提交事务
 */
public void commitTransaction() throws ProducerFencedException;
/**
 * 丢弃事务
 */
public void abortTransaction() throws ProducerFencedException;

 下面是使用Kafka事务特性的例子,这段代码Producer开启了一个事务,然后在这个事务中发送了两条消息,这两条消息要么都发送成功,要么都失败。示例如下:

// 生产者:生成事务id(该事务id是有要求的,详见下文论述)
KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id”, “my-transactional-id");
// 初始化事务
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 事务内操作
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
//提交事务
producer.commitTransaction();

 下面是一个 read-process-write 模式示例:在一个Kafka事务中,同时涉及到了生产消息和消费消息

// 生产者:指定事务id(该事务id是有要求的,详见下文论述)
KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id", "my-transactional-id");
// 消费者
KafkaConsumer consumer = createKafkaConsumer("bootstrap.servers", "localhost:9092","group.id", "my-group-id","isolation.level", "read_committed");
// 消费者订阅topic
consumer.subscribe(singleton("inputTopic"));
// 初始化事务
producer.initTransactions();
while (true) {
  //批量消费消息
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  //开始事务
  producer.beginTransaction();
  for (ConsumerRecord record : records){
    //将消费的消息发送到另一个topic
    producer.send(producerRecord(“outputTopic”, record));
  }
  //记录消费者所在消费者组消费的offset
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group); 
  //提交事务
  producer.commitTransaction();
}

 这种场景通常都是先消费消息,消费消息后产生另一个消息发送到另一个topic上,但这个过程需要是原子的,因此需要事务保证。

二、Kafka的事务配置
 对于Producer,需要设置 transactional.id 属性,这个属性的作用下文会提到。设置了 transactional.id 属性后,enable.idempotence 属性会自动设置为true。
 对于Consumer,需要设置 isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。

三、Kafka的事务特性
 Kafka的事务特性本质上代表了三个功能:原子写操作拒绝僵尸实例(Zombie fencing)读事务消息
 1、原子写
  Kafka的事务特性本质上是支持了Kafka跨分区和Topic的原子写操作。在同一个事务中的消息要么同时写入成功,要么同时写入失败。我们知道,Kafka中的Offset信息存储在一个名为 __consumer_offsets 的Topic中,因此read-process-write模式,除了向目标Topic写入消息,还会向 __consumer_offsets 中写入已经消费的Offsets数据。因此read-process-write本质上就是跨分区和Topic的原子写操作。Kafka的事务特性就是要确保跨分区的多个写操作的原子性。
 2、拒绝僵尸实例(Zombie fencing)
  在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。
   Kafka事务特性通过transaction-id属性来解决僵尸实例问题。所有具有相同transaction-id的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,会检查当前事务提交者的epoch是不是最新的,若不是就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。即同一个事务组中的生产者只能有一个在生产数据。 因此,这个transaction-id的值并不是随机的(即不能在代码中随机生成),而是有要求的,相当于使用transaction-id对生产者进行分组,避免重复生产消息。
 3、读事务消息
  为了保证事务特性,Consumer如果设置了isolation.level = read_committed,那么它只会读取已经提交了的消息。在Producer成功提交事务后,Kafka会将所有该事务中的消息的 Transaction Marker 从uncommitted标记为committed状态,从而所有的Consumer都能够消费。

四、Kafka事务原理
 Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator。主要负责分配pid,记录事务状态等操作。下面是Kafka开启一个事务到提交一个事务的流程图:
在这里插入图片描述
 主要分为以下步骤:
 ①查找Tranaction Corordinator:Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址
 ②初始化事务 initTransaction:Producer发送InitpidRequest给Transaction Coordinator,获取pid。Transaction Coordinator在Transaciton Log中记录着<TransactionId,pid>的映射关系。另外,它还会做两件事:只要开启了幂等特性即必须执行InitpidRequest,而无须考虑该Producer是否开启了事务特性

    a、恢复(Commit或Abort)之前的Producer未完成的事务
    b、对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样,而epoch是不同的

 ③开始事务beginTransaction:执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启
 ④read-process-write流程:一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。
 ⑤事务提交或终止 commitTransaction/abortTransaction:在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个两阶段提交:

    第一阶段,将Transaction Log内的该事务状态设置为PREPARE_COMMIT或PREPARE_ABORT
    第二阶段,将Transaction Marker写入该事务涉及到的所有消息(即将消息标记为committed或aborted)。
    这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,
    会将对应的Transaction Marker控制信息写入日志。 一旦Transaction Marker写入完成,Transaction Coordinator会
    将最终的COMPLETE_COMMIT或COMPLETE_ABORT状态写入Transaction Log中以标明该事务结束。

五、Kafka的事务总结
 Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性。
 Offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理。
 Kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的Offset的更新进行同样的标记(Transaction Marker)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见。
 Kafka只提供对Kafka本身的读写操作的事务性,不提供包含外部系统的事务性。

 参考:Kafka事务特性详解