Skip to content

MQ RocketMQ_分布式事务

分布式事务的实现

逻辑步骤

RocketMQ 实现分布式事务的实现机制是通过事务消息加上事务反查机制

具体步骤:

  1. 发送半消息(Prepare Message)
    • 生产者向 MQ 发送一个特殊的消息(半消息),这个消息不会立即投递给消费者。
  2. 执行本地事务
    • 生产者发送半消息后,将执行本地事务逻辑(如数据库操作)。这个步骤的成功与否将决定消息是否可被消费。
  3. 根据本地事务状态提交或回滚消息
    • 如果本地事务成功,生产者将通知 MQ 提交消息,使其对消费者可见。
    • 如果本地事务失败,生产者将通知 MQ 回滚消息,消息将被删除不会投递给消费者。
  4. 事务状态回查
    • 如果 MQ 长时间没有收到关于这个半消息的最终状态(提交或回滚),MQ 将向生产者发送回查消息。生产者需要检查本地事务的状态,并回应 MQ 事务的最终状态。

image.png

事务回查机制

一般会有一个定时机制,然后如果长时间没有收到二次确认,会进行事务回查

  • 这个是生产者反而是消费者,收到这条消息后,通过本地实现的 checker 接口的 checkLocalTransaction 方法进行检查本地事务是否执行完成,并将结果告知 MQ 服务器。

image.png

代码分析

参考: https://juejin.cn/post/6844904099993878536

学习项目地址: https://gitee.com/wuhonglin2/rocketmq-jta-demo?skip_mobile=true


你先看一下这个 gpt 给的一个 demo 示例

假设您需要在电商系统中处理订单支付的场景,其中用户的支付操作需要更新订单状态并扣减库存,这两个操作要么同时成功,要么同时失败。

步骤一:发送半消息:

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.exception.MQClientException;

TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

Message msg = new Message("order_topic", "TagA", "OrderID001", ("支付订单" + order.getId()).getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送半消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

步骤 2: 执行本地事务

这个步骤通常是在事务监听器中实现:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;

producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务(比如数据库操作)
            orderService.payOrder(order.getId());
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        String orderId = msg.getKeys();
        if (orderService.checkOrderPaid(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
});

步骤 3: 提交或回滚消息

根据本地事务的执行结果,RocketMQ 将处理消息的提交或回滚


项目内容

建表:

-- 积分系统  
CREATE DATABASE rocketmq-demo-points;  
DROP TABLE IF EXISTS `t_points`;  
CREATE TABLE `t_points` (  
                            `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',  
                            `user_id` bigint(20) NOT NULL COMMENT '用户ID',  
                            `order_no` bigint(20) NOT NULL COMMENT '订单编号',  
                            `points` int(5) NOT NULL COMMENT '积分',  
                            `remarks` varchar(128) NOT NULL DEFAULT '无' COMMENT '备注',  
                            PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='积分表';  
  
SET FOREIGN_KEY_CHECKS = 1;  
  
-- 订单系统  
DROP TABLE IF EXISTS `t_order`;  
CREATE TABLE `t_order` (  
                           `order_id` bigint(20) NOT NULL COMMENT '订单编号',  
                           `create_time` datetime NOT NULL COMMENT '创建时间',  
                           `user_id` bigint(20) NOT NULL COMMENT '用户ID',  
                           `amount` decimal(18,2) NOT NULL COMMENT '订单金额',  
                           PRIMARY KEY (`order_id`) USING BTREE  
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';  
  
DROP TABLE IF EXISTS `transaction_log`;  
CREATE TABLE `transaction_log` (  
                                   `id` varchar(32) NOT NULL COMMENT '事务ID',  
                                   `business` varchar(32) NOT NULL COMMENT '业务标识',  
                                   `foreign_key` varchar(32) NOT NULL COMMENT '对应业务表中的主键',  
                                   PRIMARY KEY (`id`)  
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';  
  
SET FOREIGN_KEY_CHECKS = 1;

这个项目是分为了两个模块,一个是订单模块,一个是积分模块;

你可以从 demo 中看出,一般我们会先发送一条半消息(通过 sendMessageInTransaction )

/**  
 * 事务消息发送  
 * @param data  
 * @param topic  
 * @return  
 */public TransactionSendResult send(String data, String topic) throws MQClientException {  
    //使用 RocketMQ 的 TransactionMQProducer 来发送事务性消息  
    Message message = new Message(topic, data.getBytes());  
    return producer.sendMessageInTransaction(message, null);  
}

然后,当 TransactionProducer 发送一个事务消息时,TransactionMQProducer 会先存储一个半消息到消息队列,然后调用 TransactionListener 实现的 executeLocalTransaction 方法

@Override  
public LocalTransactionState executeLocalTransaction(Message message, Object o) {  
    log.info("开始执行本地事务....");  
    LocalTransactionState state = null;  
    try {  
        String body = new String(message.getBody());  
        TOrder tOrder = JSONObject.parseObject(body, TOrder.class);  
        // 执行本地事务  
        orderService.insertOrder(tOrder, message.getTransactionId());  
        state = LocalTransactionState.COMMIT_MESSAGE;  
        log.info("本地事务已提交。{}", message.getTransactionId());  
    }catch (Exception e){  
        log.error("执行本地事务失败。{}", e.getMessage());  
        state = LocalTransactionState.ROLLBACK_MESSAGE;  
    }  
    return state;  
}

事务回查也是在这个类中进行操作的【消息队列(Broker)未在规定时间内收到消息的最终提交(COMMIT)或回滚(ROLLBACK)状态。在这种情况下,Broker 需要确定消息的确切状态,会进行回查操作】

package com.xxx.order.service;

import com.alibaba.fastjson.JSONObject;
import com.xxx.common.domain.TOrder;
import com.xxx.common.domain.TransactionLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by Sinotn
 *
 * @Author: libin
 * @CreateTime: 2020-10-28 14:07
 * @Description: 订单事务监听类
 */
@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Autowired
    private TransactionLogService transactionLogService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        LocalTransactionState state = null;
        try {
            String body = new String(message.getBody());
            TOrder tOrder = JSONObject.parseObject(body, TOrder.class);
            // 执行本地事务
            orderService.insertOrder(tOrder, message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事务已提交。{}", message.getTransactionId());
        }catch (Exception e){
            log.error("执行本地事务失败。{}", e.getMessage());
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("开始回查本地事务....");
        LocalTransactionState state = null;
        String transId = messageExt.getTransactionId();
        // 如果本地事务存在,则事务提交成功
        TransactionLog transactionLog = transactionLogService.getById(transId);
        if (null != transactionLog){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else{
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }
}

即只有当业务操作成功完成时,相关的消息才会被发送到消息队列供消费者处理。这对于需要强一致性的业务流程至关重要

接收者的部分代码

@Override  
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
    log.info("消费者线程监听到消息。");  
    for (MessageExt messageExt: list){  
        if (!processor(messageExt)){  
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
        }  
    }  
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
}

/**  
 * 消息处理,第3次处理失败后,发送邮件通知人工介入  
 * @param messageExt  
 * @return  
 */private boolean processor(MessageExt messageExt){  
    String body = new String(messageExt.getBody());  
    try{  
        log.info("消息处理...{}", body);  
        log.info("开始处理订单数据,准备增加积分....");  
        TOrder order  = JSONObject.parseObject(body, TOrder.class);  
        pointsService.insert(order);  
        // 模拟异常  
        //int k = 1/0;  
        return true;  
    }catch (Exception e){  
        if (messageExt.getReconsumeTimes() >= 3){  
            log.error("消息重试已达最大次数,将通知业务人员排查问题。{}",messageExt.getMsgId());  
            // 发送邮件或者报警  
            // sendMail(messageExt);  
            return true;  
        }  
        return false;  
    }  
}

这里看半消息 sendMessageInTransaction 和 executeLocalTransaction 的确是对应关系,不过这个关系是在初始化的时候进行定义的

package com.xxx.order.service;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by Sinotn
 *
 * @Author: libin
 * @CreateTime: 2020-10-28 14:05
 * @Description: 事务生产
 */
@Component
public class TransactionProducer {

    private static final String GROUP_NAME = "order_trans_group";

    private TransactionMQProducer producer;

    @Autowired
    private OrderTransactionListener orderTransactionListener;

    //执行任务的线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

    @PostConstruct
    public void init(){
        producer = new TransactionMQProducer(GROUP_NAME);
        producer.setNamesrvAddr("192.168.56.105:9876");
        producer.setSendMsgTimeout(100);
        producer.setExecutorService(executor);
        producer.setTransactionListener(orderTransactionListener);
        this.start();
    }

    private void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 事务消息发送
     * @param data
     * @param topic
     * @return
     */
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        //使用 RocketMQ 的 TransactionMQProducer 来发送事务性消息
        Message message = new Message(topic, data.getBytes());
        return producer.sendMessageInTransaction(message, null);
    }

}