Skip to content

Kafka 重试-原生重试机制

使用 手动 ACK + 阻塞式重试 的方式实现 Kafka 原生重试

ACK 模式配置

声明配置类后,在配置类声明相关属性配置

代码示例(某个项目中), 禁用自动提交,配合手动 ACK 使用

`stringObjectMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  • 使用 手动 ACK 模式
  • 必须显式调用 ack.acknowledge() 才能提交 offset
  • 如果不调用 ack:消息 offset 不会提交,下次重启服务会重新消费该消息

ack 跟服务有关系,跟重试关系不大

阻塞式重试配置

  @Value(value = "${kafka.backoff.interval:1000}")
  private Long interval;  // 重试间隔,默认 1000ms(1秒)

  @Value(value = "${kafka.backoff.max_failure:3}")
  private Long maxAttempts;  // 最大重试次数,默认 3 次

  @Bean
  public DefaultErrorHandler errorHandler() {
	  BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
	  DefaultErrorHandler errorHandler = new DefaultErrorHandler(..., fixedBackOff);
	  // 可重试的异常
	  errorHandler.addRetryableExceptions(SocketTimeoutException.class, ...);
	  // 不可重试的异常
	  errorHandler.addNotRetryableExceptions(NullPointerException.class, ...);
	  return errorHandler;
  }

注意一下,这里是 addRetryableExceptions,不是 set;

如果这里是没有明确声明的不可重试异常,他的默认机制也是会走重试操作的。

Consumer 配置

  // 两次 poll 之间最大间隔,超过触发 rebalance
  stringObjectMap.put("max.poll.interval.ms", "60000");  // 60秒

  // session 超时时间,超过服务端认为消费者离线
  stringObjectMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // 30秒

  // 每次 poll 最大记录数
  stringObjectMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);  // 10条

项目配置文件中自定义参数配置

# 启动kafka消费者
kafka:
	consumer:
	  auto-start: false  # 是否自动启动消费者
  
#可通过配置文件覆盖的参数
kafka:
	backoff:
	  interval: 1000      # 重试间隔(毫秒),默认1秒
	  max_failure: 3      # 最大重试次数,默认3次
	consumer:
	  auto-start: true    # 是否自动启动

重试机制流程

场景 1:消费过程中抛异常

  1. 消费方法抛出异常
  2. DefaultErrorHandler 拦截异常
  3. 判断是否为可重试异常
  4. 如果可重试,等待 interval 时间后同步阻塞重试
  5. 最多重试 maxAttempts 次
  6. 重试成功或达到最大次数后记录日志
  7. 消息被视为处理完成,继续消费下一条

场景 2:不调用 ack.acknowledge()

不 ack 的后果:

  • 当前消息 offset 不会提交 到 Kafka
  • 服务重启后,会从上次提交的 offset 开始重新消费
  • 但在运行期间不会自动重试(除非抛异常触发 ErrorHandler)

⬆️ 这里 ack 的作用,更多的还是 消息 offset 的下表是否 commit,跟重试关系不大

场景 3:超过 max.poll.interval.ms

  • 如果消费逻辑耗时超过 60 秒(默认 max.poll.interval.ms)
  • Kafka 认为消费者已死,触发 Rebalance
  • 消息会被分配给其他消费者实例
  • 原消费者会收到 CommitFailedException

代码解释 🐕‍🦺

解释一段项目中项目配置类中使用到的代码

@Bean  
public DefaultErrorHandler errorHandler() {  
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);  
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {  
        log.error("kafka消费失败, topic: {}, partition: {}, offset: {}, key: {}, value: {}",  
            consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value(), ExceptionUtils.getStackTrace(e));  
    }, fixedBackOff);  
    errorHandler.addRetryableExceptions(SocketTimeoutException.class, BeanCreationNotAllowedException.class, DataAccessResourceFailureException.class);  
    errorHandler.addNotRetryableExceptions(NullPointerException.class,GenericException.class);  
    return errorHandler;  
}
  • 定义重试机制:FixedBackOff
    • FixedBackOff:它是一个固定间隔的阻塞重试策略,重试的间隔时间和最大重试次数通过 interval 和 maxAttempts 指定
  • 配置 DefaultErrorHandler
  • 配置可以重试与不可重试的异常
    • addRetryableExceptions:设置哪些异常是可以进行重试的。如果这些异常在消费过程中抛出,Kafka 会按配置的重试机制进行处理。
    • addNotRetryableExceptions:设置哪些异常是不可重试的。如果抛出这些异常,消息会被直接丢弃或写入死信队列,而不会重试。