FolkMQ v1.7.8

事务消息与其它框架事务整合

1、与 Solon 框架事务整合

/**
 * Example: tying a FolkMQ message transaction to a Solon-managed transaction.
 *
 * <p>The user row and the two events are meant to succeed or fail together:
 * the message transaction is committed from the listener's {@code beforeCommit}
 * callback and rolled back when the listener reports
 * {@code STATUS_ROLLED_BACK}.
 */
@Component
public class UserService {
    @Inject
    MqClient client;

    @Inject
    UserDao userDao;

    /**
     * Creates a user and publishes the related events inside one message transaction.
     *
     * @param userId   identifier of the new user; also sent as the {@code userId} message attribute
     * @param userName name passed to {@code UserDao.createUser}
     * @throws Exception if creating the user or publishing a message fails
     */
    @Tran
    public void addUser(long userId, String userName) throws Exception {
        // 1. Create the user.
        userDao.createUser(userId, userName);

        // 2. Open the message transaction that both events will be bound to.
        MqTransaction mqTran = client.newTransaction();

        // Hook the message transaction into the framework transaction through a
        // transaction listener (this wiring can be wrapped in a small utility).
        TranUtils.listen(new TranListener() {
            @Override
            public void beforeCommit(boolean readOnly) throws Throwable {
                // Commit the messages; a failure here is thrown back to the framework.
                mqTran.commit();
            }

            @Override
            public void afterCompletion(int status) {
                if (status == TranListener.STATUS_ROLLED_BACK) {
                    // Roll the messages back via RunUtil.runAndTry
                    // (presumably best-effort, so a failure does not escape this callback).
                    RunUtil.runAndTry(mqTran::rollback);
                }
            }
        });

        String userIdStr = String.valueOf(userId);

        // Delivery time of the delayed event: 10 days from now. Fully-qualified JDK
        // types are used so the snippet needs no extra imports.
        java.util.Date reawakenAt =
                java.util.Date.from(java.time.Instant.now().plus(java.time.Duration.ofDays(10)));

        // Publish the "registered" event.
        client.publish("user.registered",
                new MqMessage("").attr("userId", userIdStr).transaction(mqTran));

        // Publish the "reawakened" event, scheduled for 10 days later.
        client.publish("user.reawakened",
                new MqMessage("").attr("userId", userIdStr).transaction(mqTran).scheduled(reawakenAt));
    }
}

2、与 Spring Boot 框架事务整合

/**
 * Example: tying a FolkMQ message transaction to a Spring-managed transaction.
 *
 * <p>The user row and the two events are meant to succeed or fail together:
 * the message transaction is committed from
 * {@code TransactionSynchronization.beforeCommit} and rolled back when
 * {@code afterCompletion} reports {@code STATUS_ROLLED_BACK}.
 */
@Service
public class UserService {
    @Autowired
    MqClient client;

    @Autowired
    UserDao userDao;

    /**
     * Creates a user and publishes the related events inside one message transaction.
     *
     * @param userId   identifier of the new user; also sent as the {@code userId} message attribute
     * @param userName name passed to {@code UserDao.createUser}
     * @throws Exception if creating the user or publishing a message fails
     */
    @Transactional
    public void addUser(long userId, String userName) throws Exception {
        // 1. Create the user.
        userDao.createUser(userId, userName);

        // 2. Open the message transaction that both events will be bound to.
        MqTransaction mqTran = client.newTransaction();

        // Hook the message transaction into the framework transaction through a
        // transaction synchronization (this wiring can be wrapped in a small utility).
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void beforeCommit(boolean readOnly) {
                // The callback declares no checked exceptions, so any checked failure
                // from commit() is rethrown unchecked; an exception thrown here is
                // propagated by Spring and prevents the database commit.
                try {
                    mqTran.commit();
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to commit the message transaction", e);
                }
            }

            @Override
            public void afterCompletion(int status) {
                if (status != TransactionSynchronization.STATUS_ROLLED_BACK) {
                    return;
                }
                // The database transaction is already finished at this point; per the
                // Spring javadoc an exception from this callback is logged but not
                // propagated, so rethrowing surfaces the failure without affecting callers.
                try {
                    mqTran.rollback();
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to roll back the message transaction", e);
                }
            }
        });

        String userIdStr = String.valueOf(userId);

        // Delivery time of the delayed event: 10 days from now. Fully-qualified JDK
        // types are used so the snippet needs no extra imports.
        java.util.Date reawakenAt =
                java.util.Date.from(java.time.Instant.now().plus(java.time.Duration.ofDays(10)));

        // Publish the "registered" event.
        client.publish("user.registered",
                new MqMessage("").attr("userId", userIdStr).transaction(mqTran));

        // Publish the "reawakened" event, scheduled for 10 days later.
        client.publish("user.reawakened",
                new MqMessage("").attr("userId", userIdStr).transaction(mqTran).scheduled(reawakenAt));
    }
}