Skip to content

发布-订阅机制

什么是事件

Spring 事件(Spring Events)是 Spring 提供的事件发布-监听机制,可以用来在不同的组件之间传递消息或执行特定的逻辑操作。它基于 观察者模式 (Observer Pattern),支持异步或同步地处理事件。

主要组成部份

  • 1、事件 Event
    • Spring 中的事件需要继承 ApplicationEvent 或直接实现 ApplicationEventPublisher.
    • 事件本质上是一个携带具体信息的对象,用于在应用程序中的事件发布和监听之间传递消息
  • 2、事件发布者 Publisher
    • 发布者是触发事件的一方,通常在应用程序中通过 ApplicationEventPublisher 发布事件
  • 3、事件监听者 Listener
    • 监听器负责处理发布的事件,通常是实现 ApplicationListener 接口,或者通过 @EventListener 注解来监听特定类型的事件

Spring 事件的工作原理

  1. 当事件被发布(通过 ApplicationEventPublisher 调用 publishEvent() 方法)时,Spring 将事件传播给监听器。
  2. Spring 框架内置的事件传播器会根据注册的监听器找到与特定事件类型匹配的监听对象。
  3. 匹配的监听器将执行其方法来处理事件的逻辑。

Spring 的内置事件

除了自定义事件,Spring 自带了一些常用的事件,例如:

  1. ContextRefreshedEvent:当 Spring 容器初始化或刷新时触发。
  2. ContextStartedEvent:当调用 ConfigurableApplicationContext 接口的 start() 方法时触发。
  3. ContextStoppedEvent:当调用 ConfigurableApplicationContext 接口的 stop() 方法时触发。
  4. ContextClosedEvent:当 Spring 容器关闭时触发。

Kafka事件监听操作

使用的 spring 事件发布订阅机制,加一个kafka 消费监听机制

1. ExternalizedDirect 注解定义

java
/**
 * ExternalizedDirect 注解
 * 用于标记需要被发送到外部消息系统(Kafka)的事件类
 * 可应用于类级别,运行时可获取
 */
@Retention(RetentionPolicy.RUNTIME)  // 注解在运行时保留,可通过反射获取
@Target(ElementType.TYPE)            // 只能应用于类/接口
public @interface ExternalizedDirect {

    /**
     * 逻辑目标名称
     * 格式通常为: "kafka模板名称@@主题名称"
     * 是target()的别名
     * @see #target()
     */
    @AliasFor("target")
    String value() default "";

    /**
     * 逻辑目标名称
     * 是value()的别名
     * @see #value()
     */
    @AliasFor("value")
    String target() default "";
}

2. Kafka事件外部化配置

java
/**
 * Kafka事件外部化配置类
 * 负责将应用事件转发到Kafka消息队列
 */
@Slf4j
@ConditionalOnClass(KafkaTemplate.class)  // 只有当KafkaTemplate类存在时才启用此配置
@Configuration
class KafkaEventExternalizerConfiguration {

    /**
     * 配置JPA事件发布仓库
     * 用于持久化事件记录
     * 
     * @param em JPA实体管理器
     * @param serializer 事件序列化器
     * @return JPA事件发布仓库
     */
    @Bean
    @Primary
    JpaEventPublicationRepository eventPublicationRepository(EntityManager em, EventSerializer serializer) {
        return new JpaEventPublicationRepository(em, serializer);
    }

    /**
     * 提供时钟实例,用于事件时间戳
     * 注意:此处使用了+8小时偏移,适应中国时区
     * 
     * @return 系统时钟
     */
    Clock clock() {
        return Clock.offset(Clock.systemDefaultZone(), java.time.Duration.ofHours(8));
    }

    /**
     * 配置Kafka事件外部化器
     * 负责将事件转发到Kafka
     * 
     * @param configuration 事件外部化配置
     * @param objectMapperProvider JSON序列化工具提供者
     * @param operationsMap Kafka操作模板映射表
     * @param factory Spring Bean工厂
     * @return 委托事件外部化器
     */
    @Bean
    DelegatingEventExternalizer kafkaEventExternalizer(
            EventExternalizationConfiguration configuration,
            ObjectProvider<ObjectMapper> objectMapperProvider,
            Map<String, KafkaOperations> operationsMap,
            BeanFactory factory) {

        log.debug("Registering domain event externalization to Kafka…");

        // 创建表达式评估上下文
        var context = new StandardEvaluationContext();
        context.setBeanResolver(new BeanFactoryResolver(factory));
        
        // 获取或创建ObjectMapper
        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(() -> new ObjectMapper());
        
        // 创建并返回委托事件外部化器
        return new DelegatingEventExternalizer(configuration, (target, payload) -> {
            // 解析目标路由
            BrokerRouting routing = BrokerRouting.of(target, context);
            String targetString = routing.getTarget();
            
            // 解析Kafka模板名称和主题
            String[] parts = targetString.split("@@", 2);
            if (parts.length != NumConstants.TWO) {
                throw new IllegalArgumentException(" must be in the format 'kafkaTemplate@@topic'");
            }
            String kafkaTemplateName = parts[0];
            String topic = parts[1];
            
            // 获取Kafka操作模板
            KafkaOperations<Object, Object> kafkaOperations = operationsMap.get(kafkaTemplateName);
            if (kafkaOperations == null) {
                throw new IllegalArgumentException("No KafkaOperations bean found with name " + kafkaTemplateName);
            }
            
            // 序列化负载
            String payloadString;
            try {
                payloadString = objectMapper.writeValueAsString(payload);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Failed to serialize payload", e);
            }
            
            // 记录日志并发送消息
            log.info("kafkaEventExternalizer消息发送,msg:" + topic + " : " + payloadString);
            return kafkaOperations.send(topic, routing.getKey(payload), payloadString);
        });
    }
}

3. Broker事件直接监听器

java
/**
 * Broker事件直接监听器
 * 监听Spring应用事件并将带有@ExternalizedDirect注解的事件转发到Kafka
 */
@Slf4j
@ConditionalOnClass(KafkaTemplate.class)
@Configuration
public class BrokerEventDirectListener implements ApplicationListener<PayloadApplicationEvent>, ConditionalEventListener {

    // 缓存类与注解的映射关系,提高性能
    private static Map<Class<?>, ExternalizedDirect> LOOKUPS = new ConcurrentReferenceHashMap<>(25);
    // SpEL表达式解析相关
    private static final TemplateParserContext CONTEXT = new TemplateParserContext();
    private static final StandardEvaluationContext EVALUATION_CONTEXT = new StandardEvaluationContext();
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();

    // Kafka操作模板集合
    private final Map<String, KafkaOperations> operationsMap;

    /**
     * 构造函数
     * @param operationsMap Kafka操作模板映射表
     */
    public BrokerEventDirectListener(Map<String, KafkaOperations> operationsMap) {
        this.operationsMap = operationsMap;
    }

    /**
     * 应用事件处理方法
     * 实现事务感知的事件处理
     * @param event Spring应用事件
     */
    @Override
    public void onApplicationEvent(PayloadApplicationEvent event) {
        Object eventPayload = event.getPayload();
        // 如果在活跃事务中,注册事务同步器以在事务提交后处理事件
        if (TransactionSynchronizationManager.isActualTransactionActive() && TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    processEvent(eventPayload);
                }
            });
        } else {
            // 没有活跃事务,直接处理
            processEvent(eventPayload);
        }
    }

    /**
     * 判断是否支持处理该事件
     * @param event 事件对象
     * @return 如果事件类带有ExternalizedDirect注解则返回true
     */
    @Override
    public boolean supports(Object event) {
        return LOOKUPS.containsKey(event.getClass()) || 
               AnnotatedElementUtils.hasAnnotation(event.getClass(), ExternalizedDirect.class);
    }

    /**
     * 处理事件
     * 将事件发送到Kafka
     * @param eventPayload 事件负载对象
     */
    private void processEvent(Object eventPayload) {
        // 获取类上的注解,使用缓存提高性能
        ExternalizedDirect annotation = LOOKUPS.computeIfAbsent(
            eventPayload.getClass(), 
            e -> AnnotatedElementUtils.findMergedAnnotation(e, ExternalizedDirect.class)
        );
        
        // 获取目标路由信息
        String source = annotation.target();
        BrokerRouteTarget brokerRouteTarget = new BrokerRouteTarget(source);
        
        // 获取Kafka操作模板
        KafkaOperations<Object, Object> kafkaOperations = operationsMap.get(brokerRouteTarget.getBrokerTemplateName());
        if (kafkaOperations == null) {
            throw new IllegalArgumentException("No KafkaOperations bean found with name " + brokerRouteTarget.getBrokerTemplateName());
        }
        
        // 序列化并发送消息
        try {
            String payloadString = JsonTools.defaultMapper().toJson(eventPayload);
            log.info("KafkaEventWthNoTransactionListener消息发送,msg:" + brokerRouteTarget.getTarget() + " : " + payloadString);
            
            kafkaOperations.send(
                brokerRouteTarget.getTarget(), 
                brokerRouteTarget.getKeyValue(eventPayload), 
                payloadString
            ).exceptionally(ex -> {
                log.error("KafkaEventWthNoTransactionListener消息发送失败,data{},error:{}", payloadString, ExceptionUtils.getStackTrace(ex));
                return null;
            });
        } catch (Exception e) {
            log.error("KafkaEventWthNoTransactionListener消息发送失败,data{},error:{}", eventPayload, ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * 内部类:Broker路由目标
     * 负责解析路由字符串并提取Kafka模板名、目标主题和消息键
     */
    @Getter
    static class BrokerRouteTarget {
        private String brokerTemplateName; // Kafka模板Bean名称
        private String target;             // 目标主题
        private String key;                // 消息键表达式

        /**
         * 构造函数
         * @param kafkaTemplateName Kafka模板名称
         * @param target 目标主题
         * @param key 消息键
         */
        public BrokerRouteTarget(String kafkaTemplateName, String target, String key) {
            this.brokerTemplateName = kafkaTemplateName;
            this.target = target;
            this.key = key;
        }

        /**
         * 通过路由字符串构造
         * 解析格式: "kafkaTemplate@@topic::key"
         * @param routeTargetString 路由目标字符串
         */
        public BrokerRouteTarget(String routeTargetString) {
            // 分离键部分 (使用:: 分隔)
            var parts = routeTargetString.split("::", 2);
            var targetString = parts[0].isBlank() ? null : parts[0];
            
            Assert.hasText(targetString, "Target must not be null or empty,format must be in the format 'kafkaTemplate@@target' or 'kafkaTemplate@@target::key'!");
            
            // 分离模板名和主题 (使用@@ 分隔)
            String[] targetParts = targetString.split("@@", 2);
            Assert.isTrue(targetParts.length == 2, "format must be in the format 'kafkaTemplate@@target' or 'kafkaTemplate@@target::key'!");
            
            this.key = parts.length == 2 ? parts[1] : null;
            this.brokerTemplateName = targetParts[0];
            this.target = targetParts[1];
        }

        /**
         * 获取消息键值
         * 支持SpEL表达式计算键值
         * @param eventPayload 事件负载对象
         * @return 消息键值
         */
        public String getKeyValue(Object eventPayload) {
            return key != null && key.startsWith("#{") 
                ? Optional.ofNullable(PARSER.parseExpression(key, CONTEXT).getValue(EVALUATION_CONTEXT, eventPayload))
                      .map(Object::toString)
                      .orElse(null) 
                : null;
        }
    }
}

流程分析 🐕‍🦺

  1. 事件标记与路由

    • @ExternalizedDirect 注解用于标记需要发送到Kafka的事件类
    • 通过 "kafkaTemplate@@topic" 格式指定Kafka模板和目标主题

第二步:Kafka事件外部化配置,这里声明了一下 JPA 持久化操作,

  • 如果是:➡️ 普通领域事件(如 LossLeaderActivityProductEffectedEvent)→ ✅ 会记录到数据库表中
    • 为需要可靠性保证的领域事件提供持久化存储,确保事件不丢失并支持失败重试
  • 带 @ExternalizedDirect 的消息(如 SelfActivitySuccessMessage)→ ❌ 不会记录
  1. 事件发布流程

    应用代码 -> ApplicationEventPublisher.publishEvent(事件对象)
               -> BrokerEventDirectListener接收事件
               -> 检查是否有@ExternalizedDirect注解
               -> 解析路由信息
               -> 序列化事件对象
               -> 通过KafkaTemplate发送到指定主题
  2. 事务集成

    • 如果事件发布发生在事务上下文中,框架会自动注册事务同步器
    • 只有当事务成功提交后,事件才会被实际发送到Kafka
    • 这确保了数据一致性,防止事务回滚时消息被错误发送

使用示例

定义事件类
java
@Data
@Accessors(chain = true)
@ExternalizedDirect("kafka-public-template@@order-created-topic")
public class OrderCreatedEvent {
    private Long orderId;
    private Date createTime;
    private String creator;
}
发布事件
java
@Service
public class OrderService {
    private final ApplicationEventPublisher eventPublisher;
    
    public void createOrder(OrderDto dto) {
        // 业务逻辑...
        
        // 发布事件
        OrderCreatedEvent event = new OrderCreatedEvent()
            .setOrderId(savedOrder.getId())
            .setCreateTime(new Date())
            .setCreator(dto.getOperator());
            
        eventPublisher.publishEvent(event);
    }
}
消费事件
java
@Component
public class OrderEventConsumer {
    @KafkaListener(
        topics = "order-created-topic",
        groupId = "order-processing-group"
    )
    public void handleOrderCreated(String message) {
        // 反序列化消息
        OrderCreatedEvent event = JsonTools.defaultMapper().fromJson(message, OrderCreatedEvent.class);
        
        // 处理事件...
    }
}

11