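Introduction

This transaction framework provides two processing modes: tcc and mtx.
The examples below use two projects, demo (the caller) and sample (the callee). Both are standard skeleton projects with the main layers api, model, biz, client, and provider. Business code lives in biz, interface code in api and client, and the interface implementations in provider.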

tcc

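tcc overview

TCC stands for Try-Confirm-Cancel:

Try phase:

Complete all business checks (consistency) and reserve the business resources (quasi-isolation).

Confirm phase:

Execute the business operation without any further business checks, using only the resources reserved in the Try phase.

Cancel phase:

Release the business resources reserved in the Try phase. If any participant fails to reserve its resources, all reservation requests are cancelled.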
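
The three phases can be illustrated with a minimal in-memory sketch. It is not part of the framework: the class TccInventorySketch and all of its methods are hypothetical names chosen for illustration, and a real participant would reserve resources in a database rather than in a map.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory illustration of Try-Confirm-Cancel; every name here is hypothetical. */
final class TccInventorySketch {

    private int available;                                          // stock that can still be reserved
    private final Map<String, Integer> reserved = new HashMap<>();  // txId -> reserved quantity

    TccInventorySketch(int initialStock) {
        this.available = initialStock;
    }

    /** Try: run the business check and reserve the resource. */
    boolean tryReserve(String txId, int quantity) {
        if (quantity <= 0 || quantity > available || reserved.containsKey(txId)) {
            return false; // check failed, nothing is reserved
        }
        available -= quantity;
        reserved.put(txId, quantity);
        return true;
    }

    /** Confirm: consume the reservation without repeating any checks. */
    void confirm(String txId) {
        reserved.remove(txId); // the reserved stock is now permanently consumed
    }

    /** Cancel: give the reserved resource back; a no-op if nothing was reserved. */
    void cancel(String txId) {
        Integer quantity = reserved.remove(txId);
        if (quantity != null) {
            available += quantity;
        }
    }

    int getAvailable() { return available; }

    int getReservedCount() { return reserved.size(); }

    /** Coordinator: confirm everywhere only if every Try succeeded, otherwise cancel everywhere. */
    static boolean run(String txId, int quantity, TccInventorySketch... participants) {
        boolean allReserved = true;
        for (TccInventorySketch p : participants) {
            if (!p.tryReserve(txId, quantity)) {
                allReserved = false;
                break;
            }
        }
        for (TccInventorySketch p : participants) {
            if (allReserved) {
                p.confirm(txId);
            } else {
                p.cancel(txId);
            }
        }
        return allReserved;
    }
}
```

In this sketch a failed Try on any participant causes cancel to run on all of them, which restores the stock of the participants that had already reserved.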

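Dependencies

Add the transaction framework dependencies: biz needs the framework's declaration (annotation) package, and provider needs the framework's implementation package.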

pom.xml(demo-biz/sample-biz)

<dependency>
    <groupId>com.lc.tx.tcc</groupId>
    <artifactId>lc-tcc-annotation</artifactId>
</dependency>

pom.xml(demo-provider/sample-provider/sample-client)

<dependency>
    <groupId>com.lc.tx.tcc.springboot.starter</groupId>
    <artifactId>lc-tcc-springboot-starter-springcloud</artifactId>
</dependency>

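Business invocation code

Caller

  1. In demo, add three methods to the business layer, one for each tcc phase. Annotate the main try method with @Tcc, using its confirmMethod and cancelMethod attributes to name the corresponding methods, and also add the @Transactional annotation.
    For example, DefaultDemoServiceLocal.java: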
@Service
public class DefaultDemoServiceLocal implements IDemoServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDemoServiceLocal.class);

    @Resource
    private DemoRepository demoRepository;
    @Resource
    private ISampleClientTcc sampleClientTcc;

    @Tcc(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    @Transactional
    @Override
    public String tccSave(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.tccSave(DemoPo)");

        // TODO reserve resources
        LOGGER.info("===================> reserve resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        demoPo.setDataStatus(UserStatus.INACTIVE.getValue());
        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();

        if(7L == demoPo.getSum()) {
            throw new BaseException("Tcc initiator exception");
        }

        SamplePo samplePo = BeanUtils.copyNotNullProperties(SamplePo.class, demoPo);
        APIResult<Void> result = sampleClientTcc.tccSave(samplePo);
        if(null == result || result.isFailed()) {
            throw new BaseException("Tcc participant exception");
        }

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String confirmMethod(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.confirmMethod(DemoPo)");

        // TODO confirm resources
        LOGGER.info("===================> confirm resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        demoPo.setDataStatus(UserStatus.ACTIVED.getValue());
        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String cancelMethod(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.cancelMethod(DemoPo)");

        // TODO release resources
        LOGGER.info("===================> release resources");

        Demo demo = demoRepository.newInstance(demoPo);
        demo.delete();

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return null;
    }

}
  1. In the provider project, add a configuration file application-tcc.yml, and add tcc to the spring.profiles.include property in application.yml:
com:
  lc:
    tx:
      tcc:
        enabled: true
        serializer: kryo
        repositorySupport: db
        txDbConfig:
          driverClassName: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username: root
          password: 1234
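
In the caller code above, confirmMethod and cancelMethod are given as method names, and both methods take the same parameter as the try method. The sketch below shows one way such names could be resolved by reflection. It is an assumption for illustration only, not the framework's actual implementation: SketchTcc, OrderService, and invoke are hypothetical, and the sketch runs confirm or cancel immediately after the local try, whereas a real coordinator would wait for the outcome of every participant.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class TccDispatchSketch {

    /** Hypothetical stand-in for the framework's Tcc annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface SketchTcc {
        String confirmMethod() default "";
        String cancelMethod() default "";
    }

    /** Hypothetical service with the same three-method layout as the caller example. */
    static class OrderService {
        final List<String> calls = new ArrayList<>();

        @SketchTcc(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
        public void trySave(String order) {
            calls.add("try:" + order);
            if (order.isEmpty()) {
                throw new IllegalArgumentException("empty order");
            }
        }

        public void confirmMethod(String order) { calls.add("confirm:" + order); }

        public void cancelMethod(String order) { calls.add("cancel:" + order); }
    }

    /** Runs the try method, then looks up confirm or cancel by name with the same parameter types. */
    static boolean invoke(Object target, String tryMethodName, String arg) {
        try {
            Method tryMethod = target.getClass().getMethod(tryMethodName, String.class);
            SketchTcc tcc = tryMethod.getAnnotation(SketchTcc.class);
            boolean ok = true;
            try {
                tryMethod.invoke(target, arg);
            } catch (InvocationTargetException e) {
                ok = false; // the try method threw, so the cancel method is chosen
            }
            String next = ok ? tcc.confirmMethod() : tcc.cancelMethod();
            target.getClass().getMethod(next, tryMethod.getParameterTypes()).invoke(target, arg);
            return ok;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("TCC method lookup failed", e);
        }
    }
}
```

Because the lookup is by name and parameter types, a typo in confirmMethod or cancelMethod would only surface at run time in a design like this one.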

Callee

  1. Add a tccSave method to sample's ISampleService interface:
@Validated
@RequestMapping(value = "/sample")
@RestController
public interface ISampleService {
    @RequestMapping(value = "/tcc/save", method = RequestMethod.POST)
    public APIResult<Void> tccSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);
}
  1. Add the ISampleClientTcc interface and annotate tccSave with @Tcc. The configuration attribute of the @FeignClient annotation on the interface must also be set to TccFeignConfiguration.class:
@FeignClient(name = "ibps-sample-provider", configuration = TccFeignConfiguration.class)
public interface ISampleClientTcc {

    @Tcc
    @RequestMapping(value = "/sample/tcc/save", method = RequestMethod.POST)
    public APIResult<Void> tccSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);

}
  1. In the provider that implements the interface, add the tcc annotation as the business requires:

SampleProvider.java

public class SampleProvider extends GenericProvider implements ISampleService {
    @Resource
    private ISampleServiceLocal sampleServiceLocal;

    @Override
    @ApiOperation(value = "Sample save-TCC", notes = "Saves the sample from the submitted data")
    public APIResult<Void> tccSave(
            @ApiParam(name = "SamplePo", value = "Sample object", required = true)
            @RequestBody(required = true)  SamplePo samplePo) {
        APIResult<Void> result = new APIResult<>();
        try {
            samplePo.setName("tcc." + samplePo.getName());
            samplePo.setSubject("tcc." + samplePo.getSubject());
            samplePo.setText1("tcc." + samplePo.getText1());
            samplePo.setText2("tcc." + samplePo.getText2());
            samplePo.setText3("tcc." + samplePo.getText3());
            samplePo.setText4("tcc." + samplePo.getText4());
            samplePo.setText5("tcc." + samplePo.getText5());
            sampleServiceLocal.tccSave(samplePo);
        } catch (Exception e) {
            setExceptionResult(result, StateEnum.ERROR.getCode(), StateEnum.ERROR.getText(), e);
        }

        return result;
    }
}
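  1. In DefaultSampleServiceLocal.java, annotate tccSave with @Tcc, specifying confirmMethod and cancelMethod, and also add the @Transactional annotation. If SampleProvider.tccSave() contains other logic that must take part in the transaction, such as sampleServiceLocal2.tccSave2(), then tccSave2 must be annotated in the same way: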
public class DefaultSampleServiceLocal implements ISampleServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSampleServiceLocal.class);

    @Resource
    private SampleRepository sampleRepository;

    @Tcc(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    @Transactional
    @Override
    public String tccSave(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.tccSave(SamplePo)");

        // TODO reserve resources
        LOGGER.info("===================> reserve resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        samplePo.setDataStatus(UserStatus.INACTIVE.getValue());
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();

        if(70L == samplePo.getSum()) {
            throw new BaseException("Tcc participant exception");
        }

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String confirmMethod(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.confirmMethod(SamplePo)");

        // TODO confirm resources
        LOGGER.info("===================> confirm resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        samplePo.setDataStatus(UserStatus.ACTIVED.getValue());
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String cancelMethod(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.cancelMethod(SamplePo)");

        // TODO release resources
        LOGGER.info("===================> release resources");

        Sample sample = sampleRepository.newInstance(samplePo);
        sample.delete();

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return null;
    }
}
  1. In the provider project, add a configuration file application-tcc.yml, and add tcc to the spring.profiles.include property in application.yml:
com:
  lc:
    tx:
      tcc:
        enabled: true
        serializer: kryo
        repositorySupport: db
        txDbConfig:
          driverClassName: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username: root
          password: 1234

mtx

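mtx overview

This approach to distributed transactions relies on message middleware. Suppose there are two systems, A and B, which handle task A and task B respectively. System A has a business flow that needs task A and task B to be processed in the same transaction.
The framework supports RabbitMQ, ActiveMQ, aliyunMQ, Kafka, RocketMQ, and other implementations; the examples below use RabbitMQ.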
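
The general idea of a message-based transaction can be sketched in memory as follows. This is a simplified illustration of the pattern, not the framework's implementation: MtxFlowSketch and its methods are hypothetical, an ArrayDeque stands in for the message queue, and the retry and idempotency behaviour shown here is an assumption about how such a flow is usually made safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory illustration of a message-based transaction between system A and system B. */
final class MtxFlowSketch {

    final List<String> systemA = new ArrayList<>();  // data committed by system A
    final List<String> systemB = new ArrayList<>();  // data committed by system B
    final Queue<String> queue = new ArrayDeque<>();  // stands in for the message queue

    /** System A: perform task A and publish the message for task B only if task A succeeded. */
    boolean runTaskA(String payload, boolean failLocally) {
        if (failLocally) {
            return false; // local work is rolled back, so no message is sent
        }
        systemA.add(payload);
        queue.add(payload);
        return true;
    }

    /** System B: consume one message; leave it on the queue for a retry if task B fails. */
    boolean consumeOne(boolean failTaskB) {
        String payload = queue.peek();
        if (payload == null || failTaskB) {
            return false;
        }
        if (!systemB.contains(payload)) { // idempotent: a redelivered message is applied once
            systemB.add(payload);
        }
        queue.remove();
        return true;
    }
}
```

The two systems are consistent only eventually: between runTaskA and a successful consumeOne, system A has the data and system B does not.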

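Dependencies

Add the transaction framework dependencies: biz needs the framework's declaration (annotation) package, and provider needs the framework's implementation package.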

pom.xml(demo-biz/sample-biz)

<dependency>
    <groupId>com.lc.tx.mtx</groupId>
    <artifactId>lc-mtx-annotation</artifactId>
</dependency>

pom.xml(sample-client)

<dependency>
    <groupId>com.lc.tx.mtx.springboot.starter</groupId>
    <artifactId>lc-mtx-springboot-starter-springcloud</artifactId>
</dependency>

pom.xml(demo-provider/sample-provider)

<dependency>
    <groupId>com.lc.tx.mtx.springboot.starter</groupId>
    <artifactId>lc-mtx-springboot-starter-springcloud</artifactId>
</dependency>
<dependency>
    <groupId>com.lc.tx.mtx.mq</groupId>
    <artifactId>lc-mtx-rabbitmq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

Business invocation code

Caller

  1. In the demo-biz project, add the @Mtx and @Transactional annotations in DefaultDemoServiceLocal.java:
public class DefaultDemoServiceLocal implements IDemoServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDemoServiceLocal.class);
    @Resource
    private DemoRepository demoRepository;
    @Resource
    private ISampleClientMtx sampleClientMtx;

    @Mtx(destination = "")
    @Transactional
    @Override
    public String mtxSave(DemoPo demoPo) {
        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();
        if(37L == demoPo.getSum()) {
            throw new BaseException("Mtx initiator exception");
        }
        SamplePo samplePo = BeanUtils.copyNotNullProperties(SamplePo.class, demoPo);
        APIResult<Void> result = sampleClientMtx.mtxSave(samplePo);
        if(null == result || result.isFailed()) {
            throw new BaseException("Mtx participant exception");
        }

        resultMessage = ResultMessage.SUCCESS;

        return resultMsg + StringPool.COMMA + resultMessage;
    }

}
  1. In the provider project, add a configuration file application-mtx.yml, and add mtx to the spring.profiles.include property in application.yml:
com:
  lc:
    tx:
      mtx:
        enabled: true
        serializer: kryo
        repositorySupport: db
        txDbConfig:
          driverClassName: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username: root
          password: 1234

Callee

  1. Add the MQ configuration class AmqpConfig.java, making sure to declare the new sample queue and the MQ listener container factory:
@Configuration
@ConditionalOnBean(value = MtxConfigProperties.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public class AmqpConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(100);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public Queue employeeQueue() {
        return new Queue("sample");
    }

}
  1. Add the MQ consumer listener class RabbitConsumer.java, which listens on the sample queue:
@Component
@ConditionalOnBean(value = MtxConfigProperties.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
@RabbitListener(queues = "sample",containerFactory = "rabbitContainerFactory")
public class RabbitConsumer {

    private final MtxMqReceiveService mtxMqReceiveService;

    @Autowired
    public RabbitConsumer(MtxMqReceiveService mtxMqReceiveService) {
        this.mtxMqReceiveService = mtxMqReceiveService;
    }

    @RabbitHandler
    public void process(byte[] msg) {
        mtxMqReceiveService.processMessage(msg);
    }

}
  1. Add the mtxSave method to the interface ISampleService.java:
@Validated
@RequestMapping(value = "/sample")
@RestController
public interface ISampleService {
    @RequestMapping(value = "/mtx/save", method = RequestMethod.POST)
    public APIResult<Void> mtxSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);

}
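  1. Add the client interface ISampleClientMtx.java with an mtxSave method that corresponds to mtxSave in ISampleService. Annotate it with @Mtx, setting the queue to listen on (destination) and the implementing interface (target):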
@FeignClient(name = "ibps-sample-provider", configuration = MtxFeignConfiguration.class)
public interface ISampleClientMtx {
    @Mtx(destination = "sample", target = ISampleServiceLocal.class)
    @RequestMapping(value = "/sample/mtx/save", method = RequestMethod.POST)
    public APIResult<Void> mtxSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);
}
  1. Implement the mtxSave method in SampleProvider:
public class SampleProvider extends GenericProvider implements ISampleService {
    @Resource
    private ISampleServiceLocal sampleServiceLocal;

    @Override
    @ApiOperation(value = "Sample save-MTX", notes = "Saves the sample from the submitted data")
    public APIResult<Void> mtxSave(
            @ApiParam(name = "SamplePo", value = "Sample object", required = true)
            @RequestBody(required = true)  SamplePo samplePo) {
        APIResult<Void> result = new APIResult<>();
        try {
            samplePo.setName("mtx." + samplePo.getName());
            samplePo.setSubject("mtx." + samplePo.getSubject());
            samplePo.setText1("mtx." + samplePo.getText1());
            samplePo.setText2("mtx." + samplePo.getText2());
            samplePo.setText3("mtx." + samplePo.getText3());
            samplePo.setText4("mtx." + samplePo.getText4());
            samplePo.setText5("mtx." + samplePo.getText5());
            sampleServiceLocal.mtxSave(samplePo);
        } catch (Exception e) {
            setExceptionResult(result, StateEnum.ERROR.getCode(), StateEnum.ERROR.getText(), e);
        }

        return result;
    }
}
  1. Add the business interface for mtxSave and its implementation, ISampleServiceLocal and DefaultSampleServiceLocal:

ISampleServiceLocal.java

public interface ISampleServiceLocal {
    public String mtxSave(SamplePo samplePo);
}

DefaultSampleServiceLocal.java: annotate the mtxSave method with @Mtx and set it to listen on the sample queue.

@Service
public class DefaultSampleServiceLocal implements ISampleServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSampleServiceLocal.class);

    @Resource
    private SampleRepository sampleRepository;

    @Mtx(destination = "sample")
    @Transactional
    @Override
    public String mtxSave(SamplePo samplePo) {
        String resultMsg = "Operation succeeded";
        int resultMessage = 0;
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();
        if(370L == samplePo.getSum()) {
            throw new BaseException("Mtx participant exception");
        }
        resultMessage = ResultMessage.SUCCESS;
        return resultMsg + StringPool.COMMA + resultMessage;
    }

}
  1. In the provider project, add a configuration file application-mtx.yml, and add mtx to the spring.profiles.include property in application.yml:
com:
  lc:
    tx:
      mtx:
        enabled: true
        serializer: kryo
        repositorySupport: db
        txDbConfig:
          driverClassName: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username: root
          password: 1234

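MQ configuration

In addition to the business code above, both demo (the caller) and sample (the callee) need the RabbitMQ connection settings, configured as follows: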

spring:
  #============== rabbitmq ===================
  rabbitmq:
    host: 192.168.3.210
    port: 5672
    addresses: 
    username: ibps
    password: ibps
    virtual-host: /ibps
Document last updated: 2021-02-01 10:07   Author: Eddy