1、与 solon 框架事务整合
@Component
public class UserService {
@Inject
MqClient client;
@Inject
UserDao userDao;
@Tran
public void addUser(long userId, String userName) throws Exception{
//1.创建用户
userDao.createUser(userId, userName);
//2.发布事件
MqTransaction mqTran = client.newTransaction();
//通过框架的事务监听器,与框架事务整合(这里可封装工具,简化代码)
TranUtils.listen(new TranListener() {
@Override
public void beforeCommit(boolean readOnly) throws Throwable {
mqTran.commit(); //提交
}
@Override
public void afterCompletion(int status) {
if (status == TranListener.STATUS_ROLLED_BACK) {
RunUtil.runAndTry(mqTran::rollback); //回滚
}
}
});
String userIdStr = String.valueOf(userId);
//发布注册事件
client.publish("user.registered", new MqMessage("").attr("userId",userIdStr).transaction(mqTran));
//发布唤醒事件(10天后执行)
client.publish("user.reawakened", new MqMessage("").attr("userId",userIdStr).transaction(mqTran).scheduled(DateTime.Now().addDay(10)));
}
}
2、与 springboot 框架事务整合
@Service
public class UserService {
@Autowired
MqClient client;
@Autowired
UserDao userDao;
@Transactional
public void addUser(long userId, String userName) throws Exception{
//1.创建用户
userDao.createUser(userId, userName);
//2.发布事件
MqTransaction mqTran = client.newTransaction();
//通过框架的事务监听器,与框架事务整合(这里可封装工具,简化代码)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void beforeCommit(boolean readOnly) {
mqTran.commit(); //提交
}
@Override
public void afterCompletion(int status) {
if(TransactionSynchronization.STATUS_ROLLED_BACK == status){
mqTran.rollback(); //回滚
}
}
});
String userIdStr = String.valueOf(userId);
//发布注册事件
client.publish("user.registered", new MqMessage("").attr("userId",userIdStr).transaction(mqTran));
//发布唤醒事件(10天后执行)
client.publish("user.reawakened", new MqMessage("").attr("userId",userIdStr).transaction(mqTran).scheduled(DateTime.Now().addDay(10)));
}
}