Author: Eddy  Revision: 1  Updated: 2024-11-20 15:41

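Introduction

The in-house transaction framework provides two processing modes: tcc and mtx.
The examples below use two projects, demo (the caller) and sample (the callee). Both are standard skeleton projects with the main layers api, model, biz, client, and provider. Business code lives in biz, interface code lives in api and client, and the interface implementations live in provider.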

seata

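Seata overview

Seata is an open-source distributed transaction solution that aims to provide high-performance, easy-to-use distributed transaction services for microservice architectures. Before it was open-sourced, its internal version served within Alibaba as the middleware for data consistency at the application architecture layer, helping the business get through each year's Double 11 smoothly. After years of refinement, its commercial product was offered on Alibaba Cloud and Financial Cloud. In January 2019, Seata was officially open-sourced in order to build a more complete technical ecosystem and share its results more widely; going forward, it will be developed jointly with the community to help users adopt distributed transaction solutions quickly.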

Seata integration

https://seata.io/zh-cn/docs/overview/what-is-seata.html

tcc

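TCC overview

TCC stands for Try-Confirm-Cancel:

Try phase:

Perform all business checks (consistency) and reserve the business resources (quasi-isolation).

Confirm phase:

Execute the business operation without any further business checks, using only the resources reserved in the Try phase.

Cancel phase:

Release the business resources reserved in the Try phase. If any participant fails to reserve its resources, all reservation requests are cancelled.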
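The three phases can be illustrated with a small, self-contained Java sketch. It is not the framework's implementation: TccAccountSketch and TccCoordinatorSketch are hypothetical names, state lives in memory, and each transaction id is assumed to be used only once. Try checks the balance and freezes the amount, Confirm consumes the frozen amount without further checks, and Cancel returns it; if any participant fails to reserve, every reservation is cancelled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory account illustrating the three TCC phases. */
class TccAccountSketch {
    private long available;                                  // funds not yet reserved
    private final Map<String, Long> frozen = new HashMap<>(); // txId -> reserved amount

    TccAccountSketch(long initial) {
        this.available = initial;
    }

    /** Try: run the business check and reserve (freeze) the amount. */
    boolean tryDebit(String txId, long amount) {
        if (amount <= 0 || amount > available) {
            return false; // business check failed, nothing reserved
        }
        available -= amount;
        frozen.put(txId, amount);
        return true;
    }

    /** Confirm: consume the reserved amount, with no further business checks. */
    void confirm(String txId) {
        frozen.remove(txId);
    }

    /** Cancel: release the reserved amount; a no-op if nothing was reserved. */
    void cancel(String txId) {
        Long amount = frozen.remove(txId);
        if (amount != null) {
            available += amount;
        }
    }

    long available() {
        return available;
    }
}

/** Hypothetical coordinator: confirm everywhere, or cancel everywhere. */
class TccCoordinatorSketch {
    static boolean run(String txId, long amount, List<TccAccountSketch> participants) {
        boolean allReserved = true;
        for (TccAccountSketch p : participants) {
            if (!p.tryDebit(txId, amount)) {
                allReserved = false;
                break;
            }
        }
        for (TccAccountSketch p : participants) {
            if (allReserved) {
                p.confirm(txId);
            } else {
                p.cancel(txId);
            }
        }
        return allReserved;
    }
}
```

For example, with two accounts holding 100 and 10, reserving 30 from both fails on the second account, so the first account's reservation is released and both balances are unchanged.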

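Dependencies

Add the transaction framework dependencies: biz needs the framework's declaration (annotation) package, and provider needs the framework's implementation package.

pom.xml (demo-biz/sample-biz)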

<dependency>
    <groupId>com.lc.tx.tcc</groupId>
    <artifactId>lc-tcc-annotation</artifactId>
</dependency>

pom.xml (demo-provider/sample-provider/sample-client)

<dependency>
    <groupId>com.lc.tx.tcc.springboot.starter</groupId>
    <artifactId>lc-tcc-springboot-starter-springcloud</artifactId>
</dependency>

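Business invocation code

Caller

  1. In demo, add three methods to the business-layer code, one for each TCC phase. Annotate the main try method with Tcc, use its confirmMethod and cancelMethod attributes to name the corresponding methods, and also add the Transactional annotation.
    For example, DefaultDemoServiceLocal.java: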
@Service
public class DefaultDemoServiceLocal implements IDemoServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDemoServiceLocal.class);

    @Resource
    private DemoRepository demoRepository;
    @Resource
    private ISampleClientTcc sampleClientTcc;

    @Tcc(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    @Transactional
    @Override
    public String tccSave(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.tccSave(DemoPo)");

        // TODO reserve resources
        LOGGER.info("===================> reserve resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        demoPo.setDataStatus(UserStatus.INACTIVE.getValue());
        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();

        if(7L == demoPo.getSum()) {
            throw new BaseException("TCC initiator failed");
        }

        SamplePo samplePo = BeanUtils.copyNotNullProperties(SamplePo.class, demoPo);
        APIResult<Void> result = sampleClientTcc.tccSave(samplePo);
        if(null == result || result.isFailed()) {
            throw new BaseException("TCC participant failed");
        }

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String confirmMethod(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.confirmMethod(DemoPo)");

        // TODO confirm resources
        LOGGER.info("===================> confirm resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        demoPo.setDataStatus(UserStatus.ACTIVED.getValue());
        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String cancelMethod(DemoPo demoPo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.demo.service.DefaultDemoServiceLocal.cancelMethod(DemoPo)");

        // TODO release resources
        LOGGER.info("===================> release resources");

        Demo demo = demoRepository.newInstance(demoPo);
        demo.delete();

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return null;
    }

}
  2. In the provider project, add a configuration file application-tcc.yml, and add tcc to the spring.profiles.include property in application.yml.
com:
  lc :
    tx :
      tcc:
        enabled : true
        serializer : kryo
        repositorySupport : db
        txDbConfig :
          driverClassName : com.mysql.jdbc.Driver
          url : jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username : root
          password : 1234
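
The confirmMethod and cancelMethod attributes used in the caller code above refer to methods by name. One plausible way to dispatch on such names is reflection, sketched below with a hypothetical TccSketch annotation standing in for the framework's Tcc annotation. This is an assumption for illustration only: the real framework may resolve and schedule these methods differently, and the sketch does not model cancelling when a remote participant fails.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the framework's Tcc annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TccSketch {
    String confirmMethod();
    String cancelMethod();
}

/** Example service; the try, confirm, and cancel methods share one signature. */
class OrderServiceSketch {
    final List<String> log = new ArrayList<>();

    @TccSketch(confirmMethod = "confirmSave", cancelMethod = "cancelSave")
    public void trySave(String order) {
        if (order.isEmpty()) {
            throw new IllegalArgumentException("empty order");
        }
        log.add("try:" + order);
    }

    public void confirmSave(String order) {
        log.add("confirm:" + order);
    }

    public void cancelSave(String order) {
        log.add("cancel:" + order);
    }
}

class TccDispatcherSketch {
    /** Runs the try method, then the confirm or cancel method named on its annotation. */
    static boolean invoke(Object target, String tryName, String arg) {
        try {
            Method tryMethod = target.getClass().getMethod(tryName, String.class);
            TccSketch tcc = tryMethod.getAnnotation(TccSketch.class);
            boolean ok;
            try {
                tryMethod.invoke(target, arg);
                ok = true;
            } catch (InvocationTargetException e) {
                ok = false; // the try phase threw an exception
            }
            String next = ok ? tcc.confirmMethod() : tcc.cancelMethod();
            target.getClass().getMethod(next, String.class).invoke(target, arg);
            return ok;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("TCC method lookup failed", e);
        }
    }
}
```

Because the methods are looked up by name and parameter type, the confirm and cancel methods in this sketch must take the same arguments as the try method, which matches the shape of the tccSave, confirmMethod, and cancelMethod trio shown earlier.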

Callee

  1. Add the tccSave interface method to sample's ISampleService;
@Validated
@RequestMapping(value = "/sample")
@RestController
public interface ISampleService {
    @RequestMapping(value = "/tcc/save", method = RequestMethod.POST)
    public APIResult<Void> tccSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);
}
  2. Add the ISampleClientTcc class and annotate tccSave with Tcc. In addition, the configuration attribute of the FeignClient annotation on the interface must specify TccFeignConfiguration.class
@FeignClient(name = "ibps-sample-provider", configuration = TccFeignConfiguration.class)
public interface ISampleClientTcc {

    @Tcc
    @RequestMapping(value = "/sample/tcc/save", method = RequestMethod.POST)
    public APIResult<Void> tccSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);

}
  3. Add the Tcc annotation in the provider's interface implementation as the business requires;

SampleProvider.java

public class SampleProvider extends GenericProvider implements ISampleService {
    @Resource
    private ISampleServiceLocal sampleServiceLocal;

    @Override
    @ApiOperation(value = "Sample save - TCC", notes = "Saves a sample from the submitted data")
    public APIResult<Void> tccSave(
            @ApiParam(name = "SamplePo", value = "Sample object", required = true)
            @RequestBody(required = true)  SamplePo samplePo) {
        APIResult<Void> result = new APIResult<>();
        try {
            samplePo.setName("tcc." + samplePo.getName());
            samplePo.setSubject("tcc." + samplePo.getSubject());
            samplePo.setText1("tcc." + samplePo.getText1());
            samplePo.setText2("tcc." + samplePo.getText2());
            samplePo.setText3("tcc." + samplePo.getText3());
            samplePo.setText4("tcc." + samplePo.getText4());
            samplePo.setText5("tcc." + samplePo.getText5());
            sampleServiceLocal.tccSave(samplePo);
        } catch (Exception e) {
            setExceptionResult(result, StateEnum.ERROR.getCode(), StateEnum.ERROR.getText(), e);
        }

        return result;
    }
}
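  4. In DefaultSampleServiceLocal.java, annotate tccSave with Tcc and specify confirmMethod and cancelMethod, and also add the Transactional annotation. If SampleProvider.tccSave() contains other logic that must also be transactional, such as sampleServiceLocal2.tccSave2(), annotate the tccSave2 method in the same way;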
public class DefaultSampleServiceLocal implements ISampleServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSampleServiceLocal.class);

    @Resource
    private SampleRepository sampleRepository;

    @Tcc(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    @Transactional
    @Override
    public String tccSave(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.tccSave(SamplePo)");

        // TODO reserve resources
        LOGGER.info("===================> reserve resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        samplePo.setDataStatus(UserStatus.INACTIVE.getValue());
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();

        if(70L == samplePo.getSum()) {
            throw new BaseException("TCC participant failed");
        }

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String confirmMethod(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.confirmMethod(SamplePo)");

        // TODO confirm resources
        LOGGER.info("===================> confirm resources");

        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        samplePo.setDataStatus(UserStatus.ACTIVED.getValue());
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();

        resultMessage = ResultMessage.SUCCESS;

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return resultMsg + StringPool.COMMA + resultMessage;
    }

    public String cancelMethod(SamplePo samplePo) {
        StopWatch watch = new StopWatch();
        watch.start("com.lc.ibps.sample.service.DefaultSampleServiceLocal.cancelMethod(SamplePo)");

        // TODO release resources
        LOGGER.info("===================> release resources");

        Sample sample = sampleRepository.newInstance(samplePo);
        sample.delete();

        watch.stop();
        LOGGER.info("{} used {} ms.", watch.getLastTaskName(), watch.getLastTaskTimeMillis());

        return null;
    }
}
  5. In the provider project, add a configuration file application-tcc.yml, and add tcc to the spring.profiles.include property in application.yml.
com:
  lc :
    tx :
      tcc:
        enabled : true
        serializer : kryo
        repositorySupport : db
        txDbConfig :
          driverClassName : com.mysql.jdbc.Driver
          url : jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username : root
          password : 1234

mtx

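MTX overview

This approach implements distributed transactions through message middleware. Suppose there are two systems, A and B, which handle task A and task B respectively. System A has a business process that needs task A and task B to be handled in the same transaction.
The framework supports rabbitMQ, activeMQ, aliyunMQ, kafka, rocketMQ, and other implementations; the following example uses rabbitMQ.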
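
As a rough illustration of the idea, and not of how the framework or any particular broker works internally, the sketch below models systems A and B with in-memory lists and the queue with an ArrayDeque. All names (MessageTxSketch, runTaskA, drain) are hypothetical. Task A publishes a message only when its local work succeeds; task B consumes messages later, and a message that cannot be processed yet stays in the queue for a retry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical in-memory model of a message-based transaction. */
class MessageTxSketch {
    final List<String> systemA = new ArrayList<>(); // records saved by task A
    final List<String> systemB = new ArrayList<>(); // records saved by task B
    final Queue<String> queue = new ArrayDeque<>(); // stands in for the message queue

    /** Task A: the message is published only if the local work succeeds. */
    boolean runTaskA(String payload, boolean failLocally) {
        if (failLocally) {
            return false; // local rollback: nothing saved, nothing sent
        }
        systemA.add(payload);
        queue.add(payload);
        return true;
    }

    /** Task B: consume pending messages; unprocessed messages stay queued for retry. */
    int drain(boolean consumerHealthy) {
        int processed = 0;
        while (consumerHealthy && !queue.isEmpty()) {
            systemB.add(queue.poll());
            processed++;
        }
        return processed;
    }
}
```

In this model the two systems are not updated at the same instant; system B catches up once the consumer is able to process the queued message, which is the usual trade-off of message-based transactions.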

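Dependencies

Add the transaction framework dependencies: biz needs the framework's declaration (annotation) package, and provider needs the framework's implementation package.

pom.xml (demo-biz/sample-biz)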

<dependency>
    <groupId>com.lc.tx.mtx</groupId>
    <artifactId>lc-mtx-annotation</artifactId>
</dependency>

pom.xml (sample-client)

<dependency>
    <groupId>com.lc.tx.mtx.springboot.starter</groupId>
    <artifactId>lc-mtx-springboot-starter-springcloud</artifactId>
</dependency>

pom.xml (demo-provider/sample-provider)

<dependency>
    <groupId>com.lc.tx.mtx.springboot.starter</groupId>
    <artifactId>lc-mtx-springboot-starter-springcloud</artifactId>
</dependency>
<dependency>
    <groupId>com.lc.tx.mtx.mq</groupId>
    <artifactId>lc-mtx-rabbitmq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

Business invocation code

Caller

  1. In the demo-biz project, add the Mtx and Transactional annotations in DefaultDemoServiceLocal.java
public class DefaultDemoServiceLocal implements IDemoServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDemoServiceLocal.class);
    @Resource
    private DemoRepository demoRepository;
    @Resource
    private ISampleClientMtx sampleClientMtx;

    @Mtx(destination = "")
    @Transactional
    @Override
    public String mtxSave(DemoPo demoPo) {
        String resultMsg = "Operation succeeded";
        int resultMessage = 0;

        Demo demo = demoRepository.newInstance(demoPo);
        demo.save();
        if(37L == demoPo.getSum()) {
            throw new BaseException("MTX initiator failed");
        }
        SamplePo samplePo = BeanUtils.copyNotNullProperties(SamplePo.class, demoPo);
        APIResult<Void> result = sampleClientMtx.mtxSave(samplePo);
        if(null == result || result.isFailed()) {
            throw new BaseException("MTX participant failed");
        }

        resultMessage = ResultMessage.SUCCESS;

        return resultMsg + StringPool.COMMA + resultMessage;
    }

}
  2. In the provider project, add a configuration file application-mtx.yml, and add mtx to the spring.profiles.include property in application.yml.
com:
  lc :
    tx :
      mtx:
        enabled : true
        serializer : kryo
        repositorySupport : db
        txDbConfig :
          driverClassName : com.mysql.jdbc.Driver
          url : jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username : root
          password : 1234

Callee

  1. Add the MQ configuration class AmqpConfig.java; be sure to declare the new sample queue and the MQ listener container factory
@Configuration
@ConditionalOnBean(value = MtxConfigProperties.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public class AmqpConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(100);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public Queue employeeQueue() {
        return new Queue("sample");
    }

}
  2. Add the MQ consumer listener class RabbitConsumer.java, which listens on the sample queue
@ConditionalOnBean(value = MtxConfigProperties.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
@RabbitListener(queues = "sample",containerFactory = "rabbitContainerFactory")
public class RabbitConsumer {

    private final MtxMqReceiveService mtxMqReceiveService;

    @Autowired
    public RabbitConsumer(MtxMqReceiveService mtxMqReceiveService) {
        this.mtxMqReceiveService = mtxMqReceiveService;
    }

    @RabbitHandler
    public void process(byte[] msg) {
        mtxMqReceiveService.processMessage(msg);
    }

}
  3. Add the mtxSave interface method to the interface class ISampleService.java
@Validated
@RequestMapping(value = "/sample")
@RestController
public interface ISampleService {
    @RequestMapping(value = "/mtx/save", method = RequestMethod.POST)
    public APIResult<Void> mtxSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);

}
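  4. Add the client interface ISampleClientMtx.java with an mtxSave method that corresponds to mtxSave in ISampleService, annotate it with Mtx, and set the queue to listen on (destination) and the interface implementation class (target)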
@FeignClient(name = "ibps-sample-provider", configuration = MtxFeignConfiguration.class)
public interface ISampleClientMtx {
    @Mtx(destination = "sample", target = ISampleServiceLocal.class)
    @RequestMapping(value = "/sample/mtx/save", method = RequestMethod.POST)
    public APIResult<Void> mtxSave(
            @RequestBody(required = true) @Valid SamplePo samplePo);
}
  5. Implement the mtxSave method in SampleProvider
public class SampleProvider extends GenericProvider implements ISampleService {
    @Resource
    private ISampleServiceLocal sampleServiceLocal;

    @Override
    @ApiOperation(value = "Sample save - MTX", notes = "Saves a sample from the submitted data")
    public APIResult<Void> mtxSave(
            @ApiParam(name = "SamplePo", value = "Sample object", required = true)
            @RequestBody(required = true)  SamplePo samplePo) {
        APIResult<Void> result = new APIResult<>();
        try {
            samplePo.setName("mtx." + samplePo.getName());
            samplePo.setSubject("mtx." + samplePo.getSubject());
            samplePo.setText1("mtx." + samplePo.getText1());
            samplePo.setText2("mtx." + samplePo.getText2());
            samplePo.setText3("mtx." + samplePo.getText3());
            samplePo.setText4("mtx." + samplePo.getText4());
            samplePo.setText5("mtx." + samplePo.getText5());
            sampleServiceLocal.mtxSave(samplePo);
        } catch (Exception e) {
            setExceptionResult(result, StateEnum.ERROR.getCode(), StateEnum.ERROR.getText(), e);
        }

        return result;
    }
}
  6. Add the business interface for the mtxSave method and its implementation class: ISampleServiceLocal and DefaultSampleServiceLocal

ISampleServiceLocal.java

public interface ISampleServiceLocal {
    public String mtxSave(SamplePo samplePo);
}

DefaultSampleServiceLocal.java: annotate the mtxSave method with Mtx and set it to listen on the sample queue

@Service
public class DefaultSampleServiceLocal implements ISampleServiceLocal {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSampleServiceLocal.class);

    @Resource
    private SampleRepository sampleRepository;

    @Mtx(destination = "sample")
    @Transactional
    @Override
    public String mtxSave(SamplePo samplePo) {
        String resultMsg = "Operation succeeded";
        int resultMessage = 0;
        Sample sample = sampleRepository.newInstance(samplePo);
        sample.save();
        if(370L == samplePo.getSum()) {
            throw new BaseException("MTX participant failed");
        }
        resultMessage = ResultMessage.SUCCESS;
        return resultMsg + StringPool.COMMA + resultMessage;
    }

}
  7. In the provider project, add a configuration file application-mtx.yml, and add mtx to the spring.profiles.include property in application.yml.
com:
  lc :
    tx :
      mtx:
        enabled : true
        serializer : kryo
        repositorySupport : db
        txDbConfig :
          driverClassName : com.mysql.jdbc.Driver
          url : jdbc:mysql://192.168.3.210:3306/tx?characterEncoding=utf-8&useUnicode=true
          username : root
          password : 1234

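MQ configuration

In addition to the business code above, both demo (the caller) and sample (the callee) also need the rabbitMQ connection settings, configured as follows: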

spring:
  #============== rabbitmq ===================
  rabbitmq:
    host: 192.168.3.210
    port: 5672
    addresses: 
    username: ibps
    password: ibps
    virtual-host: /ibps