Java学习指南
  • Java 编程的逻辑
  • Java进阶
  • Java FrameWorks
  • 了解 USB Type-A,B,C 三大标准接口
  • 深入浅出DDD
  • 重构:改善既有代码的设计
  • 面试大纲
  • 云原生
    • 什么是无服务器(what is serverless)?
  • 博客
    • 深入分析Log4j 漏洞
  • 博客
    • Serverless之快速搭建Spring Boot应用
  • 博客
    • 使用 Prometheus + Grafana + Spring Boot Actuator 监控应用
  • 博客
    • 使用 Prometheus + Grafana 监控 MySQL
  • 博客
    • 使用Github Actions + Docker 部署Spring Boot应用
  • 博客
    • Redis分布式锁之Redisson的原理和实践
  • 博客
    • 数据库中的树结构应该怎样去设计
  • 学习&成长
    • 如何成为技术大牛
  • 开发工具
    • Git Commit Message Guidelines
  • 开发工具
    • git命名大全
  • 开发工具
    • Gradle vs Maven Comparison
  • 开发工具
    • Swagger2常用注解及其说明
  • 开发工具
    • 简明 VIM 练级攻略
  • 微服务
    • 十大微服务设计模式和原则
  • 微服务
    • 微服务下的身份认证和令牌管理
  • 微服务
    • 微服务坏味道之循环依赖
  • 设计模式
    • 设计模式 - JDK中的设计模式
  • 设计模式
    • 设计模式 - Java三种代理模式
  • 设计模式
    • 设计模式 - 六大设计原则
  • 设计模式
    • 设计模式 - 单例模式
  • 设计模式
    • 设计模式 - 命名模式
  • 设计模式
    • 设计模式 - 备忘录模式
  • 设计模式
    • 设计模式 - 概览
  • 设计模式
    • 设计模式 - 没用的设计模式
  • 质量&效率
    • Homebrew 替换国内镜像源
  • 质量&效率
    • 工作中如何做好技术积累
  • Java FrameWorks
    • Logback
      • 自定义 logback 日志过滤器
  • Java FrameWorks
    • Mybatis
      • MyBatis(十三) - 整合Spring
  • Java FrameWorks
    • Mybatis
      • MyBatis(十二) - 一些API
  • Java FrameWorks
    • Mybatis
      • Mybatis(一) - 概述
  • Java FrameWorks
    • Mybatis
      • Mybatis(七) - 结果集的封装与映射
  • Java FrameWorks
    • Mybatis
      • Mybatis(三) - mapper.xml及其加载机制
  • Java FrameWorks
    • Mybatis
      • Mybatis(九) - 事务
  • Java FrameWorks
    • Mybatis
      • Mybatis(二) - 全局配置文件及其加载机制
  • Java FrameWorks
    • Mybatis
      • Mybatis(五) - SqlSession执行流程
  • Java FrameWorks
    • Mybatis
      • Mybatis(八) - 缓存
  • Java FrameWorks
    • Mybatis
      • Mybatis(六) - 动态SQL的参数绑定与执行
  • Java FrameWorks
    • Mybatis
      • Mybatis(十) - 插件
  • Java FrameWorks
    • Mybatis
      • Mybatis(十一) - 日志
  • Java FrameWorks
    • Mybatis
      • Mybatis(四) - Mapper接口解析
  • Java FrameWorks
    • Netty
      • Netty 可靠性分析
  • Java FrameWorks
    • Netty
      • Netty - Netty 线程模型
  • Java FrameWorks
    • Netty
      • Netty堆外内存泄露排查盛宴
  • Java FrameWorks
    • Netty
      • Netty高级 - 高性能之道
  • Java FrameWorks
    • Shiro
      • Shiro + JWT + Spring Boot Restful 简易教程
  • Java FrameWorks
    • Shiro
      • 非常详尽的 Shiro 架构解析!
  • Java FrameWorks
    • Spring
      • Spring AOP 使用介绍,从前世到今生
  • Java FrameWorks
    • Spring
      • Spring AOP 源码解析
  • Java FrameWorks
    • Spring
      • Spring Event 实现原理
  • Java FrameWorks
    • Spring
      • Spring Events
  • Java FrameWorks
    • Spring
      • Spring IOC容器源码分析
  • Java FrameWorks
    • Spring
      • Spring Integration简介
  • Java FrameWorks
    • Spring
      • Spring MVC 框架中拦截器 Interceptor 的使用方法
  • Java FrameWorks
    • Spring
      • Spring bean 解析、注册、实例化流程源码剖析
  • Java FrameWorks
    • Spring
      • Spring validation中@NotNull、@NotEmpty、@NotBlank的区别
  • Java FrameWorks
    • Spring
      • Spring 如何解决循环依赖?
  • Java FrameWorks
    • Spring
      • Spring 异步实现原理与实战分享
  • Java FrameWorks
    • Spring
      • Spring中的“for update”问题
  • Java FrameWorks
    • Spring
      • Spring中的设计模式
  • Java FrameWorks
    • Spring
      • Spring事务失效的 8 大原因
  • Java FrameWorks
    • Spring
      • Spring事务管理详解
  • Java FrameWorks
    • Spring
      • Spring计时器StopWatch使用
  • Java FrameWorks
    • Spring
      • 详述 Spring MVC 框架中拦截器 Interceptor 的使用方法
  • Java FrameWorks
    • Spring
      • 透彻的掌握 Spring 中@transactional 的使用
  • Java
    • Java IO&NIO&AIO
      • Java IO - BIO 详解
  • Java
    • Java IO&NIO&AIO
      • Java NIO - IO多路复用详解
  • Java
    • Java IO&NIO&AIO
      • Java N(A)IO - Netty
  • Java
    • Java IO&NIO&AIO
      • Java IO - Unix IO模型
  • Java
    • Java IO&NIO&AIO
      • Java IO - 分类
  • Java
    • Java IO&NIO&AIO
      • Java NIO - 基础详解
  • Java
    • Java IO&NIO&AIO
      • Java IO - 常见类使用
  • Java
    • Java IO&NIO&AIO
      • Java AIO - 异步IO详解
  • Java
    • Java IO&NIO&AIO
      • Java IO概述
  • Java
    • Java IO&NIO&AIO
      • Java IO - 设计模式
  • Java
    • Java IO&NIO&AIO
      • Java NIO - 零拷贝实现
  • Java
    • Java JVM
      • JVM 优化经验总结
  • Java
    • Java JVM
      • JVM 内存结构
  • Java
    • Java JVM
      • JVM参数设置
  • Java
    • Java JVM
      • Java 内存模型
  • Java
    • Java JVM
      • 从实际案例聊聊Java应用的GC优化
  • Java
    • Java JVM
      • Java 垃圾回收器G1详解
  • Java
    • Java JVM
      • 垃圾回收器Shenandoah GC详解
  • Java
    • Java JVM
      • 垃圾回收器ZGC详解
  • Java
    • Java JVM
      • 垃圾回收基础
  • Java
    • Java JVM
      • 如何优化Java GC
  • Java
    • Java JVM
      • 类加载机制
  • Java
    • Java JVM
      • 类字节码详解
  • Java
    • Java 基础
      • Java hashCode() 和 equals()
  • Java
    • Java 基础
      • Java 基础 - Java native方法以及JNI实践
  • Java
    • Java 基础
      • Java serialVersionUID 有什么作用?
  • Java
    • Java 基础
      • Java 泛型的类型擦除
  • Java
    • Java 基础
      • Java 基础 - Unsafe类解析
  • Java
    • Java 基础
      • Difference Between Statement and PreparedStatement
  • Java
    • Java 基础
      • Java 基础 - SPI机制详解
  • Java
    • Java 基础
      • Java 基础 - final
  • Java
    • Java 基础
      • Java中static关键字详解
  • Java
    • Java 基础
      • 为什么说Java中只有值传递?
  • Java
    • Java 基础
      • Java 基础 - 即时编译器原理解析及实践
  • Java
    • Java 基础
      • Java 基础 - 反射
  • Java
    • Java 基础
      • Java多态的面试题
  • Java
    • Java 基础
      • Java 基础 - 异常机制详解
  • Java
    • Java 基础
      • 为什么要有抽象类?
  • Java
    • Java 基础
      • 接口的本质
  • Java
    • Java 基础
      • Java 基础 - 枚举
  • Java
    • Java 基础
      • Java 基础 - 泛型机制详解
  • Java
    • Java 基础
      • Java 基础 - 注解机制详解
  • Java
    • Java 基础
      • 为什么 String hashCode 方法选择数字31作为乘子
  • Java
    • Java 并发
      • Java 并发 - 14个Java并发容器
  • Java
    • Java 并发
      • Java 并发 - AQS
  • Java
    • Java 并发
      • Java 并发 - BlockingQueue
  • Java
    • Java 并发
      • Java 并发 - CAS
  • Java
    • Java 并发
      • Java 并发 - Condition接口
  • Java
    • Java 并发
      • Java 并发 - CopyOnWriteArrayList
  • Java
    • Java 并发
      • Java 并发 - CountDownLatch、CyclicBarrier和Phaser对比
  • Java
    • Java 并发
      • Java 并发 - Fork&Join框架
  • Java
    • Java 并发
      • Java 并发 - Java CompletableFuture 详解
  • Java
    • Java 并发
      • Java 并发 - Java 线程池
  • Java
    • Java 并发
      • Java 并发 - Lock接口
  • Java
    • Java 并发
      • Java 并发 - ReentrantLock
  • Java
    • Java 并发
      • Java 并发 - ReentrantReadWriteLock
  • Java
    • Java 并发
      • Java 并发 - Synchronized
  • Java
    • Java 并发
      • Java 并发 - ThreadLocal 内存泄漏问题
  • Java
    • Java 并发
      • Java 并发 - ThreadLocal
  • Java
    • Java 并发
      • Java 并发 - Volatile
  • Java
    • Java 并发
      • Java 并发 - 从ReentrantLock的实现看AQS的原理及应用
  • Java
    • Java 并发
      • Java 并发 - 公平锁和非公平锁
  • Java
    • Java 并发
      • Java 并发 - 内存模型
  • Java
    • Java 并发
      • Java 并发 - 原子类
  • Java
    • Java 并发
      • Java 并发 - 如何确保三个线程顺序执行?
  • Java
    • Java 并发
      • Java 并发 - 锁
  • Java
    • Java 的新特性
      • Java 10 新特性概述
  • Java
    • Java 的新特性
      • Java 11 新特性概述
  • Java
    • Java 的新特性
      • Java 12 新特性概述
  • Java
    • Java 的新特性
      • Java 13 新特性概述
  • Java
    • Java 的新特性
      • Java 14 新特性概述
  • Java
    • Java 的新特性
      • Java 15 新特性概述
  • Java
    • Java 的新特性
      • Java 8的新特性
  • Java
    • Java 的新特性
      • Java 9 新特性概述
  • Java
    • Java 调试排错
      • 调试排错 - Java Debug Interface(JDI)详解
  • Java
    • Java 调试排错
      • 调试排错 - CPU 100% 排查优化实践
  • Java
    • Java 调试排错
      • 调试排错 - Java Heap Dump分析
  • Java
    • Java 调试排错
      • 调试排错 - Java Thread Dump分析
  • Java
    • Java 调试排错
      • 调试排错 - Java动态调试技术原理
  • Java
    • Java 调试排错
      • 调试排错 - Java应用在线调试Arthas
  • Java
    • Java 调试排错
      • 调试排错 - Java问题排查:工具单
  • Java
    • Java 调试排错
      • 调试排错 - 内存溢出与内存泄漏
  • Java
    • Java 调试排错
      • 调试排错 - 在线分析GC日志的网站GCeasy
  • Java
    • Java 调试排错
      • 调试排错 - 常见的GC问题分析与解决
  • Java
    • Java 集合
      • Java 集合 - ArrayList
  • Java
    • Java 集合
      • Java 集合 - HashMap 和 ConcurrentHashMap
  • Java
    • Java 集合
      • Java 集合 - HashMap的死循环问题
  • Java
    • Java 集合
      • Java 集合 - LinkedHashSet&Map
  • Java
    • Java 集合
      • Java 集合 - LinkedList
  • Java
    • Java 集合
      • Java 集合 - PriorityQueue
  • Java
    • Java 集合
      • Java 集合 - Stack & Queue
  • Java
    • Java 集合
      • Java 集合 - TreeSet & TreeMap
  • Java
    • Java 集合
      • Java 集合 - WeakHashMap
  • Java
    • Java 集合
      • Java 集合 - 为什么HashMap的容量是2的幂次方
  • Java
    • Java 集合
      • Java 集合 - 概览
  • Java
    • Java 集合
      • Java 集合 - 高性能队列Disruptor详解
  • 分布式
    • RPC
      • ⭐️RPC - Dubbo&hsf&Spring cloud的区别
  • 分布式
    • RPC
      • ⭐️RPC - Dubbo的架构原理
  • 分布式
    • RPC
      • ⭐️RPC - HSF的原理分析
  • 分布式
    • RPC
      • ⭐️RPC - 你应该知道的RPC原理
  • 分布式
    • RPC
      • ⭐️RPC - 动态代理
  • 分布式
    • RPC
      • 深入理解 RPC 之协议篇
  • 分布式
    • RPC
      • RPC - 序列化和反序列化
  • 分布式
    • RPC
      • ⭐️RPC - 服务注册与发现
  • 分布式
    • RPC
      • RPC - 核心原理
  • 分布式
    • RPC
      • ⭐️RPC - 框架对比
  • 分布式
    • RPC
      • ⭐️RPC - 网络通信
  • 分布式
    • 分布式事务
      • 分布式事务 Seata TCC 模式深度解析
  • 分布式
    • 分布式事务
      • 分布式事务的实现原理
  • 分布式
    • 分布式事务
      • 常用的分布式事务解决方案
  • 分布式
    • 分布式事务
      • 手写实现基于消息队列的分布式事务框架
  • 分布式
    • 分布式算法
      • CAP 定理的含义
  • 分布式
    • 分布式算法
      • Paxos和Raft比较
  • 分布式
    • 分布式算法
      • 分布式一致性与共识算法
  • 分布式
    • 分布式锁
      • ⭐️分布式锁的原理及实现方式
  • 分布式
    • 搜索引擎
      • ElasticSearch与SpringBoot的集成与JPA方法的使用
  • 分布式
    • 搜索引擎
      • 全文搜索引擎 Elasticsearch 入门教程
  • 分布式
    • 搜索引擎
      • 十分钟学会使用 Elasticsearch 优雅搭建自己的搜索系统
  • 分布式
    • 搜索引擎
      • 腾讯万亿级 Elasticsearch 技术解密
  • 分布式
    • 日志系统
      • Grafana Loki 简明教程
  • 分布式
    • 日志系统
      • 分布式系统中如何优雅地追踪日志
  • 分布式
    • 日志系统
      • 如何优雅地记录操作日志?
  • 分布式
    • 日志系统
      • 日志收集组件—Flume、Logstash、Filebeat对比
  • 分布式
    • 日志系统
      • 集中式日志系统 ELK 协议栈详解
  • 分布式
    • 消息队列
      • 消息队列 - Kafka
  • 分布式
    • 消息队列
      • 消息队列 - Kafka、RabbitMQ、RocketMQ等消息中间件的对比
  • 分布式
    • 消息队列
      • 消息队列之 RabbitMQ
  • 分布式
    • 消息队列
      • 消息队列 - 使用docker-compose构建kafka集群
  • 分布式
    • 消息队列
      • 消息队列 - 分布式系统与消息的投递
  • 分布式
    • 消息队列
      • 消息队列 - 如何保证消息的可靠性传输
  • 分布式
    • 消息队列
      • 消息队列 - 如何保证消息的顺序性
  • 分布式
    • 消息队列
      • 消息队列 - 如何保证消息队列的高可用
  • 分布式
    • 消息队列
      • 消息队列 - 消息队列设计精要
  • 分布式
    • 监控系统
      • 深度剖析开源分布式监控CAT
  • 大数据
    • Flink
      • Flink架构与核心组件
  • 微服务
    • Dubbo
      • 基于dubbo的分布式应用中的统一异常处理
  • 微服务
    • Dubbo
      • Vim快捷键
  • 微服务
    • Service Mesh
      • Istio 是什么?
  • 微服务
    • Service Mesh
      • OCTO 2.0:美团基于Service Mesh的服务治理系统详解
  • 微服务
    • Service Mesh
      • Service Mesh是什么?
  • 微服务
    • Service Mesh
      • Spring Cloud向Service Mesh迁移
  • 微服务
    • Service Mesh
      • 数据挖掘算法
  • 微服务
    • Service Mesh
      • Seata Saga 模式
  • 微服务
    • Spring Cloud
      • Seata TCC 模式
  • 微服务
    • Spring Cloud
      • Spring Cloud Config
  • 微服务
    • Spring Cloud
      • Seata AT 模式
  • 微服务
    • Spring Cloud
      • Spring Cloud Gateway
  • 微服务
    • Spring Cloud
      • Spring Cloud OpenFeign 的核心原理
  • 微服务
    • Spring Cloud
      • Seata XA 模式
  • 数据库
    • Database Version Control
      • Liquibase vs. Flyway
  • 数据库
    • Database Version Control
      • Six reasons to version control your database
  • 数据库
    • MySQL
      • How Sharding Works
  • 数据库
    • MySQL
      • MySQL InnoDB中各种SQL语句加锁分析
  • 数据库
    • MySQL
      • MySQL 事务隔离级别和锁
  • 数据库
    • MySQL
      • MySQL 索引性能分析概要
  • 数据库
    • MySQL
      • MySQL 索引设计概要
  • 数据库
    • MySQL
      • MySQL出现Waiting for table metadata lock的原因以及解决方法
  • 数据库
    • MySQL
      • MySQL的Limit性能问题
  • 数据库
    • MySQL
      • MySQL索引优化explain
  • 数据库
    • MySQL
      • MySQL索引背后的数据结构及算法原理
  • 数据库
    • MySQL
      • MySQL行转列、列转行问题
  • 数据库
    • MySQL
      • 一条SQL更新语句是如何执行的?
  • 数据库
    • MySQL
      • 一条SQL查询语句是如何执行的?
  • 数据库
    • MySQL
      • 为什么 MySQL 使用 B+ 树
  • 数据库
    • MySQL
      • 为什么 MySQL 的自增主键不单调也不连续
  • 数据库
    • MySQL
      • 为什么我的MySQL会“抖”一下?
  • 数据库
    • MySQL
      • 为什么数据库不应该使用外键
  • 数据库
    • MySQL
      • 为什么数据库会丢失数据
  • 数据库
    • MySQL
      • 事务的可重复读的能力是怎么实现的?
  • 数据库
    • MySQL
      • 大众点评订单系统分库分表实践
  • 数据库
    • MySQL
      • 如何保证缓存与数据库双写时的数据一致性?
  • 数据库
    • MySQL
      • 浅谈数据库并发控制 - 锁和 MVCC
  • 数据库
    • MySQL
      • 深入浅出MySQL 中事务的实现
  • 数据库
    • MySQL
      • 浅入浅出MySQL 和 InnoDB
  • 数据库
    • PostgreSQL
      • PostgreSQL upsert功能(insert on conflict do)的用法
  • 数据库
    • Redis
      • Redis GEO & 实现原理深度分析
  • 数据库
    • Redis
      • Redis 和 I/O 多路复用
  • 数据库
    • Redis
      • Redis分布式锁
  • 数据库
    • Redis
      • Redis实现分布式锁中的“坑”
  • 数据库
    • Redis
      • Redis总结
  • 数据库
    • Redis
      • 史上最全Redis高可用技术解决方案大全
  • 数据库
    • Redis
      • Redlock:Redis分布式锁最牛逼的实现
  • 数据库
    • Redis
      • 为什么 Redis 选择单线程模型
  • 数据库
    • TiDB
      • 新一代数据库TiDB在美团的实践
  • 数据库
    • 数据仓库
      • 实时数仓在有赞的实践
  • 数据库
    • 数据库原理
      • OLTP与OLAP的关系是什么?
  • 数据库
    • 数据库原理
      • 为什么 OLAP 需要列式存储
  • 系统设计
    • DDD
      • Domain Primitive
  • 系统设计
    • DDD
      • Repository模式
  • 系统设计
    • DDD
      • 应用架构
  • 系统设计
    • DDD
      • 聊聊如何避免写流水账代码
  • 系统设计
    • DDD
      • 领域层设计规范
  • 系统设计
    • DDD
      • 从三明治到六边形
  • 系统设计
    • DDD
      • 阿里盒马领域驱动设计实践
  • 系统设计
    • DDD
      • 领域驱动设计(DDD)编码实践
  • 系统设计
    • DDD
      • 领域驱动设计在互联网业务开发中的实践
  • 系统设计
    • 基础架构
      • 容错,高可用和灾备
  • 系统设计
    • 数据聚合
      • GraphQL及元数据驱动架构在后端BFF中的实践
  • 系统设计
    • 数据聚合
      • 高效研发-闲鱼在数据聚合上的探索与实践
  • 系统设计
    • 服务安全
      • JSON Web Token 入门教程
  • 系统设计
    • 服务安全
      • 你还在用JWT做身份认证嘛?
  • 系统设计
    • 服务安全
      • 凭证(Credentials)
  • 系统设计
    • 服务安全
      • 授权(Authorization)
  • 系统设计
    • 服务安全
      • 理解OAuth2.0
  • 系统设计
    • 服务安全
      • 认证(Authentication)
  • 系统设计
    • 架构案例
      • 微信 Android 客户端架构演进之路
  • 系统设计
    • 高可用架构
      • 业务高可用的保障:异地多活架构
  • 计算机基础
    • 字符编码
      • Base64原理解析
  • 计算机基础
    • 字符编码
      • 字符编码笔记:ASCII,Unicode 和 UTF-8
  • 计算机基础
    • 操作系统
      • 为什么 CPU 访问硬盘很慢
  • 计算机基础
    • 操作系统
      • 为什么 HTTPS 需要 7 次握手以及 9 倍时延
  • 计算机基础
    • 操作系统
      • 为什么 Linux 默认页大小是 4KB
  • 计算机基础
    • 操作系统
      • 磁盘IO那些事
  • 计算机基础
    • 操作系统
      • 虚拟机的3种网络模式
  • 计算机基础
    • 服务器
      • mac终端bash、zsh、oh-my-zsh最实用教程
  • 计算机基础
    • 服务器
      • Nginx强制跳转Https
  • 计算机基础
    • 服务器
      • curl 的用法指南
  • 计算机基础
    • 网络安全
      • 如何设计一个安全的对外接口?
  • 计算机基础
    • 网络安全
      • 浅谈常见的七种加密算法及实现
  • 计算机基础
    • 网络编程
      • MQTT - The Standard for IoT Messaging
  • 计算机基础
    • 网络编程
      • 两万字长文 50+ 张趣图带你领悟网络编程的内功心法
  • 计算机基础
    • 网络编程
      • 为什么 TCP 协议有 TIME_WAIT 状态
  • 计算机基础
    • 网络编程
      • 为什么 TCP 协议有性能问题
  • 计算机基础
    • 网络编程
      • 为什么 TCP 协议有粘包问题
  • 计算机基础
    • 网络编程
      • 为什么 TCP 建立连接需要三次握手
  • 计算机基础
    • 网络编程
      • 为什么 TCP/IP 协议会拆分数据
  • 计算机基础
    • 网络编程
      • 使用 OAuth 2 和 JWT 为微服务提供安全保障
  • 计算机基础
    • 网络编程
      • 四种常见的 POST 提交数据方式
  • 计算机基础
    • 网络编程
      • 有赞TCP网络编程最佳实践
  • 计算机基础
    • 网络编程
      • 看完这篇HTTP,跟面试官扯皮就没问题了
  • 计算机基础
    • 网络编程
      • 详细解析 HTTP 与 HTTPS 的区别
  • 质量&效率
    • 快捷键
      • Idea快捷键(Mac版)
  • 质量&效率
    • 快捷键
      • Shell快捷键
  • 质量&效率
    • 快捷键
      • conduit
  • 质量&效率
    • 敏捷开发
      • Scrum的3种角色
  • 质量&效率
    • 敏捷开发
      • Scrum的4种会议
  • 质量&效率
    • 敏捷开发
      • ThoughtWorks的敏捷开发
  • 质量&效率
    • 敏捷开发
      • 敏捷开发入门教程
  • 运维&测试
    • Docker
      • Docker (容器) 的原理
  • 运维&测试
    • Docker
      • Docker Compose:链接外部容器的几种方式
  • 运维&测试
    • Docker
      • Docker 入门教程
  • 运维&测试
    • Docker
      • Docker 核心技术与实现原理
  • 运维&测试
    • Docker
      • Dockerfile 最佳实践
  • 运维&测试
    • Docker
      • Docker开启Remote API 访问 2375端口
  • 运维&测试
    • Docker
      • Watchtower - 自动更新 Docker 镜像与容器
  • 运维&测试
    • Kubernetes
      • Kubernetes 介绍
  • 运维&测试
    • Kubernetes
      • Kubernetes 在有赞的实践
  • 运维&测试
    • Kubernetes
      • Kubernetes 学习路径
  • 运维&测试
    • Kubernetes
      • Kubernetes如何改变美团的云基础设施?
  • 运维&测试
    • Kubernetes
      • Kubernetes的三种外部访问方式:NodePort、LoadBalancer 和 Ingress
  • 运维&测试
    • Kubernetes
      • 谈 Kubernetes 的架构设计与实现原理
  • 运维&测试
    • 压测
      • 全链路压测平台(Quake)在美团中的实践
  • 运维&测试
    • 测试
      • Cpress - JavaScript End to End Testing Framework
  • 运维&测试
    • 测试
      • 代码覆盖率-JaCoCo
  • 运维&测试
    • 测试
      • 浅谈代码覆盖率
  • 运维&测试
    • 测试
      • 测试中 Fakes、Mocks 以及 Stubs 概念明晰
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP中的Bean是如何被AOP代理的
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP原生动态代理和Cglib动态代理
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP实现方式(xml&注解)
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP是如何收集切面类并封装的
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP概述
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP的底层核心后置处理器
  • Java FrameWorks
    • Spring
      • Spring AOP
        • Spring AOP的延伸知识
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - IOC(一)
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - IOC(三)
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - IOC(二)
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - IOC(五)
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - IOC(四) - 循环依赖与解决方案
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot - 启动引导
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot JarLauncher
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot Web Mvc 自动装配
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot 使用ApplicationListener监听器
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot 声明式事务
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot 嵌入式容器
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot引起的“堆外内存泄漏”排查及经验总结
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot的启动流程
  • Java FrameWorks
    • Spring
      • Spring Boot
        • Spring Boot自动化配置源码分析
  • Java FrameWorks
    • Spring
      • Spring Boot
        • 如何自定义Spring Boot Starter?
  • Java FrameWorks
    • Spring
      • Spring IOC
        • IOC - 模块装配和条件装配
  • Java FrameWorks
    • Spring
      • Spring IOC
        • IOC - 配置源(xml,注解)
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Environment
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring ApplicationContext
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring BeanDefinition
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring BeanFactory
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring BeanFactoryPostProcessor
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring BeanPostProcessor
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Bean的生命周期(一) - 概述
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Bean的生命周期(三) - 实例化阶段
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Bean的生命周期(二) - BeanDefinition
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Bean的生命周期(五) - 销毁阶段
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Bean的生命周期(四) - 初始化阶段
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring ComponentScan
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Events
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring IOC 基础篇
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring IOC 总结
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring IOC 进阶篇
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring IOC容器的生命周期
  • Java FrameWorks
    • Spring
      • Spring IOC
        • Spring Resource
  • Java FrameWorks
    • Spring
      • Spring MVC
        • DispatcherServlet的初始化原理
  • Java FrameWorks
    • Spring
      • Spring MVC
        • DispatcherServlet的核心工作原理
  • Java FrameWorks
    • Spring
      • Spring MVC
        • WebMvc的架构设计与组件功能解析
  • Java FrameWorks
    • Spring
      • Spring Security
        • Spring Boot 2 + Spring Security 5 + JWT 的单页应用 Restful 解决方案
  • Java FrameWorks
    • Spring
      • Spring Security
        • Spring Security Oauth
  • Java FrameWorks
    • Spring
      • Spring Security
        • Spring Security
  • Java FrameWorks
    • Spring
      • Spring WebFlux
        • DispatcherHandler的工作原理(传统方式)
  • Java FrameWorks
    • Spring
      • Spring WebFlux
        • DispatcherHandler的工作原理(函数式端点)
  • Java FrameWorks
    • Spring
      • Spring WebFlux
        • WebFlux的自动装配
  • Java FrameWorks
    • Spring
      • Spring WebFlux
        • 快速了解响应式编程与Reactive
  • Java FrameWorks
    • Spring
      • Spring WebFlux
        • 快速使用WebFlux
  • 分布式
    • 协调服务
      • Zookeeper
        • Zookeeper - 客户端之 Curator
  • 分布式
    • 协调服务
      • Zookeeper
        • 详解分布式协调服务 ZooKeeper
  • 分布式
    • 协调服务
      • etcd
        • 高可用分布式存储 etcd 的实现原理
  • 数据库
    • Database Version Control
      • Flyway
        • Database Migrations with Flyway
  • 数据库
    • Database Version Control
      • Flyway
        • How Flyway works
  • 数据库
    • Database Version Control
      • Flyway
        • Rolling Back Migrations with Flyway
  • 数据库
    • Database Version Control
      • Flyway
        • The meaning of the concept of checksums
  • 数据库
    • Database Version Control
      • Liquibase
        • Introduction to Liquibase Rollback
  • 数据库
    • Database Version Control
      • Liquibase
        • LiquiBase中文学习指南
  • 数据库
    • Database Version Control
      • Liquibase
        • Use Liquibase to Safely Evolve Your Database Schema
  • 系统设计
    • 流量控制
      • RateLimiter
        • Guava Rate Limiter实现分析
  • 系统设计
    • 流量控制
      • Sentinel
        • Sentinel 与 Hystrix 的对比
  • 系统设计
    • 流量控制
      • Sentinel
        • Sentinel工作主流程
  • 系统设计
    • 流量控制
      • 算法
        • 分布式服务限流实战
  • 系统设计
    • 解决方案
      • 秒杀系统
        • 如何设计一个秒杀系统
  • 系统设计
    • 解决方案
      • 红包系统
        • 微信高并发资金交易系统设计方案--百亿红包背后的技术支撑
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 什么是预排序遍历树算法(MPTT,Modified Preorder Tree Traversal)
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 加密算法
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 推荐系统算法
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • linkerd
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 查找算法
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 缓存淘汰算法中的LRU和LFU
  • 计算机基础
    • 数据结构与算法
      • 其他相关
        • 负载均衡算法
  • 计算机基础
    • 数据结构与算法
      • 分布式算法
        • 分布式算法 - Paxos算法
  • 计算机基础
    • 数据结构与算法
      • 分布式算法
        • 分布式算法 - Raft算法
  • 计算机基础
    • 数据结构与算法
      • 分布式算法
        • 分布式算法 - Snowflake算法
  • 计算机基础
    • 数据结构与算法
      • 分布式算法
        • 分布式算法 - ZAB算法
  • 计算机基础
    • 数据结构与算法
      • 分布式算法
        • 分布式算法 - 一致性Hash算法
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - Bitmap & Bloom Filter
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - Map & Reduce
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - Trie树/数据库/倒排索引
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - 分治/hash/排序
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - 双层桶划分
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - 外(磁盘文件)排序
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理 - 布隆过滤器
  • 计算机基础
    • 数据结构与算法
      • 大数据处理
        • 大数据处理算法
  • 计算机基础
    • 数据结构与算法
      • 字符串匹配算法
        • 字符串匹配 - 文本预处理:后缀树(Suffix Tree)
  • 计算机基础
    • 数据结构与算法
      • 字符串匹配算法
        • 字符串匹配 - 模式预处理:BM 算法 (Boyer-Moore)
  • 计算机基础
    • 数据结构与算法
      • 字符串匹配算法
        • 字符串匹配 - 模式预处理:KMP 算法(Knuth-Morris-Pratt)
  • 计算机基础
    • 数据结构与算法
      • 字符串匹配算法
        • 字符串匹配 - 模式预处理:朴素算法(Naive)(暴力破解)
  • 计算机基础
    • 数据结构与算法
      • 字符串匹配算法
        • 字符串匹配
  • 计算机基础
    • 数据结构与算法
      • 常用算法
        • 分支限界算法
  • 计算机基础
    • 数据结构与算法
      • 常用算法
        • 分治算法
  • 计算机基础
    • 数据结构与算法
      • 常用算法
        • 动态规划算法
  • 计算机基础
    • 数据结构与算法
      • 常用算法
        • 回溯算法
  • 计算机基础
    • 数据结构与算法
      • 常用算法
        • 贪心算法
  • 计算机基础
    • 数据结构与算法
      • 排序算法
        • 十大排序算法
  • 计算机基础
    • 数据结构与算法
      • 排序算法
        • 图解排序算法(一)之3种简单排序(选择,冒泡,直接插入)
  • 计算机基础
    • 数据结构与算法
      • 排序算法
        • 图解排序算法(三)之堆排序
  • 计算机基础
    • 数据结构与算法
      • 排序算法
        • 图解排序算法(二)之希尔排序
  • 计算机基础
    • 数据结构与算法
      • 排序算法
        • 图解排序算法(四)之归并排序
  • 计算机基础
    • 数据结构与算法
      • 数据结构
        • 树的高度和深度
  • 计算机基础
    • 数据结构与算法
      • 数据结构
        • 红黑树深入剖析及Java实现
  • 计算机基础
    • 数据结构与算法
      • 数据结构
        • 线性结构 - Hash
  • 计算机基础
    • 数据结构与算法
      • 数据结构
        • 线性结构 - 数组、链表、栈、队列
  • 计算机基础
    • 数据结构与算法
      • 数据结构
        • 逻辑结构 - 树
  • 运维&测试
    • 测试
      • Spock
        • Groovy 简明教程
  • 运维&测试
    • 测试
      • Spock
        • Spock 官方文档
  • 运维&测试
    • 测试
      • Spock
        • Spock单元测试框架介绍以及在美团优选的实践
  • 运维&测试
    • 测试
      • TDD
        • TDD 实践 - FizzFuzzWhizz(一)
  • 运维&测试
    • 测试
      • TDD
        • TDD 实践 - FizzFuzzWhizz(三)
  • 运维&测试
    • 测试
      • TDD
        • TDD 实践 - FizzFuzzWhizz(二)
  • 运维&测试
    • 测试
      • TDD
        • 测试驱动开发(TDD)- 原理篇
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Nacos
          • Nacos 服务注册的原理
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Nacos
          • Nacos 配置中心原理分析
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Seata
          • 服务调用过程
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Seata
          • Spring Cloud Bus
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Seata
          • Spring Cloud Consul
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Seata
          • Spring Cloud Stream
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Sentinel
          • Sentinel 与 Hystrix 的对比
  • 微服务
    • Spring Cloud
      • Spring Cloud Alibaba
        • Sentinel
          • Sentinel
  • 微服务
    • Spring Cloud
      • Spring Cloud Netflix
        • Hystrix
          • How Hystrix Works
  • 微服务
    • Spring Cloud
      • Spring Cloud Netflix
        • Hystrix
          • Hystrix
  • 微服务
    • Spring Cloud
      • Spring Cloud Netflix
        • Hystrix
          • Hystrix原理与实战
  • 微服务
    • Spring Cloud
      • Spring Cloud Netflix
        • Hystrix
          • Spring Cloud Hystrix基本原理
由 GitBook 提供支持
在本页
  • 1. 本地消息表方案简介
  • 1.1 事务上游处理逻辑
  • 1.2 事务下游处理逻辑
  • 2. 本地消息表方案注意点
  • 3. shieldTXC 框架原理及实现
  • 3.1 框架分析之事务提交
  • 3.2 框架分析之事务回滚
  • 3.3 小结
  • 4. shieldTXC 实战
  • 5. 全文总结

这有帮助吗?

  1. 分布式
  2. 分布式事务

手写实现基于消息队列的分布式事务框架

上一页分布式事务下一页分布式

最后更新于2年前

这有帮助吗?

转载:

分布式事务在现在的分布式开发领域已经成为必须考虑的因素,随着微服务、SOA 架构思想日益被开发者锁熟知,分布式事务的解决思路也不断被提出并被开源社区实现。

在互联网开发中,通常情况下,我们常常会采用一揽子“柔性事务”解决方案来保证系统内模块之间的数据的最终一致性。

业界知名的分布式事务解决方案有:

  1. TCC 方案,它的开源实现有 TCC-Transaction、ByteTCC,阿里开源的 Seata 框架也加入了对 TCC 模式的支持;

  2. 可靠消息最终一致方案,代表的实现方式为 RocketMQ 的事务消息;

  3. SAGA 事务,代表实现方式为华为开源的 serviceComb;

  4. 最大努力通知型解决方案,该方案与业务耦合较为严重,因此业界也没有一个较为抽象的开源实现;

  5. 消息溯源方案(或称为本地消息表方案),该方案实现较为简单,但也与业务耦合较为严重,据我调查暂时没有抽象的开源实现。

上述方案中,我挑选了第五种**消息溯源方案(本地消息表方案,后文均称为本地消息表方案)**作为自己写分布式事务框架的核心机制,旨在实现一个与业务无关的、基于消息的、异步确保的、最终一致的柔性事务框架。

框架的实现主要基于 RocketMQ 的普通消息,我们都知道 RocketMQ 已经支持了事务消息,这里只是基于它对本地消息表方案进行实现,原则上,本地消息表方案支持任意具有发布订阅能力的消息中间件。

上述其他实现在笔者的博客中有过较为系统的讲解,感兴趣的同学可以移步笔者的博客,本文就不再展开说明。

1. 本地消息表方案简介

为了加深读者的理解,方便行文,此处对本地消息表方案做一个介绍。

本地消息表方案源自 eBay,传入国内后多次被大厂(如:支付宝)落地,经过布道被大家所熟知。它的核心机理分为事务发起方(我们称事务上游),事务联动方(我们称下游),二者为分布式系统内两组不同的应用,它们应当使用独立的数据源。

1.1 事务上游处理逻辑

事务上游、执行本地业务的同时,将消息持久化到业务数据源中(持久化到数据库中),持久化流程与业务操作处于同一个事务中,保证了业务与消息同时成功持久化。

如果消息队列支持事务机制呢?就不用选择了本地消息表了,本地消息表是针对不支持事务机制的消息队列。

到这里,事务上游的业务就执行结束了,通过线程异步地轮询消息表,将待发送的消息投递到消息中间件的事务执行队列(我们将该队列的 Topic 称为事务执行 Topic)中。投递成功则更新消息状态为 [已投递]。

1.2 事务下游处理逻辑

事务下游拉取消息进行消费,在业务消费之前,首先将消息持久化到本地,持久化成功后执行消费逻辑。

下游通过重试最大限度地保证业务消费逻辑执行成功,如果达到某个设定的消费次数阈值仍旧消费失败,那么我们认为事务下游没办法将事务处理成功,此时事务下游拷贝之前持久化的消息,标记为回滚状态消息,并投递到业务回滚队列(我们将该队列的 Topic 称为事务回滚 Topic)中。投递成功则更新消息状态为 [已投递]。

事务上游需要实现回滚逻辑,接收到事务下游投递的回滚消息后,执行回滚逻辑,对业务进行回滚操作。该流程通过消费重试实现,如果达到最大消费次数仍旧不能回滚,则回滚消息会进入消息中间件的死信队列。此时需要人工干预,取出死信消息进行手工回滚操作,保证业务的正常运行。

一般而言,如果环境稳定,业务逻辑无严重 Bug,是不会出现一直重试都执行不完的情况,如果有,很大可能是代码逻辑有问题需要做进一步的排查。

2. 本地消息表方案注意点

本地消息表方案依赖消息中间件,通过消息发送阶段的 ACK 判断消息是否被持久化,一旦返回消息投递成功,则通过消息中间件本身的配置即可保证该消息不会丢失;通过消费阶段的重试加上业务系统的幂等保证事务下游与事务上游能够最大可能的达成最终一致。

如果还是存在异常,则需要人工干预,此处也能看出一点,技术方案往往都是折中产物,这也是最终一致性本身的特点,我们能够容忍一定时间的不一致状态,但是我们能够确保该不一致时间窗口之后,业务的上下游能够达成数据的一致性,建立在该前提下,我们才能够探讨分布式事务的柔性解决方案。

由于本地消息表方案依赖了消息中间件,因此我们需要保证消息中间件的高可靠,否则系统的可用性会因为引入第三方组件而下降。如:配置 RocketMQ 的多主多从集群模式,使用 Kafka 的多副本集群等,生产环境坚决不能出现单点风险。

除了消息中间件选型,我们还要保证消息持久化与本地事务要处于同一个事务域中。为什么要这么做呢?

本地消息表方案的使用,主要的目的是为了解决消息发送与事务提交不能保证同时成功同时失败,也就是消息发送本身与本地的事务并非是原子性的。

我们设想一个错误场景,某系统为了解决跨应用的分布式事务问题,因此引入了消息中间件,设想通过消息通讯作为上下游应用间的事务交互方式。即:上游处理业务完成后(往 DB 中插入了若干业务记录),然后发布一个普通消息到 MQ 的某 Topic,下游订阅该 Topic,对消息进行拉取并消费,完成下游业务。

那么这么做能够保证上下游数据的一致性吗?如果没有异常情况出现,当然是可以的,但是由于 DB 与 MQ 是不同的系统,因此可能出现插入 DB 成功,但发送消息到 MQ 失败;或者出现插入 DB 失败,但发送消息到 MQ 成功。如果出现这类异常情况,业务的上下游系统间数据便不是一致的。

我们分析一下异常情况发生的机理:

当上游进行业务处理完成后,提交了本地事务,接着进行消息发送时,上游系统宕机,当上游系统恢复后,消息也不会再发送了,那么上下游就会出现数据的不一致。

这个异常出现的原因就是没有保证消息发送与本地事务的原子性,而引入本地消息表则能解决这一问题。

当引入本地消息表,在业务事务内将消息进行持久化,由于业务数据与消息数据处于同一事务,因此二者一定是同时成功同时失败,也就是原子的。**当消息持久化后,通过异步线程扫表进行发送,如果出现系统宕机,在恢复之后,由于消息已经存在,因此能够将该持久化的消息扫描出来进行相应的投递操作。**这就保证了本地事务执行成功后,消息一定能发送出去。这里可能会有疑问,万一消息发送失败呢?当然是继续进行重发了,直到发出去为止。

如果本地事务直接就执行失败了,那么消息也不会持久化,此时业务就是失败的,需要业务方进行重试,这种情况下是不存在数据不一致的情况的。

3. shieldTXC 框架原理及实现

上文中,我们已经对本地消息表方案做了一个较为全面的了解,并对它是如何保证事务执行与消息发送同时成功同时失败的机理有了清晰的认知。

从本节开始,我们正式进入自己写分布式事务框架的部分。

体外话不多说,接下来我们一边看框架的原理图,在宏观上对框架的机理做一个了解,一边对相应的原理进行代码实现讲解,这样理论与实战相结合,相信会加深读者朋友的理解。

首先看下框架的核心原理图。

看起来还是比较整洁的,这也是笔者写代码的一个宗旨:

好的框架可以复杂,但架构一定是优雅的、清晰的。

我们配合代码深入分析一下这张图,主要分为两个主要的阶段:

  1. 事务提交阶段

  2. 事务回滚阶段

首先来看下事务提交阶段的实现原理及相对应的代码实现。

3.1 框架分析之事务提交

图中左侧的红字标记该部分为上游应用(后续统称为上游)的事务提交阶段的运行逻辑,上游在这个阶段又分为两个子阶段。

3.1.1 第一阶段:本地事务与消息持久化

上游在执行本地事务成功在,在同一事务内对业务实体封装为消息体,调用 shieldTXC 提供的消息持久化接口将消息持久化到业务库中,框架的消息持久化方法需要保证事务性。

代码如下:

@Transactional(rollbackFor = Exception.class)
public void testTran() {
    // 本地事务
    doLocalTransaction();
    // 消息持久化
    TestTxMessage testTxMessage = new TestTxMessage();
    testTxMessage.setName(UUID.randomUUID().toString().replace("-", "").substring(0, 10));
    shieldTxcRocketMQProducerClient
            .putMessage(testTxMessage, EventType.INSERT, TXType.COMMIT, testTxMessage.getName(),
                    UUID.randomUUID().toString());
}

这个阶段的重点在于消息持久化的实现。一起来看一下 shieldTXC 是如何实现的:

/**
 * 消息持久化
 * @param shieldTxcMessage
 * @param eventType
 * @param txType
 * @param appId
 */
@Transactional(rollbackFor = Exception.class)
public void putMessage(AbstractShieldTxcMessage shieldTxcMessage,
                       EventType eventType,
                       TXType txType,
                       String appId,
                       String bizKey) {
    Preconditions.checkNotNull(eventType, "Please insert eventType, type is:[com.shield.txc.constant.EventType]");
    Preconditions.checkNotNull(bizKey, "Please insert unique bizKey!");

    ShieldEvent event = new ShieldEvent();
    event.setEventType(eventType.toString())
            .setTxType(txType.toString())
            .setEventStatus(EventStatus.PRODUCE_INIT.toString())
            .setContent(shieldTxcMessage.encode())
            .setBizKey(bizKey)
            .setAppId(appId);
    try {
        // 入库失败回滚
        boolean insertResult = this.getBaseEventService().insertEvent(event);
        if (!insertResult) {
            throw new BizException("insert ShieldEvent into DB occurred Exception!");
        }
    } catch (Exception e) {
        // 异常回滚
        throw new BizException("insert ShieldEvent into DB occurred Exception!", e);
    }
}

为了便于统一管理,shieldTXC 定义了一个抽象消息类,业务方需要继承该抽象类,通过实现其中的 decode 与 encode 方法为业务消息提供编解码能力。

public abstract class AbstractShieldTxcMessage implements Serializable {

    private static final long serialVersionUID = -2416427331208398607L;
    /**消息序列化*/
    public abstract String encode();
    /**消息反序列化*/
    public abstract void decode(String msg);
}

我们接着看上面的 putMessage 方法。

通过传参,组装了一个 ShieldEvent 持久化消息实体,并将其进行 insert 操作,通过判断入库是否成功决定是否抛出异常让事务进行回滚。

如果消息持久化成功,则本地事务提交;如果消息持久化失败,则本地事务回滚,业务结束。

3.1.2 第二阶段:消息投递

业务方完成上述第一阶段的消息持久化操作,就不需要进行其他的额外操作了。

此时框架开始执行第二阶段:消息投递阶段。

我们从图中可以清晰的看到,ShieldEvent 会在内部维护一个定时任务,扫描状态为初始化待发送消息,组装成功消息体并调用 MQ 的发送消息接口,将投递到事务提交队列。这个过程中要保证消息可达。

这个子阶段用文字描述起来比较简洁,我们接着看下框架代码是如何实现的,加深认知。

1. 定时任务初始化

整个 shieldTXC 内核已经打包为一个 Spring Boot 的 starter,因此可以通过 @Enable 注解简单的整合到 Spring Boot 应用中,框架本身提供了对核心 Bean 的初始化过程,这里我们主要看下扫描待发送消息的定时任务的初始化过程。

定时任务的初始化过程是在框架的 ShieldEventTxcConfiguration.java 类中实现的,类声明如下:

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class ShieldEventTxcConfiguration {}

通过 @Configuration 标记为一个配置 Bean,通过 @EnableConfigurationProperties(RocketMQProperties.class) 开启可配置能力。shieldTXC 支持通过配置文件进行配置。

这里重点分析一下扫描待发送消息定时任务的初始化:

/**
 * 异步消息调度构造
 * @param rocketMQProperties
 * @return
 */
@Bean
@ConditionalOnMissingBean
@Order(value = 3)
public SendTxcMessageScheduler sendTxcMessageScheduler(RocketMQProperties rocketMQProperties) {
    SendTxcMessageScheduler sendTxcMessageScheduler = new SendTxcMessageScheduler();
    // 设置调度线程池参数
    sendTxcMessageScheduler.setInitialDelay(rocketMQProperties.getTranMessageSendInitialDelay());
    sendTxcMessageScheduler.setPeriod(rocketMQProperties.getTranMessageSendPeriod());
    sendTxcMessageScheduler.setCorePoolSize(rocketMQProperties.getTranMessageSendCorePoolSize());
    // 数据库操作
    sendTxcMessageScheduler.setBaseEventService(baseEventService(baseEventRepository()));
    // 消息发送
    sendTxcMessageScheduler.setShieldTxcRocketMQProducerClient(rocketMQEventProducerClient(rocketMQProperties));
    LOGGER.debug("Initializing [sendTxcMessageScheduler] instance success.");
    // 执行调度
    sendTxcMessageScheduler.schedule();
    return sendTxcMessageScheduler;
}

通过 @Bean 标记为 Spring 容器中的 Bean,Spring 在初始化过程中会加载我们的 Bean,Bean 的 name 就是方法名,这个方式是 Spring 提供的通过 JavaConfig 方式进行 Bean 定义的方式,这里不做展开。

我们首先初始化了一个 SendTxcMessageScheduler 实例,对其进行参数的配置,诸如线程池参数、数据库操作相关的 Bean 依赖(通过相同方式进行初始化的)、消息发送的 Bean 依赖(通过相同方式进行初始化的)。依赖设置完成后调用 schedule() 方法开启任务调度。当应用启动完成之后便会自动开始进行待发送消息的扫描操作。

关于数据库操作 Bean、RocketMQ 操作的 Bean 初始化过程,感兴趣的读者可以去源码的 com.shield.txc.configuration.ShieldEventTxcConfiguration.java 类中查看,请恕本文不再展开。

2. 定时任务核心逻辑分析

初始化完成 SendTxcMessageScheduler 之后,具体又是如何对待发送消息进行处理的呢?带着疑问,我们深入 SendTxcMessageScheduler 这个调度类中一探究竟。

SendTxcMessageScheduler 方法的声明如下:

public class SendTxcMessageScheduler extends AbstractMessageScheduler implements Runnable {}

可以看到 SendTxcMessageScheduler 实例本身也是一个 Runnable 实例,这里暂且放一下,后续的分析中会用到,我们接着往下看。

上述的代码中在初始化完成 SendTxcMessageScheduler 实例后,调用了 schedule() 方法,那么我们重点看下这个方法。

public void schedule() {
    executorService.scheduleAtFixedRate(
            this,
            this.initialDelay,
            this.period,
            this.timeUnit);
}

代码逻辑很简单,通过内部的 executorService 开启了调度流程,executorService 的初始化是在构造方法中完成的,默认构造方法如下:

public SendTxcMessageScheduler() {
    executorService = Executors.newScheduledThreadPool(corePoolSize);
}

在 schedule() 方法中,调用 scheduleAtFixedRate 方法,第一个参数传入了 this 引用,线程池会定时执行 this 引用(也是一个 Runnable 引用)的 run 方法,方法逻辑:

@Override
public void run() {
    // 查询并发送消息
    try {
        // 获取待调度的消息,初始态==初始化
        List<ShieldEvent> shieldEvents = baseEventService.queryEventListByStatus(EventStatus.PRODUCE_INIT.toString());
        if (CollectionUtils.isEmpty(shieldEvents)) {
            return;
        }
        for (ShieldEvent shieldEvent : shieldEvents) {
            // 发送前改状态
            processBeforeSendMessage(shieldEvent);
            // 发送消息核心逻辑
            sendMessage(shieldEvent);
            // 判断发送结果,成功则更新为已发送
            processAfterSendMessage(shieldEvent);
        }
    } catch (Exception e) {
        LOGGER.error("Sending rollback message occurred Exception!", e);
        return;
    }
}

这里主要采用了模板方法对业务逻辑进行了封装。

首先获取待调度的消息,状态为 PRODUCE_INIT(生产初始化)。默认获取 50 条。

如果未查询到消息,则结束本次调度,对于查询到的消息列表 shieldEvents 进行迭代:

  1. 首先进行发送前置操作,即方法 processBeforeSendMessage。将消息的状态从 PRODUCE_INIT 改为 PRODUCE_PROCESSING(生产处理中)。这么做的目的在于通过状态机方式的乐观锁达到支持并发的目的。这个思路在日常业务开发中也经常用到。

  2. 接着进行发送核心操作,这里需要看一下代码是如何实现的:

    /**
     * 发送事务消息
     * @param shieldEvent
     */
    @Override
    public void sendMessage(ShieldEvent shieldEvent) {
      int eventId = shieldEvent.getId();
      // 组装Message
      ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
      shieldTxcMessage
        .setId(String.valueOf(eventId))
        .setAppId(shieldEvent.getAppId())
        .setContent(shieldEvent.getContent())
        .setEventType(shieldEvent.getEventType())
        .setEventStatus(shieldEvent.getEventStatus())
        .setTxType(shieldEvent.getTxType())
        .setBizKey(shieldEvent.getBizKey());
    
      String messgeBody = shieldTxcMessage.encode();
      String topic = null;
      BizResult bizResult = null;
      // 发送commit消息,判断消息类型
      if (TXType.COMMIT.toString().equals(shieldTxcMessage.getTxType())) {
        topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE,
                                             shieldTxcRocketMQProducerClient.getTopic());
        Message commitMessage = new Message(topic, messgeBody.getBytes());
        bizResult = shieldTxcRocketMQProducerClient.sendCommitMsg(commitMessage, eventId);
      }
      // 发送rollback消息
      if (TXType.ROLLBACK.toString().equals(shieldTxcMessage.getTxType())) {
        topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_ROLLBACK_STAGE,
                                             shieldTxcRocketMQProducerClient.getTopic());
        Message rollbackMessage = new Message(topic, messgeBody.getBytes());
        bizResult = shieldTxcRocketMQProducerClient.sendRollbackMsg(rollbackMessage, eventId);
      }
      if (bizResult.getBizCode() != BizCode.SEND_MESSAGE_SUCC) {
        LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[FAIL], Message Body:[{}]", messgeBody);
        return;
      }
      LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{}]", messgeBody);
    }

    首先将消息实体转换为 ShieldTxcMessage。ShieldTxcMessage 是框架封装的消息类型,内部的 encode、decode 方法提供了序列化、反序列化能力。

    接着判断消息类型,如果是 TXType.COMMIT 则将消息投递到事务提交队列;如果是 TXType.ROLLBACK 则将消息投递到事务回滚队列。

    在上游业务中发送的消息均为事务提交消息,因此会被投递到事务提交队列中。

    最后判断消息发送结果,如果发送失败会进行重试,该重试能力是 MQ 中间件客户端提供的。对于多次发送都失败的消息需要更改状态为初始化,继续进行投递,保证消息一定能发出去。长时间发布出去的消息进发送失败表,后续需要对该失败表进行扫描,触发本地业务回滚。这种情况因为极其少见,因此框架当前版本暂未实现,后续更新后在 GitHub 的页面中注明。

  3. 消息投递成功后,调用 processAfterSendMessage 方法进行状态更新,将处理中状态 PRODUCE_PROCESSING 改为 PRODUCE_PROCESSED (生产处理成功)。

到此就完成了分布式事务上游的消息发布流程。

3.1.3 第三阶段:消息消费,下游事务提交

为了方便读者对照,这里再贴一下原理图。我们接着分析事务下游应用(后文称下游)是如何对事务提交队列中的消息进行消费从而完成本地事务的。

1. 消费适配器初始化

下游应用启动过程中需要完成消费适配器的初始化,实现对事务提交队列消息的消费。该消费逻辑实现了下游与上游的最终一致性。

@Service
public class TxConsumeService implements InitializingBean {

    @Value("${shield.event.rocketmq.nameSrvAddr}")
    String nameSerAddr;

    @Value("${shield.event.rocketmq.topicSource}")
    String topic;

    @Override
    public void afterPropertiesSet() throws Exception {

        new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcCommitListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("测试消费ShieldTxcCommitListener开始......");

                Random ra =new Random();
                int randomInt = ra.nextInt(10) + 1;
                if (randomInt <= 5) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }));
    }
}

上面这段代码为下游应用需要实现的,在应用启动阶段初始化了 ShieldTxcConsumerListenerAdapter 消费适配器。

ShieldTxcConsumerListenerAdapter

ShieldTxcConsumerListenerAdapter 的主要构造方法有三个,分别为:

  • 事务下游可选构造方法,即分布式事务的最终执行者。

    /**
     * 事务下游可选
     * @param nameSrvAddr
     * @param topic
     * @param txCommmtListener
     */
    public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                            String topic,
                                            ShieldTxcCommitListener txCommmtListener) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
        this.txCommmtListener = txCommmtListener;
        init();
    }
  • 事务上游可选构造方法,即分布式事务的最初发起者。

    /**
     * 事务上游可选
     * @param nameSrvAddr
     * @param topic
     * @param txRollbackListener
     */
    public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                            String topic,
                                            ShieldTxcRollbackListener txRollbackListener) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
        this.txRollbackListener = txRollbackListener;
        init();
    }
  • 事务上下游可选构造方法,即一个应用处于上游和下游之间,它既是该分布式事务的上游又是该分布式事务的下游。

    public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                            String topic,
                                            ShieldTxcCommitListener txCommmtListener,
                                            ShieldTxcRollbackListener txRollbackListener) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
        this.txCommmtListener = txCommmtListener;
        this.txRollbackListener = txRollbackListener;
        init();
    }

上述的 Demo 代码所在的应用为分布式事务的最终下游,因此只需要实现 ShieldTxcCommitListener 接口,完成 consumeMessage 回调方法。我们在其中模拟百分之五十提交,百分之五十重试的情况,测试正常与异常情况下框架的表现。

在 ShieldTxcConsumerListenerAdapter 构造方法中均调用了 init() 方法,该方法初始化了真正的消费者客户端,对 MQ 进行监听。

public ShieldTxcConsumerListenerAdapter init() {
    // 初始化shieldTxcRocketMQConsumerClient
    Preconditions.checkNotNull(this.nameSrvAddr, "please insert RocketMQ NameServer address");
    shieldTxcRocketMQConsumerClient =
            new ShieldTxcRocketMQConsumerClient(this.topic, this.nameSrvAddr, this.getTxCommmtListener(), this.getTxRollbackListener());
    LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient] instance init success.");
    return this;
}

shieldTxcRocketMQConsumerClient

我们进入 shieldTxcRocketMQConsumerClient 类中,观察一下它是如何进行初始化的。

public ShieldTxcRocketMQConsumerClient(String topic,
                                       String nameSrvAddr,
                                       ShieldTxcCommitListener txCommtListener,
                                       ShieldTxcRollbackListener txRollbackListener) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
    if (txCommtListener == null && txRollbackListener == null) {
        throw new BizException("Please define at least one MessageListenerConcurrently instance, such as [ShieldTxcCommitListener] or [ShieldTxcRollbackListener] or both.");
    }
    if (txCommtListener != null) {
        // 初始化事务提交消费者
        initCommitConsumer(this.topic, this.nameSrvAddr, txCommtListener);
        LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.CommmitConsumer] instance init success.");
    }
    if (txRollbackListener != null) {
        // 初始化事务回滚消费者
        initRollbackConsumer(this.topic, this.nameSrvAddr, txRollbackListener);
        LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.RollbackListener] instance init success.");
    }
}

上述代码是 ShieldTxcRocketMQConsumerClient 初始化流程,可以看到主要是判断了 ShieldTxcCommitListener、ShieldTxcRollbackListener 实例是否为空,如果不为空则进行对应消费者的初始化。两个初始化流程基本相同,我们重点看下 initCommitConsumer 方法的逻辑。

/**
 * 初始化事务提交消费者
 * @param topic
 * @param nameSrvAddr
 */
private void initCommitConsumer(String topic, String nameSrvAddr, ShieldTxcCommitListener txCommtListener) {
    commitConsumer =
            new DefaultMQPushConsumer(
                    MessagePropertyBuilder.groupId(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic));
    commitConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    commitConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    commitConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    commitConsumer.registerMessageListener(txCommtListener);
    // 订阅所有消息
    try {
        commitConsumer.subscribe(
                MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic), "*");
        // 启动消费者
        commitConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("Loading [com.shield.txc.RocketMQEventConsumerClient.commmitConsumer] occurred exception", e);
    }
}

相信不需要我再做多余的解释了,这里其实就是启动了一个 DefaultMQPushConsumer 消费者客户端,并对对应的 Topic 进行订阅,从而实现对对应队列中消息的消费。如果读者对 RocketMQ 不是很了解,可以到 RocketMQ 开发者中心进行相关的学习。

回到原理图中,可以看到下游应用的逻辑中有一个 shieldTXC Commit 拦截器,这个拦截器才是消费阶段的重点,框架通过该拦截器对消费过程进行了代理,加入了前置后置操作,从而保证了消费阶段的分布式事务的一致性。

拦截器代码在 ShieldTxcCommitListener 中。

ShieldTxcCommitListener

ShieldTxcCommitListener 的类声明及构造方法如下:

public class ShieldTxcCommitListener implements MessageListenerConcurrently {

    public ShieldTxcCommitListener(MessageListenerConcurrently txCommmtListener) {
        this.txCommmtListener = txCommmtListener;
    }
}

可以看到 ShieldTxcCommitListener 实现了 MessageListenerConcurrently 接口。当 ShieldTxcCommitListener 在构造过程中将外界传入的 MessageListenerConcurrently 实例的引用指向了内部的 MessageListenerConcurrently 引用。

ShieldTxcCommitListener 本身也是 MessageListenerConcurrently 实例,通过它的 consumeMessage 代理了外部传入的 MessageListenerConcurrently 实例,通过加入了切面逻辑对消费过程做了进一步的处理。

这里就对 ShieldTxcCommitListener 如何对真实的消费过程进行代理做深入的分析。

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

    for (MessageExt msg : msgs) {
        String msgBody = new String(msg.getBody());
        String msgId = msg.getMsgId();
        LOGGER.debug("[ShieldTxcCommitListener]Consuming [COMMIT] Message start... msgId={},msgBody={}", msgId, msgBody);
    }
}

首先进行前置参数的获取,这里不需要过多解释。

ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);

ShieldEvent event = new ShieldEvent();
event.convert(shieldTxcMessage);

BaseEventService baseEventService = (BaseEventService) SpringApplicationHolder.getBean("baseEventService");

将消息体进行反序列化,并转换为 ShieldEvent 消息实体便于后续操作;通过 Spring 上下文获取到数据库操作 Bean,便于进行消息状态修改。

try {
            // 消费幂等,查询消息是否存在,入库带唯一索引
            // 消费次数大于等于阈值,回滚事务
            int currReconsumeTimes = msg.getReconsumeTimes();
            if (currReconsumeTimes >= CommonProperty.MAX_COMMIT_RECONSUME_TIMES) {
                // 事务回滚操作,消息复制为回滚生产者,持久化
                LOGGER.debug("[ShieldTxcCommitListener] START transaction rollback sequence! msgId={},currReconsumeTimes={}", msgId, currReconsumeTimes);
                if (doPutRollbackMsgAfterMaxConsumeTimes(baseEventService, event, msgId)) {
                    LOGGER.debug("[ShieldTxcCommitListener] transaction rollback sequence executed SUCCESS! msgId={}", msgId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    // 如果一直失败最后会进死信
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

这里获取了消息的当前消费次数,并与最大消费次数进行比对。

如果消费次数已经超过最大消费次数(默认为 10 次),则进行分布式事务的回滚操作。这么做的原因为:下游无论如何都没办法提交本地事务,如果不回滚则上下数据一致性就被破坏了,因此这里通过 doPutRollbackMsgAfterMaxConsumeTimes 进行了如下的业务处理:

  1. 将消息的状态由 CONSUME_PROCESSING(消费处理中,该状态在首次消费开始时就由 CONSUME_INIT 修改得到)改为 CONSUME_MAX_RECONSUMETIMES(达到最大消费次数)。

  2. 克隆消息,插入事务回滚消息,状态为 PRODUCE_INIT

  3. 完成后等待事务下游的 SendTxcMessageScheduler 对回滚消息进行投递。

String bizKey = shieldTxcMessage.getBizKey();
            String txType = shieldTxcMessage.getTxType();
            String eventStatua = shieldTxcMessage.getEventStatus();
            String appId = shieldTxcMessage.getAppId();
            String eventType = shieldTxcMessage.getEventType();

            // 进行消息持久化
            event.setEventType(eventType)
                    .setTxType(txType)
                    .setEventStatus(EventStatus.CONSUME_INIT.toString())
                    .setContent(shieldTxcMessage.getContent())
                    .setAppId(shieldTxcMessage.getAppId())
                    .setBizKey(bizKey)
                    .setId(Integer.valueOf(shieldTxcMessage.getId()));
            // 入库失败回滚
            boolean insertResult = baseEventService.insertEventWithId(event);
            if (!insertResult) {
                LOGGER.warn("[ShieldTxcCommitListener] insert shieldEvent Consume Message failed,msgId={}", msgId);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

如果当前没有达到最大消费次数则将消息进行持久化操作,这里在数据库中对消息的 id 与业务唯一键 bizkey 加了唯一索引,如果重复插入会报唯一约束异常,便不会重复插入,保证了消息的唯一性。

// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, event);
// 真实消费
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);

消息入库后将消息状态改为 CONSUME_PROCESSING(消费处理中),接着进行真实的消费过程,通过拦截判断真实的消费结果对消息状态进行对应的修改。

} catch (Exception e) {
            // 幂等处理:唯一约束触发则直接进行消费
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
                LOGGER.debug("[ShieldTxcCommitListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
                return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
            }
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
                LOGGER.debug("[ShieldTxcCommitListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
                return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
            }
            // 其他异常重试
            LOGGER.warn("ShieldTxcCommitListener Consume Message occurred Exception,msgId={}", msgId, e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return null;
}

这里为对消息重复插入的处理流程,如果消息重复入库,会抛出异常并被捕获,则直接进行真实消息消费流程,通过方法 doUpdateAfterConsumed 实现。

/**
 * 拦截真实消费结果,根据消费结果更新消息状态
 *
 * @param consumeConcurrentlyStatus
 * @param baseEventService
 * @param shieldEvent
 * @return
 */
private ConsumeConcurrentlyStatus doUpdateAfterConsumed(BaseEventService baseEventService,
                                                        ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
                                                        ShieldEvent shieldEvent) {
    LOGGER.debug("[ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [{}]", consumeConcurrentlyStatus);
    if (ConsumeConcurrentlyStatus.RECONSUME_LATER.name().equals(consumeConcurrentlyStatus.name())) {
        // 消费失败,消费状态仍旧处理中
        return consumeConcurrentlyStatus;
    }
    if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name().equals(consumeConcurrentlyStatus.name())) {
        // 消费成功,处理中改完成,更新前状态:消费处理中
        shieldEvent.setBeforeUpdateEventStatus(shieldEvent.getEventStatus());
        // 更新后状态:消费完成
        shieldEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
        boolean updateBefore = baseEventService.updateEventStatusById(shieldEvent);
        if (!updateBefore) {
            // 更新失败,幂等重试.此时必定是系统依赖组件出问题了
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

调用 doUpdateAfterConsumed 时传入的参数 ConsumeConcurrentlyStatus 代表业务层返回的消费状态。该状态只有两个值:CONSUME_SUCCESS/RECONSUME_LATER。

如果返回 RECONSUME_LATER,表明事务提交消息消费失败,后续会继续进行重试。消息表中消息的状态仍旧为 CONSUME_PROCESSING,我们不做多余的处理。

如果返回 CONSUME_SUCCESS,表明事务提交消息消费成功,则将数据库中的这条消息状态改为 CONSUME_PROCESSED。如果修改不成功,则进行重试即可,业务侧的消费逻辑要注意保证消费幂等。

整个消费过程如果达到重试次数上限仍旧不能成功,则会触发全局事务的回滚操作,这就保证了整个分布式事务的闭环。

3.2 框架分析之事务回滚

上文我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。

这里主要看图的下半部分。

当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,shieldTXC 的消息发送线程 sendTxcMessageScheduler 会扫描到待发送的回滚消息并投递到 [事务回滚队列]。

3.2.1 第一阶段:实现回滚逻辑

事务上游应用在启动过程中初始化了 ShieldTxcConsumerListenerAdapter 消费适配器,并通过 ShieldTxcRollbackListener 实现了回滚逻辑。

@Service
public class TxConsumeService implements InitializingBean {

    @Value("${shield.event.rocketmq.nameSrvAddr}")
    String nameSerAddr;
    @Value("${shield.event.rocketmq.topicSource}")
    String topic;

    @Override
    public void afterPropertiesSet() throws Exception {
        new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }));
    }
}

这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。

3.2.2 第二阶段:回滚拦截

同 ShieldTxcCommitListener 类似,ShieldTxcRollbackListener 也实现了对回滚消费逻辑的拦截,它的声明如下:

ShieldTxcRollbackListener

public class ShieldTxcRollbackListener implements MessageListenerConcurrently {

    private MessageListenerConcurrently txRollbackListener;

    public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) {
        this.txRollbackListener = txRollbackListener;
    }
}

ShieldTxcRollbackListener 同样实现了 MessageListenerConcurrently 接口。在构造过程中将外界传入的 MessageListenerConcurrently 实例的引用指向了内部的 MessageListenerConcurrently 引用。

ShieldTxcRollbackListener 同样是 MessageListenerConcurrently 实例,通过它的 consumeMessage 方法代理了外部传入的 MessageListenerConcurrently 实例,通过加入了切面逻辑对消费过程做了进一步的处理。

我们重点对 ShieldTxcCommitListener 如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // 测试打印消息体
    for (MessageExt msg : msgs) {

        String msgBody = new String(msg.getBody());
        String msgId = msg.getMsgId();
        LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);
    }
}

获取参数进行日志打印。

ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);

ShieldEvent rollbackEvent = new ShieldEvent();
rollbackEvent.convert(shieldTxcMessage);

BaseEventService baseEventService = (BaseEventService) SpringApplicationHolder.getBean("baseEventService");

将消息协议进行反序列化,并转换为消息实体,从 Spring 上下文中获取数据库持久化服务 BaseEventService,为后续数据库操作做准备。

try {
            // 取参数
            String bizKey = shieldTxcMessage.getBizKey();
            String txType = shieldTxcMessage.getTxType();
            String eventStatua = shieldTxcMessage.getEventStatus();
            String appId = shieldTxcMessage.getAppId();
            String eventType = shieldTxcMessage.getEventType();

            // 回滚消息持久化
            rollbackEvent.setEventType(eventType)
                    .setTxType(TXType.ROLLBACK.toString())
                    .setEventStatus(EventStatus.CONSUME_INIT.toString())
                    .setContent(shieldTxcMessage.getContent())
                    .setAppId(shieldTxcMessage.getAppId())
                    .setBizKey(bizKey)
                    .setId(Integer.valueOf(shieldTxcMessage.getId()));

            // 入库失败回滚
            boolean insertResult = baseEventService.insertEventWithId(rollbackEvent);
            if (!insertResult) {
                LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT(消费初始化):

// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, rollbackEvent);
// 真实消费
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);

消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING],进行真实消费过程,真实消费完成后对消费结果做后置处理。

} catch (Exception e) {
            // 幂等处理:唯一约束触发则直接进行消费
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
                LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
                return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
            }
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
                LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
                return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
            }
            // 其他异常重试
            LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return null;
}

这里的逻辑与 ShieldTxcCommitListener 的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。

我们接着看一下 doUpdateAfterRollbackConsumed 方法如何进行消费后置处理。

/**
 * 拦截真实消费结果,根据消费结果更新消息状态
 *
 * @param consumeConcurrentlyStatus
 * @param baseEventService
 * @param rollbackEvent
 * @return
 */
private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService,
                                                                ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
                                                                ShieldEvent rollbackEvent) {
    if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
        // 消费失败,消费状态仍旧处理中
        return consumeConcurrentlyStatus;
    }
    if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
        // 消费成功,处理中改完成,更新前状态:消费处理中
        rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus());
        // 更新后状态:消费处理中
        rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
        boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent);
        if (!updateBefore) {
            // 更新失败,幂等重试.此时必定是系统依赖组件出问题了
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED]。如果是消费失败,则会进行重试。

上游的回滚逻辑通过重试保证成功,如果达到 MQ 最大重试次数(RocketMQ 是 16 次)会进死信,此时需要人工介入进行补偿操作。

在上线之前只要进行了充分的测试,保证业务无严重 Bug,确保线上 MQ 集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在 Bug 或者 MQ 集群不稳定,此时需要人工介入进行排错。

3.3 小结

到这里我们就完成了整个框架原理图与代码实现的分析。

实现告一段落,我们接着看一个简单的应用 Demo,直观感受一下 shieldTXC 分布式事务框架的魅力。

4. shieldTXC 实战

项目整体的结构如下:

shieldTxc
    |-txc-core        ShieldTXC内核
    |-txc-demo-up     分布式事务上游应用
    |-txc-demo-down   分布式事务下游应用

首先对项目进行整体的编译,接着对应用 txc-demo-up、txc-demo-down,修改配置文件 application.properties:

########################################################################
#
#     shield-txc
#
#########################################################################
# RocketMQ的nameserver地址:必填
shield.event.rocketmq.nameSrvAddr=127.0.0.1:9876
# 事务消息topic:非必填,默认为DEFAULT_TXC_XXX
shield.event.rocketmq.topicSource=DEFAULT_TXC_XXX
# 发送失败重试次数:非必填,默认为10次
shield.event.rocketmq.retryTimesWhenSendFailed=3次
# 消息发送调度初始化延时,单位:秒:非必填,默认为0
shield.event.rocketmq.tranMessageSendInitialDelay=0秒
# 消息发送调度间隔,单位:秒,非必填,默认为5秒
shield.event.rocketmq.tranMessageSendPeriod=5
# 消息发送调度核心线程数:非必填,默认为1
shield.event.rocketmq.tranMessageSendCorePoolSize=1

如果仅仅是对框架进行尝鲜,只需要配置 NameServer 地址即可。

接着需要初始化数据库,在两个应用的 application.properties 中配置数据源,并在各自的数据源中放置消息表,初始化脚本在项目根路径下的 script 目录下,直接导入数据库即可。

在正式测试 Demo 前需要保证有可用的 RocketMQ 环境,关于如何搭建 RocketMQ 读者可以自行学习。

MQ 环境就绪后,分别启动两个应用, 等待生产者端完成本地事务并将事务提交消息持久化。

上游应用打印日志如下:

事务上游本地事务开始,消息持久化.....
2019-08-04 12:35:27.117 |-DEBUG [pool-1-thread-1] com.shield.txc.schedule.SendTxcMessageScheduler [141] -| [SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{"id":"112","eventType":"INSERT","txType":"COMMIT","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}]

日志表明本地事务执行完成,消息持久化,我们看下数据库中的记录:

可以看到消息已经入库,并且应被处理为 [PRODUCE_PROCESSED](生产处理完成),也就是消息被投递到 MQ。

下游应用打印如下:

测试消费ShieldTxcCommitListener开始......
2019-08-04 12:35:27.802 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcCommitListener [165] -| [ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [RECONSUME_LATER]

下游开始对事务提交消息进行消费,执行本地业务。

下游事务提交逻辑返回的状态为 [RECONSUME_LATER],重试三次后事务回滚,持久化事务回滚消息,并投递到 MQ。

下游数据库中消息记录如下:

可以看到符合我们的预期,事务提交消息消费三次后不再消费,状态更改为 [CONSUME_MAX_RECONSUMETIMES],同时投递了事务回滚消息,消息状态为 [PRODUCE_PROCESSED]。

我们回到上游应用,查看到日志如下:

2019-08-04 12:37:09.202 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcRollbackListener [48] -| [ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId=C0A801653B9818B4AAC2122845CC0000,msgBody={"id":"113","eventType":"INSERT","txType":"ROLLBACK","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}
测试消费【回滚】ShieldTxcRollbackListener

表明上游应用消费了事务回滚消息,持久的回滚消息状态如图:

从这个完整的业务执行链路可以看到,shieldTXC 能够很好的处理分布式事务,保证分布式系统上下游数据的一致性。

如果下游消费事务提交消息返回 [CONSUME_SUCCESS],则下游直接提交事务并更改消息状态为 [CONSUME_PROCESSED],整个事务流程就结束了,这也是符合我们预期的。

5. 全文总结

分布式事务是分布式领域一个较为棘手的难题,在几年前,各种分布式事务解决方案作为大厂的技术壁垒是不会轻易对外公布的。

随着近年来开源社区的活跃,我们得以对各种解决方案一探究竟。

本文中,笔者从理论入手,辅助以代码讲解,对本地消息表方案做了一次较为深入的讲解及实现,希望本文的方案及实现思路能够对读者理解并实现分布式事务解决方案有所启发。

如果你有什么疑问和建议,也欢迎在评论区进行留言,在此先行谢过。

笔者将这个本地消息表分布式事务框架命名为 shieldTXC,意思是神盾分布式事务框架,框架内核及 Demo 案例的代码已经打包上传至 GitHub 上,地址为:。如果觉得这个喜欢可以点个 star 支持下。

2021-04-30-Y6qM1l
2021-04-30-Vw0ms9

2021-04-30-yamMZo
2021-04-30-S2CDkQ
2021-04-30-WYYCDe
2021-04-30-ggVTl8
手写实现基于消息队列的分布式事务框架
shieldTXC 源码地址
RocketMQ 中文开发者中心