Seata源码—4.全局事务拦截与开启事务处理

大纲

1.Seata Server的启动入口的源码

2.Seata Server的网络服务器启动的源码

3.全局事务拦截器的核心变量

4.全局事务拦截器的初始化源码

5.全局事务拦截器的AOP切面拦截方法

6.通过全局事务执行模版来执行全局事务

7.获取xid构建全局事务实例与全局事务的传播级别

8.全局事务执行模版根据传播级别来执行业务

9.全局事务执行模版开启事务+提交事务+回滚事务

10.Seata Server集群的负载均衡机制实现源码

11.Seata Client向Seata Server发送请求的源码

12.Client将RpcMessage对象编码成字节数组

13.Server将字节数组解码成RpcMessage对象

14.Server处理已解码的RpcMessage对象的流程

15.Seata Server开启全局事务的流程源码

 

1.Seata Server的启动入口的源码

代码位于seata-server模块下:

//Spring Boot entry class of the Seata Server; component scanning covers the "io.seata" base package.
@SpringBootApplication(scanBasePackages = {"io.seata"})
public class ServerApplication {
    //Launches the Spring application with ServerApplication as the primary source and forwards the command-line args.
    public static void main(String[] args) throws IOException {
        //run the spring-boot application
        SpringApplication.run(ServerApplication.class, args);
    }
}

//Spring component whose run() starts the Seata Server and whose destroy() releases
//every Disposable that was registered through addDisposable().
@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);
    //Resources queued through addDisposable(); destroy() walks them in registration order.
    private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();
    //Becomes true only after Server.start() has returned without throwing.
    private boolean started = false;

    //Queues a resource to be destroyed when this bean is destroyed.
    public static void addDisposable(Disposable disposable) {
        DISPOSABLE_LIST.add(disposable);
    }

    //Starts the server and logs the elapsed time.
    //Any Throwable is logged and then terminates the JVM with exit code -1.
    @Override
    public void run(String... args) {
        final long beginMillis = System.currentTimeMillis();
        try {
            Server.start(args);
            started = true;

            final long elapsedMillis = System.currentTimeMillis() - beginMillis;
            LOGGER.info("seata server started in {} millSeconds", elapsedMillis);
        } catch (Throwable startFailure) {
            started = false;
            LOGGER.error("seata server start error: {} ", startFailure.getMessage(), startFailure);
            System.exit(-1);
        }
    }

    //Reports whether Server.start() completed successfully.
    public boolean started() {
        return started;
    }

    //Destroys every registered Disposable, with a debug trace before and after.
    @Override
    public void destroy() throws Exception {
        debugIfEnabled("destoryAll starting");
        for (Disposable resource : DISPOSABLE_LIST) {
            resource.destroy();
        }
        debugIfEnabled("destoryAll finish");
    }

    //Emits a debug message only when debug logging is enabled.
    private static void debugIfEnabled(String message) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(message);
        }
    }
}

//Bootstraps the Seata Server: parses the startup parameters, creates the Netty remoting server,
//wires the DefaultCoordinator to it as the message handler, chooses the IP used in XIDs, and initializes the server.
public class Server {
    //The entry point of application.
    //args: the raw command-line arguments, handed to ParameterParser.
    public static void start(String[] args) {
        //create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);

        //initialize the parameter parser
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        ParameterParser parameterParser = new ParameterParser(args);

        //initialize the metrics
        //Seata Server supports collecting metrics
        MetricsManager.get().init();

        //Publish the parsed store mode as a system property
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

        //Thread pool handed to NettyRemotingServer below as its message executor (not the Netty event-loop threads).
        //Core/max size, keep-alive and queue capacity all come from NettyServerConfig
        //(the original note states min 50 / max 500 threads — confirm against NettyServerConfig).
        //When the bounded queue is full, CallerRunsPolicy runs the task on the submitting thread.
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
            NettyServerConfig.getMinServerPoolSize(),
            NettyServerConfig.getMaxServerPoolSize(),
            NettyServerConfig.getKeepAliveTime(),
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        //Create the Netty-based network server
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

        UUIDGenerator.init(parameterParser.getServerNode());
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());
       
        //Obtain the coordinator singleton; coordinator.init() starts its scheduled background tasks.
        //The coordinator is then installed as the remoting server's handler.
        DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
        coordinator.init();
        nettyRemotingServer.setHandler(coordinator);

        //let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
        ServerRunner.addDisposable(coordinator);

        //127.0.0.1 and 0.0.0.0 are not valid here.
        //Use the configured host when it is a valid IP; otherwise fall back to a local IP,
        //filtered by the preferred-networks setting when one is configured.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
            if (StringUtils.isNotBlank(preferredNetworks)) {
                XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
            } else {
                XID.setIpAddress(NetUtil.getLocalIp());
            }
        }
       
        //Initialize the Netty server
        nettyRemotingServer.init();
    }
}

 

2.Seata Server的网络服务器启动的源码

创建和启动Seata的网络服务器:

//Netty-based remoting server of the Seata Server; registers one processor per handled message type.
public class NettyRemotingServer extends AbstractNettyRemotingServer {
    ...
    //Instantiates a new Rpc remoting server, i.e. creates the Seata Server's network server.
    //messageExecutor is forwarded to the parent together with a fresh NettyServerConfig.
    public NettyRemotingServer(ThreadPoolExecutor messageExecutor) {
        super(messageExecutor, new NettyServerConfig());
    }
    
    //Starts the Seata Server's network server.
    //Processors are registered on every call; the parent init() runs only when the
    //initialized flag is successfully flipped from false to true.
    @Override
    public void init() {
        //registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }
    
    //Binds each message type to the processor that handles it and to the executor it is registered with.
    private void registerProcessor() {
        //1.registry on request message processor
        //One ServerOnRequestProcessor instance serves all request types below, registered with messageExecutor.
        ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
        ShutdownHook.getInstance().addDisposable(onRequestProcessor);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        //2.registry on response message processor
        //Branch commit/rollback results are registered with branchResultMessageExecutor instead of messageExecutor.
        ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
        //3.registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        //4.registry tm message processor
        //Registered with a null executor — presumably processed on the calling thread; confirm in registerProcessor/processMessage.
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        //5.registry heartbeat message processor
        //Also registered with a null executor.
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }
    ...
}

//Server-side remoting base class: owns the NettyServerBootstrap and starts it from init().
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    private final NettyServerBootstrap serverBootstrap;
    ...
    //messageExecutor is passed to the parent; nettyServerConfig configures the bootstrap.
    public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
        super(messageExecutor);
        //Create the Netty server bootstrap and install a ServerHandler as its channel handler
        serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
        serverBootstrap.setChannelHandlers(new ServerHandler());
    }
    
    //Runs the parent init() first, then starts the Netty server.
    @Override
    public void init() {
        super.init();
        //Start the Netty server
        serverBootstrap.start();
    }
    ...
}

//Common base of the remoting client and server: holds the message executor and the timer
//that expires pending request futures.
public abstract class AbstractNettyRemoting implements Disposable {
    //The Timer executor: a scheduled pool with a single core thread, threads named "timeoutChecker".
    //NOTE(review): the trailing `true` passed to NamedThreadFactory presumably marks the thread as daemon — confirm.
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));
    //The Message executor.
    protected final ThreadPoolExecutor messageExecutor;
    ...
    
    //Schedules the recurring timeout check for pending request futures.
    public void init() {
        //Start a fixed-rate task, run every TIMEOUT_CHECK_INTERVAL milliseconds
        //(3 seconds according to the original note — confirm against the constant),
        //that checks whether any sent request has timed out waiting for its response.
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    MessageFuture future = entry.getValue();
                    if (future.isTimeout()) {
                        //Drop the timed-out future and complete it with a TimeoutException describing the request
                        futures.remove(entry.getKey());
                        RpcMessage rpcMessage = future.getRequestMessage();
                        future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }
                //Refresh the cached current time after each sweep
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }
    ...
}

//Configures and starts the Netty ServerBootstrap that accepts Seata client connections.
public class NettyServerBootstrap implements RemotingBootstrap {
    private final NettyServerConfig nettyServerConfig;
    //Passed to ServerBootstrap.group() as the first (boss) and second (worker) group respectively.
    private final EventLoopGroup eventLoopGroupBoss;
    private final EventLoopGroup eventLoopGroupWorker;
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    //Extra handlers appended to every child channel's pipeline after the idle/codec handlers.
    private ChannelHandler[] channelHandlers;
    private int listenPort;
    //Set to true once start() has bound the port and registered the server address.
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    //Creates the boss and worker event loop groups, using the epoll implementation when
    //NettyServerConfig.enableEpoll() is true and the NIO implementation otherwise.
    //Thread counts and thread-name prefixes are taken from nettyServerConfig.
    public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {
        this.nettyServerConfig = nettyServerConfig;
        if (NettyServerConfig.enableEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));
        }
    }
    
    //Sets channel handlers. A null argument is ignored and leaves the current handlers untouched.
    protected void setChannelHandlers(final ChannelHandler... handlers) {
        if (handlers != null) {
            channelHandlers = handlers;
        }
    }
    
    //Configures the bootstrap, binds the listen port, records the port in XID and registers
    //the server address with the registry.
    //Throws RuntimeException when binding or registration fails, or when the thread is interrupted while binding.
    @Override
    public void start() {
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    //Pipeline order: read-idle detection, protocol decoder, protocol encoder, then the custom handlers.
                    //Only the reader-idle timeout is configured; the writer/all-idle arguments are 0.
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            }
        );

        try {
            //Block until the bind completes, then publish the port and register this server's address.
            this.serverBootstrap.bind(getListenPort()).sync();
            XID.setPort(getListenPort());
            LOGGER.info("Server started, service listen port: {}", getListenPort());
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
        } catch (SocketException se) {
            throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
        } catch (InterruptedException ie) {
            //Restore the interrupt status so callers further up can still observe the interruption;
            //the exception thrown to the caller is the same as before.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Server start failed", ie);
        } catch (Exception exx) {
            throw new RuntimeException("Server start failed", exx);
        }
    }
    ...
}

//Coordinator singleton of the Seata Server: handles transaction messages and runs the
//scheduled background tasks (retry rollback, retry commit, async commit, timeout check, undo-log delete).
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private RemotingServer remotingServer;
    private final DefaultCore core;
    //Lazily created singleton; volatile because it is read outside the lock in getInstance().
    private static volatile DefaultCoordinator instance;
    //One single-core-thread scheduled executor per background task, each with its own thread-name prefix.
    private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1));
    private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1));
    private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1));
    private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1));
    private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1));
    ...
    
    //Returns the singleton, creating it on first use. The instance is checked before and after
    //taking the class lock, so only one instance is ever constructed.
    //Note: remotingServer is only used by the call that actually creates the instance.
    public static DefaultCoordinator getInstance(RemotingServer remotingServer) {
        if (null == instance) {
            synchronized (DefaultCoordinator.class) {
                if (null == instance) {
                    instance = new DefaultCoordinator(remotingServer);
                }
            }
        }
        return instance;
    }
    
    //Throws IllegalArgumentException when remotingServer is null.
    private DefaultCoordinator(RemotingServer remotingServer) {
        if (remotingServer == null) {
            throw new IllegalArgumentException("RemotingServer not allowed be null.");
        }
        this.remotingServer = remotingServer;
        this.core = new DefaultCore(remotingServer);
    }
    
    //Schedules the five fixed-rate background tasks. Each run goes through
    //SessionHolder.distributedLockAndExecute keyed by the task name.
    //The first four start with no initial delay; undo-log deletion starts after UNDO_LOG_DELAY_DELETE_PERIOD.
    public void init() {
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 
            0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 
            0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 
            0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 
            0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }
    ...
}

Seata Client的ClientHandler和Seata Server的ServerHandler:

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    //Server-side channel handler installed at the end of the pipeline; marked Sharable.
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        //Channel read.
        //Anything that is not an RpcMessage is ignored.
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //Hand the already-decoded RpcMessage over for processing
            processMessage(ctx, (RpcMessage) msg);
        }

        //When the channel becomes writable again, wakes up every thread waiting on `lock`,
        //then forwards the event down the pipeline.
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            synchronized (lock) {
                if (ctx.channel().isWritable()) {
                    lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        //Channel inactive.
        //Does nothing further once the message executor has been shut down; otherwise
        //releases the channel's context and forwards the event.
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            debugLog("inactive:{}", ctx);
            if (messageExecutor.isShutdown()) {
                return;
            }
            handleDisconnect(ctx);
            super.channelInactive(ctx);
        }

        //Releases the RpcContext associated with the channel, if one exists and has a client role;
        //otherwise only logs that an unused channel was removed.
        private void handleDisconnect(ChannelHandlerContext ctx) {
            final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
            RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ipAndPort + " to server channel inactive.");
            }
            if (rpcContext != null && rpcContext.getClientRole() != null) {
                rpcContext.release();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
                }
            } else {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove unused channel:" + ctx.channel());
                }
            }
        }

        //Exception caught.
        //A DecoderException on a channel with no identified RpcContext is neither logged nor propagated.
        //In every case the finally block releases the channel's RpcContext.
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            try {
                if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {
                    return;
                }
                LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel());
                super.exceptionCaught(ctx, cause);
            } finally {
                ChannelManager.releaseRpcContext(ctx.channel());
            }
        }

        //User event triggered.
        //On a READER_IDLE event the channel's context is released and the channel context is closed;
        //a failure while closing is logged by message only.
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                debugLog("idle:{}", evt);
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel:" + ctx.channel() + " read idle.");
                    }
                    handleDisconnect(ctx);
                    try {
                        closeChannelHandlerContext(ctx);
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            }
        }

        //Logs the pending close, then delegates to the default close handling.
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    //Client-side channel handler; marked Sharable.
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        //Processes decoded RpcMessage objects; anything else is ignored.
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }

        //When the channel becomes writable again, wakes up every thread waiting on `lock`,
        //then forwards the event down the pipeline.
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            synchronized (lock) {
                if (ctx.channel().isWritable()) {
                    lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        //Releases the channel from the client channel manager, unless the message executor is already shut down.
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (messageExecutor.isShutdown()) {
                return;
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel inactive: {}", ctx.channel());
            }
            clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
            super.channelInactive(ctx);
        }

        //Handles idle events:
        //- READER_IDLE: invalidates the channel for its server address, then always releases it (finally block).
        //- the WRITER_IDLE_STATE_EVENT instance (compared by identity): sends an asynchronous heartbeat PING.
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel {} read idle.", ctx.channel());
                    }
                    try {
                        String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                        clientChannelManager.invalidateObject(serverAddress, ctx.channel());
                    } catch (Exception exx) {
                        LOGGER.error(exx.getMessage());
                    } finally {
                        clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));
                    }
                }
                if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
                    try {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("will send ping msg,channel {}", ctx.channel());
                        }
                        AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
                    } catch (Throwable throwable) {
                        LOGGER.error("send request error: {}", throwable.getMessage(), throwable);
                    }
                }
            }
        }

        //Logs the failure with its cause, releases the channel, then delegates to the default exception handling.
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);
            clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove exception rm channel:{}", ctx.channel());
            }
            super.exceptionCaught(ctx, cause);
        }

        //Logs the pending close, then delegates to the default close handling.
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }
    ...
}

 

3.全局事务拦截器的核心变量

全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,如果发现Spring的Bean含有Seata的注解,就会为该Bean创建动态代理。

 

比如Spring的Bean添加了@GlobalTransactional注解,那么GlobalTransactionScanner类为这个Bean创建动态代理时,会使用全局事务拦截器GlobalTransactionalInterceptor来进行创建。

 

这样后续调用到这个Spring Bean的方法时,就会先调用GlobalTransactionalInterceptor拦截器。

 

GlobalTransactionalInterceptor这个全局事务注解拦截器的核心变量如下:

一.TransactionalTemplate全局事务执行模版

二.GlobalLockTemplate全局锁管理模版

三.FailureHandler全局事务异常处理器

//Interceptor for the global transaction annotations
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);

    //Default failure handler for global transactions.
    //Per the original note it is called back when beginning, rolling back, committing or retrying
    //a global transaction fails (the callback sites are not shown in this excerpt).
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
    
    //Global transaction execution template, used to drive the execution of a global transaction
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    
    //Global lock execution template; per the original note it provides write isolation between different global transactions
    private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();

    //The failure handler actually used by this interceptor
    private final FailureHandler failureHandler;

    //Whether global transactions are disabled
    private volatile boolean disable;

    //Order of this interceptor
    private int order;

    //Core global transaction settings for the AOP aspect; per the original note they originate from the global transaction annotation
    protected AspectTransactional aspectTransactional;

    //Period of the degrade check
    private static int degradeCheckPeriod;

    //Whether the degrade check is enabled
    private static volatile boolean degradeCheck;

    //Allowed count for the degrade check
    //NOTE(review): the original comment called this an allowed "time"; the name suggests a count threshold — confirm
    private static int degradeCheckAllowTimes;

    //Degrade counter
    private static volatile Integer degradeNum = 0;
    
    //Counter of times the "reach" condition was met — TODO confirm exact semantics
    private static volatile Integer reachNum = 0;

    //Event bus (a GuavaEventBus) named "degradeCheckEventBus"
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);

    //Scheduled thread pool with one core thread, threads named "degradeCheckWorker"
    private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
    
    //Default global transaction timeout; starts at 0 until initialized
    private static int defaultGlobalTransactionTimeout = 0;
    
    ...    
}

 

4.全局事务拦截器的初始化源码

全局事务拦截器GlobalTransactionalInterceptor进行初始化时,会设置全局事务的异常处理组件,设置默认的全局事务超时时间为60秒。

//Scanner for the global transaction annotations
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...    
    //The Spring AOP method interceptor selected for the bean currently being wrapped
    private MethodInterceptor interceptor;
    
    //Decides whether the bean needs a Seata interceptor and, if so, proxies it (or adds advisors to an existing proxy).
    //Returns the bean unchanged when the checkers reject it, when it was already processed,
    //or when no Seata annotation is found. Any Exception is rethrown wrapped in a RuntimeException.
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            //Serialize on PROXYED_SET so each bean name is processed at most once
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;

                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                } else {
                    //Resolve the bean's target class and the interfaces it exposes
                    //NOTE(review): the name interfacesIfJdk suggests these only matter for JDK proxies — confirm
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //Per the original note, existsAnnotation() checks whether the bean's class or methods carry @GlobalTransactional or a related annotation
                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        //Lazily create the GlobalTransactionalInterceptor, i.e. the interceptor for the global transaction annotations,
                        //and register it as a listener for the disable-global-transaction configuration key
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {//the bean is not an AOP proxy yet
                    //Delegate to Spring's AbstractAutoProxyCreator to create the dynamic proxy for the target bean,
                    //so that later calls to the bean's methods go through the GlobalTransactionalInterceptor
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //The bean is already a proxy: add the Seata advisors to its existing advisor chain
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    ...
}

//Global transaction interceptor
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    //The failure handler actually used by this interceptor
    private final FailureHandler failureHandler;
    //Whether global transactions are disabled
    private volatile boolean disable;
    //Order of this interceptor
    private int order;
    //Whether the degrade check is enabled
    private static volatile boolean degradeCheck;
    //Period of the degrade check
    private static int degradeCheckPeriod;
    //Allowed count for the degrade check
    //NOTE(review): the original comment called this an allowed "time"; the name suggests a count threshold — confirm
    private static int degradeCheckAllowTimes;
    //Default global transaction timeout; stays at 0 until initDefaultGlobalTransactionTimeout() has run
    private static int defaultGlobalTransactionTimeout = 0;
    //Event bus (a GuavaEventBus) named "degradeCheckEventBus"
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
    ...
    
    //Instantiates a new Global transactional interceptor.
    //failureHandler: handler to use; DEFAULT_FAIL_HANDLER is used when null.
    //Reads the disable flag, the interceptor order and the degrade-check settings from the configuration,
    //starts the degrade check when it is enabled with positive period and allow-times values,
    //and finally initializes the default global transaction timeout.
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
        this.initDefaultGlobalTransactionTimeout();
    }
    
    //Initializes the static default global transaction timeout, but only while it is still non-positive
    //(i.e. not yet initialized). The value is read from the configuration and falls back to
    //DEFAULT_GLOBAL_TRANSACTION_TIMEOUT when reading fails or the configured value is not positive.
    //(The original note gives the default as 60s — confirm against DEFAULT_GLOBAL_TRANSACTION_TIMEOUT.)
    private void initDefaultGlobalTransactionTimeout() {
        if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
            //Named differently from the static field on purpose, to avoid shadowing it.
            int configuredTimeout;
            try {
                configuredTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
            } catch (Exception e) {
                //Pass the exception itself so the cause and stack trace are kept in the log.
                LOGGER.error("Illegal global transaction timeout value: {}", e.getMessage(), e);
                configuredTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            if (configuredTimeout <= 0) {
                LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", configuredTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
                configuredTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = configuredTimeout;
        }
    }
    ...
}

 

5.全局事务拦截器的AOP切面拦截方法

如果调用添加了@GlobalTransactional注解的方法,就会执行GlobalTransactionalInterceptor的invoke()方法。

//全局事务拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    //是否禁用全局事务
    private volatile boolean disable;    
    //是否开启全局事务的降级检查
    private static volatile boolean degradeCheck;
    //降级次数
    private static volatile Integer degradeNum = 0;
    //降级检查允许次数
    private static int degradeCheckAllowTimes;
    //AOP切面全局事务核心配置,来自于全局事务注解
    protected AspectTransactional aspectTransactional;
    ...
    
    /**
     * AOP interception entry point; runs when a method handled by this interceptor is called
     * (for example one annotated with {@code @GlobalTransactional}).
     *
     * <p>Resolves the invoked method on the target class, looks up {@code @GlobalTransactional} and
     * {@code @GlobalLock} (method first, then class), and dispatches to
     * {@code handleGlobalTransaction} or {@code handleGlobalLock}. When global transactions are
     * disabled or degraded, or when neither an annotation nor an aspect configuration applies,
     * the target method is invoked directly.
     *
     * @param methodInvocation the intercepted method call
     * @return the value produced by the selected handler or by the target method
     * @throws Throwable anything thrown by the handler or by the target method
     */
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Class of the object the call is made on; null when the invocation has no target object.
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // The invoked method as it is declared on the target class.
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

        // Methods declared on Object itself are never intercepted.
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            // Map a compiler-generated bridge method back to the method it bridges.
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);

            // Global transactions are off when explicitly disabled, or when degrade checking is on
            // and the degrade counter has reached (>=) the allowed number of times.
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        // Copy the annotation attributes into an AspectTransactional configuration.
                        // The fourth argument must be rollbackForClassName(): passing
                        // noRollbackForClassName() there would register the no-rollback class names
                        // as rollback rules and silently drop the real rollbackForClassName values.
                        transactional = new AspectTransactional(
                            globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(),
                            globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes()
                        );
                    } else {
                        // No annotation on the method/class: use the configuration held by this interceptor.
                        transactional = this.aspectTransactional;
                    }
                    // Actual entry point of global transaction handling.
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }

        // Nothing to manage here: run the target method directly.
        return methodInvocation.proceed();
    }
    
    /**
     * Looks up an annotation on the method first and on the target class second.
     *
     * @param method          the method to inspect; may be {@code null}
     * @param targetClass     the class to fall back to; may be {@code null}
     * @param annotationClass the annotation type to look for
     * @param <T>             the annotation type
     * @return the annotation found on the method, otherwise the one found on the class,
     *         otherwise {@code null}
     */
    public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
        if (method != null) {
            T onMethod = method.getAnnotation(annotationClass);
            if (onMethod != null) {
                return onMethod;
            }
        }
        return targetClass == null ? null : targetClass.getAnnotation(annotationClass);
    }
    ...
}

 

6.通过全局事务执行模版来执行全局事务

GlobalTransactionalInterceptor全局事务拦截器中会有一个全局事务执行模版的实例变量,这个全局事务执行模版TransactionalTemplate实例就是用来执行全局事务的。执行全局事务时,就会调用TransactionalTemplate的execute()方法。

//全局事务拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    //全局事务执行模版,用来管理全局事务的执行
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    ...
    /**
     * Runs the intercepted method inside a global transaction driven by {@code transactionalTemplate}.
     *
     * @param methodInvocation    the intercepted call; its {@code proceed()} is the business logic
     * @param aspectTransactional the transaction settings (timeout, name, propagation, lock retry, rollback rules)
     * @return the value returned by {@code transactionalTemplate.execute(...)}
     * @throws Throwable anything propagated from the template or from the (elided) catch block
     */
    Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {
        // Reported through DegradeCheckEvent in the finally block.
        // NOTE(review): presumably set to false inside the elided catch block - confirm.
        boolean succeed = true;
        try {
            // Delegate the transaction lifecycle to the TransactionalTemplate.
            return transactionalTemplate.execute(new TransactionalExecutor() {
                // The business logic: proceed with the intercepted method.
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
                
                // Transaction name: the configured name if present, otherwise a formatted form of the invoked method.
                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
                
                // Builds the TransactionInfo describing this global transaction.
                @Override
                public TransactionInfo getTransactionInfo() {
                    //reset the value of timeout
                    // A non-positive timeout, or one equal to DEFAULT_GLOBAL_TRANSACTION_TIMEOUT,
                    // is replaced by the interceptor-wide defaultGlobalTransactionTimeout.
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    // Populate a TransactionInfo instance from the aspect configuration.
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);// global transaction timeout
                    transactionInfo.setName(name());// global transaction name
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());// propagation level
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());// global lock retry interval
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());// global lock retry times
                    // Rollback rules, kept in insertion order: rollback rules (by class, then by class name)
                    // followed by no-rollback rules (by class, then by class name).
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            // Handling of the template's ExecutionException is elided in this excerpt.
            ...
        } finally {
            // When degrade checking is enabled, publish the outcome of this invocation on the event bus.
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }
    ...
}

 

7.获取xid构建全局事务实例与全局事务的传播级别

(1)从RootContext获取xid来构建全局事务实例

(2)全局事务的传播级别

 

(1)从RootContext获取xid来构建全局事务实例

RootContext会通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore实例、ThreadLocalContextCore实例。

 

而xid又会通过RootContext的bind()方法被put()到ContextCore实例中,也就是xid会被put()到ThreadLocal<Map<String, Object>>中,或者被put()到FastThreadLocal<Map<String, Object>>中。因此,通过RootContext的get()方法可以从ContextCore实例中获取当前线程的xid。

//全局事务执行模版
public class TransactionalTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
    
    //Execute object.
    public Object execute(TransactionalExecutor business) throws Throwable {
        //1.Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }

        //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();
      
        //1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        ...
    }
    ...
}

//全局事务上下文
public class GlobalTransactionContext {
    private GlobalTransactionContext() {
    }
    
    /**
     * Gets a GlobalTransaction for the xid currently held by {@code RootContext}.
     *
     * @return {@code null} when {@code RootContext.getXID()} returns {@code null}; otherwise a new
     *         {@code DefaultGlobalTransaction} for that xid with status {@code GlobalStatus.Begin}
     *         and role {@code GlobalTransactionRole.Participant}
     */
    public static GlobalTransaction getCurrent() {
        final String boundXid = RootContext.getXID();
        return boundXid == null
            ? null
            : new DefaultGlobalTransaction(boundXid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }
    ...
}

public class RootContext {
    //通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore、ThreadLocalContextCore
    //所以可以认为,xid是存放在ThreadLocal<Map<String, Object>>中的
    private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
    ...
    private RootContext() {
    }
    
    /**
     * Gets the xid stored in {@code CONTEXT_HOLDER} under {@code KEY_XID}.
     *
     * @return the stored xid; may be {@code null} (the method is marked {@code @Nullable})
     */
    @Nullable
    public static String getXID() {
        return (String) CONTEXT_HOLDER.get(KEY_XID);
    }
    
    /**
     * Binds the given xid.
     *
     * <p>A blank xid is treated as a request to unbind. Otherwise the xid is put into the logging
     * MDC under {@code MDC_KEY_XID} and into {@code CONTEXT_HOLDER} under {@code KEY_XID}.
     *
     * @param xid the xid to bind
     */
    public static void bind(@Nonnull String xid) {
        if (!StringUtils.isBlank(xid)) {
            MDC.put(MDC_KEY_XID, xid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind {}", xid);
            }
            CONTEXT_HOLDER.put(KEY_XID, xid);
            return;
        }
        // Blank xid: switch to the unbind operation instead of storing it.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid is blank, switch to unbind operation!");
        }
        unbind();
    }
    ...
}

(2)全局事务的传播级别

全局事务的传播级别分别有:REQUIRED、REQUIRES_NEW、NOT_SUPPORTED、NEVER、SUPPORTS、MANDATORY。

//Propagation level of global transactions.
public enum Propagation {
    //REQUIRED: if a global transaction already exists, keep running inside it
    //(work executed from here on runs as branch transactions of that global transaction).
    //If no global transaction exists yet, begin a new one and run inside it.
    //The logic is similar to the following code:
    //     if (tx == null) {
    //         try {
    //             tx = beginNewTransaction(); // begin new transaction, is not existing
    //             Object rs = business.execute(); // execute with new transaction
    //             commitTransaction(tx);
    //             return rs;
    //         } catch (Exception ex) {
    //             rollbackTransaction(tx);
    //             throw ex;
    //         }
    //     } else {
    //         return business.execute(); // execute with current transaction
    //     }
    REQUIRED,

    //REQUIRES_NEW: if a global transaction already exists, suspend it first,
    //then begin a new global transaction to execute the business logic.
    //The logic is similar to the following code:
    //     try {
    //         if (tx != null) {
    //             suspendedResource = suspendTransaction(tx); // suspend current transaction
    //         }
    //         try {
    //             tx = beginNewTransaction(); // begin new transaction
    //             Object rs = business.execute(); // execute with new transaction
    //             commitTransaction(tx);
    //             return rs;
    //         } catch (Exception ex) {
    //             rollbackTransaction(tx);
    //             throw ex;
    //         }
    //     } finally {
    //         if (suspendedResource != null) {
    //             resumeTransaction(suspendedResource); // resume transaction
    //         }
    //     }
    REQUIRES_NEW,

    //NOT_SUPPORTED: if a global transaction already exists, suspend it first,
    //then execute the business logic without any global transaction.
    //The logic is similar to the following code:
    //     try {
    //         if (tx != null) {
    //             suspendedResource = suspendTransaction(tx); // suspend current transaction
    //         }
    //         return business.execute(); // execute without transaction
    //     } finally {
    //         if (suspendedResource != null) {
    //             resumeTransaction(suspendedResource); // resume transaction
    //         }
    //     }
    NOT_SUPPORTED,

    //SUPPORTS: if no global transaction exists, execute the business logic without one;
    //if a global transaction exists, execute the business logic inside it.
    //The logic is similar to the following code:
    //     if (tx != null) {
    //         return business.execute(); // execute with current transaction
    //     } else {
    //         return business.execute(); // execute without transaction
    //     }
    SUPPORTS,

    //NEVER: if a global transaction exists, throw an exception;
    //if no global transaction exists, execute the business logic.
    //The logic is similar to the following code:
    //     if (tx != null) {
    //         throw new TransactionException("existing transaction");
    //     }
    //     return business.execute(); // execute without transaction
    NEVER,

    //MANDATORY: if no global transaction exists, throw an exception;
    //if a global transaction exists, execute the business logic inside it.
    //The logic is similar to the following code:
    //     if (tx == null) {
    //         throw new TransactionException("not existing transaction");
    //     }
    //     return business.execute(); // execute with current transaction
    MANDATORY
}

 

8.全局事务执行模版根据传播级别来执行业务

//全局事务执行模版
public class TransactionalTemplate {
    ...
    /**
     * Executes the business logic under the global transaction lifecycle.
     *
     * <p>Steps: read the TransactionInfo, look up the current transaction, apply the propagation
     * level (which may suspend the current transaction, run without a transaction, or throw),
     * then begin, run the business logic, and commit or complete-after-throwing, and finally
     * clean up and resume any suspended transaction.
     *
     * @param business supplies the TransactionInfo and the business logic to run
     * @return the value returned by {@code business.execute()}
     * @throws Throwable anything thrown by the business logic or by transaction handling
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        //1.Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }

        //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        // getCurrent() returns null when no xid is currently bound, which is the case when a
        // global transaction is being started for the first time.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        //1.2 Handle the transaction propagation.
        // The propagation level comes from the transaction configuration.
        Propagation propagation = txInfo.getPropagation();

        // Each propagation level is handled differently: suspend the current transaction and
        // start a new one, run without a transaction, or fail fast.
        // NOTE(review): suspending presumably amounts to unbinding the xid - confirm in suspend().
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    //If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    //Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    //If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    //Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    //If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    //Continue and execute with current transaction.
                    break;
                case REQUIRED:
                    //If current transaction is existing, execute with current transaction, else continue and execute with new transaction.
                    break;
                case NEVER:
                    //If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));
                    } else {
                        //Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    //If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    //Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                // No current transaction was found, so create a new one.
                tx = GlobalTransactionContext.createNew();
            }

            //set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //else do nothing. Of course, the hooks will still be triggered.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    //Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    //3. The needed business exception to rollback.
                    // Roll back or commit depending on the rollback rules, then rethrow the original exception.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                //4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                // Restore the previous global lock config, fire the after-completion hooks, and clean up.
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            //If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }
    ...
}

 

9.全局事务执行模版开启事务+提交事务+回滚事务

(1)事务执行模版的开启事务+提交事务+回滚事务

(2)默认的全局事务和默认的事务管理器对事务的开启+提交+回滚的处理

(1)事务执行模版的开启事务+提交事务+回滚事务

事务执行模版TransactionalTemplate在开启、提交、回滚事务时,会通过默认的全局事务DefaultGlobalTransaction来进行开启、提交、回滚事务。

//全局事务上下文
public class GlobalTransactionContext {
    private GlobalTransactionContext() {
    }
    
    /**
     * Creates a new GlobalTransaction.
     *
     * @return a {@code DefaultGlobalTransaction} built with its no-argument constructor
     */
    public static GlobalTransaction createNew() {
        return new DefaultGlobalTransaction();
    }
    ...
}

//默认的全局事务
public class DefaultGlobalTransaction implements GlobalTransaction {
    private TransactionManager transactionManager;
    private String xid;
    private GlobalStatus status;
    private GlobalTransactionRole role;
    ...    
    
    /**
     * Instantiates a new default global transaction with no xid, status {@code GlobalStatus.UnKnown}
     * and role {@code GlobalTransactionRole.Launcher} (the initiator of the global transaction).
     */
    DefaultGlobalTransaction() {
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }
    
    /**
     * Instantiates a new default global transaction.
     *
     * @param xid    the global transaction id; may be {@code null} (the no-argument constructor passes {@code null})
     * @param status the initial status
     * @param role   the role of this instance in the global transaction
     */
    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();// the transaction manager used by this transaction
        this.xid = xid;
        this.status = status;
        this.role = role;
    }
    ...
}

//全局事务执行模版
public class TransactionalTemplate {
    ...
    /**
     * Begins the transaction, surrounded by the before-begin and after-begin hooks.
     *
     * @param txInfo supplies the timeout and name passed to {@code tx.begin}
     * @param tx     the transaction to begin
     * @throws TransactionalExecutor.ExecutionException with code {@code BeginFailure} when a
     *         {@code TransactionException} is thrown inside the try block
     */
    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // Hook callback before begin.
            triggerBeforeBegin();
            // The actual begin of the global transaction.
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // Hook callback after begin.
            triggerAfterBegin();
        } catch (TransactionException beginFailure) {
            throw new TransactionalExecutor.ExecutionException(tx, beginFailure, TransactionalExecutor.Code.BeginFailure);
        }
    }
    
    /** Calls {@code beforeBegin()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerBeforeBegin() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.beforeBegin();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute beforeBegin in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    
    /** Calls {@code afterBegin()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerAfterBegin() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.afterBegin();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute afterBegin in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    ...
    
    /**
     * Commits the transaction, surrounded by the before-commit and after-commit hooks.
     *
     * @param tx the transaction to commit
     * @throws TransactionalExecutor.ExecutionException with code {@code CommitFailure} when a
     *         {@code TransactionException} is thrown inside the try block
     */
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException commitFailure) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, commitFailure, TransactionalExecutor.Code.CommitFailure);
        }
    }
    
    /** Calls {@code beforeCommit()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerBeforeCommit() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.beforeCommit();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute beforeCommit in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    
    /** Calls {@code afterCommit()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerAfterCommit() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.afterCommit();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute afterCommit in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    
    /** Calls {@code afterCompletion()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerAfterCompletion() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.afterCompletion();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute afterCompletion in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    ...
    
    /**
     * Completes the transaction after the business logic threw.
     *
     * <p>Rolls back when {@code txInfo} is non-null and its rollback rules match the exception;
     * otherwise the transaction is committed.
     *
     * @param txInfo            the transaction settings holding the rollback rules; may be {@code null}
     * @param tx                the transaction to complete
     * @param originalException the exception thrown by the business logic
     * @throws TransactionalExecutor.ExecutionException from the rollback or commit path; code
     *         {@code RollbackFailure} when the rollback itself throws a {@code TransactionException}
     */
    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        final boolean shouldRollback = txInfo != null && txInfo.rollbackOn(originalException);
        if (!shouldRollback) {
            //not roll back on this exception, so commit
            commitTransaction(tx);
            return;
        }
        //roll back
        try {
            rollbackTransaction(tx, originalException);
        } catch (TransactionException rollbackFailure) {
            //Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, rollbackFailure, TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    }
    
    /**
     * Rolls the transaction back, surrounded by the rollback hooks, and then always throws an
     * {@code ExecutionException} carrying the original exception.
     *
     * @param tx                the transaction to roll back
     * @param originalException the business exception that triggered the rollback
     * @throws TransactionException declared for failures of the rollback call
     * @throws TransactionalExecutor.ExecutionException always, once the rollback call returns: code
     *         {@code RollbackRetrying} when the local status equals {@code GlobalStatus.RollbackRetrying},
     *         otherwise {@code RollbackDone}
     */
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        //3.1 Successfully rolled back
        final TransactionalExecutor.Code outcome;
        if (GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())) {
            outcome = TransactionalExecutor.Code.RollbackRetrying;
        } else {
            outcome = TransactionalExecutor.Code.RollbackDone;
        }
        throw new TransactionalExecutor.ExecutionException(tx, outcome, originalException);
    }
    
    /** Calls {@code beforeRollback()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerBeforeRollback() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.beforeRollback();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute beforeRollback in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    
    /** Calls {@code afterRollback()} on every current hook; an Exception from one hook is logged and the loop continues. */
    private void triggerAfterRollback() {
        for (TransactionHook txHook : getCurrentHooks()) {
            try {
                txHook.afterRollback();
            } catch (Exception hookFailure) {
                LOGGER.error("Failed execute afterRollback in hook {}", hookFailure.getMessage(), hookFailure);
            }
        }
    }
    ...
}

(2)默认的全局事务和默认的事务管理器对事务的开启+提交+回滚的处理

默认的全局事务DefaultGlobalTransaction在进行开启、提交、回滚事务时,会由默认的事务管理器DefaultTransactionManager来开启、提交、回滚事务。

 

而默认的事务管理器DefaultTransactionManager在开启、提交、回滚事务时,最终都会执行其syncCall()方法发起一个同步调用,也就是通过TmNettyRemotingClient向Seata Server发送一个Netty请求。

//默认的全局事务
public class DefaultGlobalTransaction implements GlobalTransaction {
    private TransactionManager transactionManager;
    private String xid;
    private GlobalStatus status;
    private GlobalTransactionRole role;
    ... 
    
    /** Begins the global transaction with the timeout {@code DEFAULT_GLOBAL_TX_TIMEOUT}. */
    @Override
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }
    
    /**
     * Begins the global transaction with the given timeout and the name {@code DEFAULT_GLOBAL_TX_NAME}.
     *
     * @param timeout the timeout passed on to {@code begin(int, String)}
     */
    @Override
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }
    
    /**
     * Begins the global transaction.
     *
     * <p>Only an instance whose role is {@code Launcher} actually begins a transaction; any other
     * role just checks that an xid is present and returns. For a launcher, the xid returned by the
     * transaction manager is stored, the status becomes {@code GlobalStatus.Begin}, and the xid is
     * bound through {@code RootContext.bind}.
     *
     * @param timeout the timeout passed to the transaction manager
     * @param name    the transaction name passed to the transaction manager
     * @throws TransactionException  declared for failures while beginning the transaction
     * @throws IllegalStateException when {@code RootContext} already holds an xid
     */
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            // Not the launcher: this instance only takes part in an existing transaction.
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        // Refuse to begin when an xid is already held by RootContext.
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // Ask the transaction manager to begin the global transaction; it returns the new xid.
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // Bind the new xid through RootContext.
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
    
    /**
     * Commits the global transaction.
     *
     * <p>A {@code Participant} does nothing. Otherwise the commit is reported through the
     * transaction manager, retrying on any {@code Throwable} up to {@code COMMIT_RETRY_COUNT}
     * attempts ({@code DEFAULT_TM_COMMIT_RETRY_COUNT} when that value is not positive).
     *
     * @throws TransactionException when the last attempt also fails
     */
    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participants never commit the global transaction themselves.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    // Count the attempt before making it, so retry == 0 below means "that was the last one".
                    retry--;
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            // Whether or not the commit succeeded, call suspend() if RootContext still holds this transaction's xid.
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
    
    /**
     * Rolls back the global transaction.
     *
     * <p>A {@code Participant} does nothing. Otherwise the rollback is reported through the
     * transaction manager, retrying on any {@code Throwable} up to {@code ROLLBACK_RETRY_COUNT}
     * attempts ({@code DEFAULT_TM_ROLLBACK_RETRY_COUNT} when that value is not positive).
     *
     * @throws TransactionException when the last attempt also fails
     */
    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participants never roll back the global transaction themselves.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    // Count the attempt before making it, so retry == 0 below means "that was the last one".
                    retry--;
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // Whether or not the rollback succeeded, call suspend() if RootContext still holds this transaction's xid.
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
    ...
}

public class RootContext {
    private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
    public static final String KEY_XID = "TX_XID";
    ...
    
    /**
     * Gets the xid stored in {@code CONTEXT_HOLDER} under {@code KEY_XID}.
     *
     * @return the stored xid; may be {@code null} (the method is marked {@code @Nullable})
     */
    @Nullable
    public static String getXID() {
        return (String) CONTEXT_HOLDER.get(KEY_XID);
    }
    
    /**
     * Binds the given xid.
     *
     * <p>A blank xid is treated as a request to unbind. Otherwise the xid is put into the logging
     * MDC under {@code MDC_KEY_XID} and into {@code CONTEXT_HOLDER} under {@code KEY_XID}.
     *
     * @param xid the xid to bind
     */
    public static void bind(@Nonnull String xid) {
        if (!StringUtils.isBlank(xid)) {
            MDC.put(MDC_KEY_XID, xid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind {}", xid);
            }
            CONTEXT_HOLDER.put(KEY_XID, xid);
            return;
        }
        // Blank xid: switch to the unbind operation instead of storing it.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid is blank, switch to unbind operation!");
        }
        unbind();
    }
    ...
}

//默认的全局事务管理器
public class DefaultTransactionManager implements TransactionManager {
    /**
     * Begins a global transaction by sending a {@code GlobalBeginRequest} synchronously.
     *
     * @param applicationId           not used by this implementation
     * @param transactionServiceGroup not used by this implementation
     * @param name                    the transaction name set on the request
     * @param timeout                 the timeout set on the request
     * @return the xid carried by the response
     * @throws TransactionException a {@code TmTransactionException} with code {@code BeginFailed}
     *         when the response result code is {@code ResultCode.Failed}
     */
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        GlobalBeginRequest beginRequest = new GlobalBeginRequest();
        beginRequest.setTransactionName(name);
        beginRequest.setTimeout(timeout);
        // Synchronous call: wait for the response to the begin request.
        GlobalBeginResponse beginResponse = (GlobalBeginResponse) syncCall(beginRequest);
        if (beginResponse.getResultCode() != ResultCode.Failed) {
            return beginResponse.getXid();
        }
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, beginResponse.getMsg());
    }
    
    /**
     * Reports a global commit for the given xid by sending a {@code GlobalCommitRequest} synchronously.
     *
     * @param xid the global transaction id
     * @return the global status carried by the response
     */
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest commitRequest = new GlobalCommitRequest();
        commitRequest.setXid(xid);
        AbstractTransactionResponse rawResponse = syncCall(commitRequest);
        return ((GlobalCommitResponse) rawResponse).getGlobalStatus();
    }
    
    /**
     * Reports a global rollback for the given xid by sending a {@code GlobalRollbackRequest} synchronously.
     *
     * @param xid the global transaction id
     * @return the global status carried by the response
     */
    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest rollbackRequest = new GlobalRollbackRequest();
        rollbackRequest.setXid(xid);
        AbstractTransactionResponse rawResponse = syncCall(rollbackRequest);
        return ((GlobalRollbackResponse) rawResponse).getGlobalStatus();
    }
    ...
    /**
     * Sends the request through {@code TmNettyRemotingClient} and waits for the response.
     *
     * @param request the request to send
     * @return the response, cast to {@code AbstractTransactionResponse}
     * @throws TransactionException a {@code TmTransactionException} with code {@code IO} when the call times out
     */
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            Object rawResponse = TmNettyRemotingClient.getInstance().sendSyncRequest(request);
            return (AbstractTransactionResponse) rawResponse;
        } catch (TimeoutException timeout) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", timeout);
        }
    }
}

//GlobalBeginRequest会根据Seata的通信协议序列化成字节数组,然后通过Netty被发送到Seata Server中去
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
    private int timeout = 60000;
    private String transactionName;
    ...
    
    /** Returns the message type code of this request, {@code MessageType.TYPE_GLOBAL_BEGIN}. */
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_BEGIN;
    }
    
    /**
     * Hands this request to {@code handler} together with the RPC context.
     *
     * @param rpcContext the RPC context passed along to the handler
     * @return the response produced by the handler
     */
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    ...
}

 

10.Seata Server集群的负载均衡机制实现源码

(1)通过负载均衡选择Seata Server节点

(2)Seata提供的负载均衡算法

 

(1)通过负载均衡选择Seata Server节点

默认的事务管理器DefaultTransactionManager在开启、提交、回滚事务时,最终都会执行其syncCall()方法发起一个同步调用,也就是通过TmNettyRemotingClient向Seata Server发送一个Netty请求。

 

syncCall()方法在调用TmNettyRemotingClient实例的sendSyncRequest()方法发送请求时,其实调用的是TmNettyRemotingClient的抽象父类AbstractNettyRemotingClient的sendSyncRequest()方法。

 

在sendSyncRequest()方法中,首先会调用AbstractNettyRemotingClient的loadBalance()方法进行负载均衡,而loadBalance()方法内部又会调用AbstractNettyRemotingClient的doSelect()方法。

 

AbstractNettyRemotingClient的doSelect()方法会先通过LoadBalanceFactory工厂 + SPI来获取一个LoadBalance实例,然后再调用LoadBalance实例的select()方法来进行负载均衡。

 

负载均衡,其实就是从Seata Server节点中选择其中一个节点发送请求。

//The default global transaction manager
public class DefaultTransactionManager implements TransactionManager {
    ...
    //Sends the request synchronously through the TM Netty client and returns the response;
    //a TimeoutException is rethrown as a TmTransactionException with code IO
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            //TmNettyRemotingClient keeps a long-lived Netty connection to the Seata Server
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    //Picks a server address, builds the RpcMessage and sends it synchronously
    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        //Seata Server can be deployed as multiple nodes for high availability, so loadBalance() is called here to pick one node
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        //Get the timeout of the RPC call
        long timeoutMillis = this.getRpcRequestTimeout();
        //Build an RPC message
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

        //send batch message
        //put message into basketMap, @see MergedSendRunnable
        //Batch sending is not enabled by default
        if (this.isEnableClientBatchSendRequest()) {
            ...
        } else {
            //Use the connection manager clientChannelManager to get the Channel established with the chosen Seata Server,
            //then send the RpcMessage to the Seata Server over that Channel
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }
    }
    
    //Looks up the alive server addresses of the transaction service group and selects one of them;
    //returns the selected address as a string, or throws FrameworkException when none could be selected
    protected String loadBalance(String transactionServiceGroup, Object msg) {
        InetSocketAddress address = null;
        try {
            @SuppressWarnings("unchecked")
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
            address = this.doSelect(inetSocketAddressList, msg);
        } catch (Exception ex) {
            //Only the exception message is logged; address stays null and the check below raises the error
            LOGGER.error(ex.getMessage());
        }
        if (address == null) {
            throw new FrameworkException(NoAvailableService);
        }
        return NetUtil.toStringAddress(address);
    }
    
    //Selects one address from the list: a single entry is returned directly, several entries are
    //delegated to the LoadBalance instance keyed by the message's xid, and null is returned when
    //CollectionUtils.isNotEmpty(list) is false
    protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {
        if (CollectionUtils.isNotEmpty(list)) {
            if (list.size() > 1) {
                return LoadBalanceFactory.getInstance().select(list, getXid(msg));
            } else {
                return list.get(0);
            }
        }
        return null;
    }
    ...
}

public class LoadBalanceFactory {
    ...
    //Returns the LoadBalance implementation named by the LOAD_BALANCE_TYPE configuration item,
    //falling back to DEFAULT_LOAD_BALANCE when it is not configured
    public static LoadBalance getInstance() {
        //Obtain the LoadBalance instance through the SPI mechanism
        String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE);
        return EnhancedServiceLoader.load(LoadBalance.class, config);
    }
}

(2)Seata提供的负载均衡算法

轮询选择算法、随机选择算法、最少使用算法、一致性哈希算法。

 

一.轮询选择算法

/**
 * Load balancer that walks through the candidate list in turn, driven by a shared counter.
 */
@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE)
public class RoundRobinLoadBalance implements LoadBalance {
    private final AtomicInteger sequence = new AtomicInteger();

    /**
     * Picks the next candidate in round-robin order.
     *
     * @param invokers candidate list to choose from
     * @param xid      transaction id; not used by this strategy
     * @return the candidate at position (counter mod list size)
     */
    @Override
    public <T> T select(List<T> invokers, String xid) {
        final int size = invokers.size();
        //Select the Seata Server node by round robin
        final int slot = getPositiveSequence() % size;
        return invokers.get(slot);
    }

    /**
     * Atomically advances the counter and returns its previous value; the counter wraps
     * back to 0 once it reaches Integer.MAX_VALUE so the returned value is never negative.
     */
    private int getPositiveSequence() {
        return sequence.getAndUpdate(value -> value >= Integer.MAX_VALUE ? 0 : value + 1);
    }
}

二.随机选择算法

/**
 * Load balancer that picks a candidate uniformly at random.
 */
@LoadLevel(name = RANDOM_LOAD_BALANCE)
public class RandomLoadBalance implements LoadBalance {
    /**
     * Picks a random candidate.
     *
     * @param invokers candidate list to choose from
     * @param xid      transaction id; not used by this strategy
     * @return the candidate at a random index in [0, invokers.size())
     */
    @Override
    public <T> T select(List<T> invokers, String xid) {
        final int bound = invokers.size();
        final int pick = ThreadLocalRandom.current().nextInt(bound);
        return invokers.get(pick);
    }
}

三.最少使用算法

/**
 * Load balancer that prefers the candidate with the smallest active count reported by RpcStatus;
 * ties are broken randomly.
 */
@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE)
public class LeastActiveLoadBalance implements LoadBalance {
    /**
     * Picks the candidate with the lowest active count.
     *
     * @param invokers candidate list to choose from
     * @param xid      transaction id; not used by this strategy
     * @return the only least-active candidate, or a random one among those tied for least active
     */
    @Override
    public <T> T select(List<T> invokers, String xid) {
        final int size = invokers.size();
        //Indexes of all candidates currently tied for the minimum; only the first candidateCount slots are valid
        final int[] candidates = new int[size];
        int candidateCount = 0;
        //-1 means no candidate has been inspected yet
        long minActive = -1;
        for (int idx = 0; idx < size; idx++) {
            final long active = RpcStatus.getStatus(invokers.get(idx).toString()).getActive();
            final boolean newMinimum = minActive == -1 || active < minActive;
            if (newMinimum) {
                //A strictly smaller count restarts the tie list with this candidate
                minActive = active;
                candidates[0] = idx;
                candidateCount = 1;
            } else if (active == minActive) {
                candidates[candidateCount] = idx;
                candidateCount++;
            }
        }
        final int chosen = candidateCount == 1
            ? candidates[0]
            : candidates[ThreadLocalRandom.current().nextInt(candidateCount)];
        return invokers.get(chosen);
    }
}

四.一致性哈希算法

/**
 * Load balancer that maps the transaction id onto a hash ring of virtual nodes, so that the
 * same xid is routed to the same candidate as long as the candidate list does not change.
 */
@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE)
public class ConsistentHashLoadBalance implements LoadBalance {
    public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes";
    //Number of virtual nodes per candidate, read once from configuration
    private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT);
    
    /**
     * Builds a hash ring over the candidates and picks the node responsible for {@code xid}.
     *
     * @param invokers candidate list to choose from
     * @param xid      key that is hashed onto the ring
     * @return the candidate owning the first virtual node at or after the key's hash
     */
    @Override
    public <T> T select(List<T> invokers, String xid) {
        //Select the node by consistent hashing
        return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);
    }
    
    //Hash ring: every candidate is inserted virtualNodes times under the hash of toString() + index
    private static final class ConsistentHashSelector<T> {
        private final SortedMap<Long, T> virtualInvokers = new TreeMap<>();
        private final HashFunction hashFunction = new MD5Hash();
        
        ConsistentHashSelector(List<T> invokers, int virtualNodes) {
            for (T invoker : invokers) {
                for (int i = 0; i < virtualNodes; i++) {
                    virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker);
                }
            }
        }
       
        //Returns the candidate of the first virtual node whose hash is >= the key's hash,
        //wrapping around to the first node of the ring when there is none
        public T select(String objectKey) {
            SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey));
            Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey();
            return virtualInvokers.get(nodeHashVal);
        }
    }
    
    //MD5 is used here only to spread keys over the ring, not for security
    @SuppressWarnings("lgtm[java/weak-cryptographic-algorithm]")
    private static class MD5Hash implements HashFunction {
        private final MessageDigest instance;
        public MD5Hash() {
            try {
                instance = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        
        //Hashes the key with MD5 and folds the first 4 digest bytes into a non-negative long
        @Override
        public long hash(String key) {
            instance.reset();
            //Encode with a fixed charset so the ring position of a key does not depend on the JVM's default charset
            instance.update(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            byte[] digest = instance.digest();
            long h = 0;
            for (int i = 0; i < 4; i++) {
                h <<= 8;
                h |= ((int) digest[i]) & 0xFF;
            }
            return h;
        }
    }
    
    //Maps a string key to a position on the hash ring
    public interface HashFunction {
        long hash(String key);
    }
}

 

11.Seata Client向Seata Server发送请求的源码

首先Seata Client会通过网络连接管理器ClientChannelManager获取与指定Seata Server建立的网络连接Channel。

 

然后通过Netty的Channel把RpcMessage请求消息发送给Seata Server,也就是执行Channel的writeAndFlush()方法将RpcMessage请求消息异步发送给Seata Server。

 

其中,Seata Client会将发送的请求消息封装在一个MessageFuture实例中。并且,Seata Client会通过MessageFuture同步等待Seata Server返回该请求的响应。而MessageFuture请求响应组件是通过CompletableFuture实现同步等待的。

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    //Picks a server address, builds the RpcMessage and sends it synchronously
    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        //Seata Server can be deployed as multiple nodes for high availability, so loadBalance() is called here to pick one node
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        //Get the timeout of the RPC call
        long timeoutMillis = this.getRpcRequestTimeout();
        //Build an RPC message
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

        //send batch message
        //put message into basketMap, @see MergedSendRunnable
        //Batch sending is not enabled by default
        if (this.isEnableClientBatchSendRequest()) {
            ...
        } else {
            //Use the connection manager clientChannelManager to get the Channel established with the chosen Seata Server,
            //then send the RpcMessage to the Seata Server over that Channel
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Writes the RpcMessage to the channel and blocks until the response arrives or the timeout elapses.
    //Throws FrameworkException for a non-positive timeout, returns null for a null channel,
    //rethrows TimeoutException as is and wraps any other failure in a RuntimeException
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        //Wrap the outgoing request in a MessageFuture and store it in the futures map under the message id
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        //Get the remote address
        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        //Send the data asynchronously through the Netty Channel and attach a listener for the write result;
        //if the write fails, the pending future receives the failure cause and the Channel is destroyed
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            //Then wait synchronously on the MessageFuture for the Seata Server's response to this request
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }
    ...
}

public class MessageFuture {
    //Future that get() blocks on; presumably completed by setResultMessage() in the elided part of the class - confirm
    private transient CompletableFuture<Object> origin = new CompletableFuture<>();
    ...
    //Waits up to the given timeout for the result.
    //A result that is itself a Throwable is rethrown: TimeoutException and RuntimeException as is,
    //any other Throwable wrapped in a RuntimeException
    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result = null;
        try {
            result = origin.get(timeout, unit);
            if (result instanceof TimeoutException) {
                throw (TimeoutException)result;
            }
        } catch (ExecutionException e) {
            throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
        } catch (TimeoutException e) {
            //start is declared in the elided part of the class (presumably the creation timestamp - confirm)
            throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
        }

        if (result instanceof RuntimeException) {
            throw (RuntimeException)result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable)result);
        }

        return result;
    }
    ...
}

 

12.Client将RpcMessage对象编码成字节数组

Seata Client在调用Channel的writeAndFlush()方法将RpcMessage对象发送给Seata Server时,会先将RpcMessage对象交给NettyClientBootstrap的ChannelPipeline进行处理。其中,RpcMessage对象会被ProtocolV1Encoder编码成字节数组。

public class NettyClientBootstrap implements RemotingBootstrap {
    ...
    //Configures the Netty Bootstrap: executor group, socket options and the channel pipeline
    @Override
    public void start() {
        //Lazily create the event executor group from the client worker thread settings
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));
        }

        //Build a Bootstrap with the Netty API
        //and set the NioEventLoopGroup thread pool group; one thread is enough by default
        this.bootstrap.group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

        //With native transport enabled, the epoll options are applied on every platform except macOS
        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }

        //Initialize the pipeline, Netty's component for processing network data
        bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    //IdleStateHandler is the handler that checks for idle state:
                    //whenever data passes through, it records the time;
                    //if no data passes through for a long time, i.e. the channel is idle, it fires a user triggered event for ClientHandler to process
                    pipeline.addLast(new IdleStateHandler(
                        nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()
                    ))
                    .addLast(new ProtocolV1Decoder())//Decoder based on the Seata communication protocol
                    .addLast(new ProtocolV1Encoder());//Encoder based on the Seata communication protocol
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            }
        );

        //Mark the bootstrap as initialized; the log line is printed only on the first transition
        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }
    ...
}

//Encodes an RpcMessage into bytes in this order: magic code, version, full length (4B), head length (2B),
//message type, codec, compressor, message id (4B), optional head map, optional body
public class ProtocolV1Encoder extends MessageToByteEncoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Encoder.class);
    
    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        try {
            if (msg instanceof RpcMessage) {
                RpcMessage rpcMessage = (RpcMessage) msg;
             
                //Full message length; starts at the fixed head length and grows with the head map and the body
                int fullLength = ProtocolConstants.V1_HEAD_LENGTH;

                //Head length; starts at the fixed head length and grows with the head map
                int headLength = ProtocolConstants.V1_HEAD_LENGTH;

                //Get the message type
                byte messageType = rpcMessage.getMessageType();

                //Write the magic number first; the magic number marks the beginning of a message
                out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);

                //Then write the version
                out.writeByte(ProtocolConstants.VERSION);

                //full Length(4B) and head length(2B) will fix in the end.
                //Move the writer index forward: current writer index + 6 becomes the new writerIndex.
                //In other words the writer index jumps over 6 bytes, which are left unwritten for now.
                //After the rest of the message has been written, these 6 bytes are filled in with the full length and the head length.
                //The 6 reserved bytes = 4 bytes of full length + 2 bytes of head length
                out.writerIndex(out.writerIndex() + 6);

                //The full length and the head length are not known yet, which is why 6 bytes were skipped.
                //Writing continues right after the 6 reserved bytes that follow the version: message type, codec, compressor
                out.writeByte(messageType);
                out.writeByte(rpcMessage.getCodec());
                out.writeByte(rpcMessage.getCompressor());

                //Then write the 4-byte message id
                out.writeInt(rpcMessage.getId());

                //direct write head with zero-copy
                //Get the head map
                Map<String, String> headMap = rpcMessage.getHeadMap();
                if (headMap != null && !headMap.isEmpty()) {
                    //Encode the head map: the Map is converted to bytes and written into out
                    //Once it is written, its byte length is known and is added to both headLength and fullLength
                    int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                    headLength += headMapBytesLength;
                    fullLength += headMapBytesLength;
                }

                byte[] bodyBytes = null;
                //Serialize the message body unless the message is a heartbeat
                if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                    //heartbeat has no body
                    //Load the serializer through SPI according to the codec property of the RpcMessage
                    Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
                    //Serialize the message body with the serializer
                    bodyBytes = serializer.serialize(rpcMessage.getBody());

                    //Get the compressor according to the compressor property of the RpcMessage
                    Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                    //Compress the byte array with the compressor
                    bodyBytes = compressor.compress(bodyBytes);
                    fullLength += bodyBytes.length;
                }

                if (bodyBytes != null) {
                    out.writeBytes(bodyBytes);
                }

                //fix fullLength and headLength
                int writeIndex = out.writerIndex();
                //skip magic code(2B) + version(1B)
                //writeIndex - fullLength is where this message starts; +3 lands on the 6 reserved bytes
                out.writerIndex(writeIndex - fullLength + 3);
                out.writeInt(fullLength);
                out.writeShort(headLength);
                //Restore the writer index to the end of the message
                out.writerIndex(writeIndex);
            } else {
                throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
            }
        } catch (Throwable e) {
            //Every failure, including the UnsupportedOperationException above, is logged here and not rethrown
            LOGGER.error("Encode request error!", e);
        }
    }
}

 

13.Server将字节数组解码成RpcMessage对象

Seata Server收到Seata Client发来的字节数组时,会先将字节数组交给NettyServerBootstrap的ChannelPipeline进行处理。其中,字节数组会被ProtocolV1Decoder解码成RpcMessage对象。

public class NettyServerBootstrap implements RemotingBootstrap {
    ...
    //Configures the ServerBootstrap, binds the listen port and registers this server in the registry
    @Override
    public void start() {
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    //Pipeline of every accepted connection: read-idle detection, protocol decoder, protocol encoder,
                    //followed by the configured channelHandlers if there are any
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            }
        );
        try {
            //Bind the listen port and wait until the bind has completed
            this.serverBootstrap.bind(getListenPort()).sync();
            XID.setPort(getListenPort());
            LOGGER.info("Server started, service listen port: {}", getListenPort());
            //Register this server's address (XID ip + port) in the registry
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
        } catch (SocketException se) {
            throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
        } catch (Exception exx) {
            throw new RuntimeException("Server start failed", exx);
        }
    }
    ...
}

//Decodes bytes into RpcMessage objects; fields are read in this order: magic code (2B), version (1B),
//full length (4B), head length (2B), message type, codec, compressor, message id (4B), head map, body
public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class);
    
    //To deal with sticky and split packets, decoding is done frame by frame on top of LengthFieldBasedFrameDecoder
    public ProtocolV1Decoder() {
        // default is 8M
        this(ProtocolConstants.MAX_FRAME_LENGTH);
    }
    
    public ProtocolV1Decoder(int maxFrameLength) {
        //The maximum frame length is 8M by default, so a single message must not exceed it
        //A frame starts with 2 bytes of magic code and 1 byte of version, so the 4-byte full length field sits at offset 3
        //The length adjustment is -7 because the full length counts the whole frame, including the 3 + 4 bytes up to the end of the length field
        //No leading bytes are stripped, so decodeFrame() sees the frame from the magic code onwards
        super(maxFrameLength, 3, 4, -7, 0);
    }
    
    //Once a whole frame has been extracted, decode() turns its bytes into an RpcMessage object
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        Object decoded;
        try {
            //Call the parent's decode() to extract one frame
            decoded = super.decode(ctx, in);
            if (decoded instanceof ByteBuf) {
                ByteBuf frame = (ByteBuf)decoded;
                try {
                    return decodeFrame(frame);
                } finally {
                    //The frame buffer is released whether or not decodeFrame() succeeds
                    frame.release();
                }
            }
        } catch (Exception exx) {
            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
            throw new DecodeException(exx);
        }
        return decoded;
    }
    
    //Parses one complete frame into an RpcMessage; throws IllegalArgumentException when the magic code does not match
    public Object decodeFrame(ByteBuf frame) {
        //The first two bytes are the magic code
        byte b0 = frame.readByte();
        byte b1 = frame.readByte();
        if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
            throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
        }

        //Read the version; the value is not used afterwards, but the byte has to be consumed
        byte version = frame.readByte();
        int fullLength = frame.readInt();
        short headLength = frame.readShort();
        byte messageType = frame.readByte();
        byte codecType = frame.readByte();
        byte compressorType = frame.readByte();
        int requestId = frame.readInt();

        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setCodec(codecType);
        rpcMessage.setId(requestId);
        rpcMessage.setCompressor(compressorType);
        rpcMessage.setMessageType(messageType);

        //direct read head with zero-copy
        //Whatever the head length exceeds the fixed head length by is the encoded head map
        int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
        if (headMapLength > 0) {
            Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
            rpcMessage.getHeadMap().putAll(map);
        }

        //read body
        //Heartbeat messages get a fixed PING/PONG body instead of a deserialized one
        if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
            rpcMessage.setBody(HeartbeatMessage.PING);
        } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
            rpcMessage.setBody(HeartbeatMessage.PONG);
        } else {
            int bodyLength = fullLength - headLength;
            if (bodyLength > 0) {
                byte[] bs = new byte[bodyLength];
                frame.readBytes(bs);
                //First get the compressor and decompress the body bytes
                Compressor compressor = CompressorFactory.getCompressor(compressorType);
                bs = compressor.decompress(bs);
                //Then deserialize the decompressed bytes according to the codec type to obtain the body object
                Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
                rpcMessage.setBody(serializer.deserialize(bs));
            }
        }
        return rpcMessage;
    }
}

 

14.Server处理已解码的RpcMessage对象的流程

Seata Server将收到的网络请求字节数组解码成RpcMessage对象后,便会将RpcMessage对象交给NettyServerBootstrap的ServerHandler进行处理,也就是交给ServerHandler的channelRead()方法进行处理。

 

ServerHandler的channelRead()方法会调用AbstractNettyRemoting的processMessage()方法,也就是调用ServerOnRequestProcessor的process()方法来实现对RpcMessage对象的处理。

 

在ServerOnRequestProcessor的process()方法的处理过程中,会调用TransactionMessageHandler的onRequest()方法处理RpcMessage对象。

 

由于Server.start()初始化NettyRemotingServer时,设置了TransactionMessageHandler为DefaultCoordinator,所以最终就会调用DefaultCoordinator的onRequest()方法来处理RpcMessage对象。

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    //Inbound/outbound handler of the server
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        //Channel read.
        //Anything that is not an RpcMessage is ignored
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //Next, call processMessage() to process the fully decoded RpcMessage object
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Dispatches a decoded RpcMessage to the processor registered for its message type,
    //on the processor's executor if one was registered, otherwise on the calling thread
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up a Pair by message type; the Pair consists of the request processor and the thread pool that runs it
            //The entries of processorTable are put in by NettyRemotingServer during initialization through registerProcessor()
            //So on the server the code below ends up in ServerOnRequestProcessor.process() for request messages
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                //Clear the logging context of the pooled thread after every task
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //NOTE(review): the text says "max pool size" but the value logged is messageExecutor.getActiveCount() - confirm intent
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        //Try to dump the thread stacks, at most once (allowDumpStack is switched off afterwards)
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                //NOTE(review): Runtime.exec(String) does not run the command through a shell, so the ">" is
                                //likely passed to jstack as an argument rather than redirecting its output - confirm
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    //No executor registered for this message type: process on the calling thread
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class NettyRemotingServer extends AbstractNettyRemotingServer {
    ...
    //Registers, per message type, the processor that handles it and the executor it runs on;
    //a null executor means the processor runs on the thread that dispatches the message
    private void registerProcessor() {
        //1. registry on request message processor
        //One ServerOnRequestProcessor instance serves all request types below, on messageExecutor
        ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
        ShutdownHook.getInstance().addDisposable(onRequestProcessor);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        //2. registry on response message processor
        //Branch commit/rollback results run on the separate branchResultMessageExecutor
        ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
        //3. registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        //4. registry tm message processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        //5. registry heartbeat message processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }
    ...
}

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    //Stores the (processor, executor) pair in processorTable under the message type,
    //replacing any pair previously stored for that type
    @Override
    public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(messageType, pair);
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //This container holds all processors.
    //Key: message type code; value: the processor for that type paired with the executor it runs on
    protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
    ...
}

// Server-side processor for request messages (excerpt; other members are elided with "...").
// Messages from registered channels are handed to the TransactionMessageHandler;
// channels that are not registered are closed.
public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    // Handler that produces the result for each request message.
    private final TransactionMessageHandler transactionMessageHandler;
    ...
    // Entry point for an incoming RpcMessage.
    // If the channel is registered with ChannelManager the message is processed;
    // otherwise the channel is disconnected and closed and the message is dropped.
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            // Unregistered channel: close the connection instead of handling the request.
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                // NOTE(review): only the exception message is logged; the stack trace is lost.
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }
    
    // Logs the request, then dispatches its body to the transaction message handler
    // and sends the result back on the same channel.
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        // RpcContext associated with this channel, looked up through ChannelManager.
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            // When debug logging is off, the summary line is put on BatchLogHandler's log queue instead.
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                // NOTE(review): the thread's interrupt status is not restored after this catch;
                // consider Thread.currentThread().interrupt().
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        // Bodies that are not AbstractMessage are ignored; no response is sent for them.
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            //the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            // This call ends up in DefaultCoordinator.onRequest(), which handles the message.
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            // Send the handler's result back to the client on the originating channel.
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}

// Server-side component that handles global transaction logic.
// It covers: starting a number of background threads, handling global begin, global commit,
// global rollback, global status reports and branch registration,
// local checks, timeout checks, rollback retry, commit retry, async commit, and undo log deletion.
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    // Handles a request sent to the TC.
    // The request must be an AbstractTransactionRequestToTC, otherwise
    // IllegalArgumentException is thrown. This coordinator is set on the request as its
    // TC inbound handler, and the request's own handle(context) produces the result.
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        // Hand this coordinator to the request before delegating to it.
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

 

15.Seata Server开启全局事务的流程源码

注意:DefaultCore.begin()创建全局事务会话后,会通过slf4j的MDC把xid放入当前线程的MDC上下文(线程本地变量)里。

// Server-side component that handles global transaction logic.
// It covers: starting a number of background threads, handling global begin, global commit,
// global rollback, global status reports and branch registration,
// local checks, timeout checks, rollback retry, commit retry, async commit, and undo log deletion.
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    // Entry point for requests addressed to the TC.
    // Rejects anything that is not an AbstractTransactionRequestToTC with
    // IllegalArgumentException; otherwise sets this coordinator as the request's
    // TC inbound handler and returns the result of the request's handle(context).
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        // The request dispatches back to this handler from its handle() method.
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

// Request asking the TC to begin a global transaction (excerpt; other members are elided with "...").
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
    ...
    // Dispatches this request to the handler's overload for GlobalBeginRequest and returns its response.
    // NOTE(review): "handler" is presumably the TCInboundHandler installed through
    // setTCInboundHandler() by DefaultCoordinator.onRequest() - confirm against the parent class.
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    ...
}

//The type Abstract tc inbound handler.
// Excerpt: only the global-begin path is shown; other members are elided with "...".
public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    ...
    // Handles a global-begin request.
    // Creates an empty GlobalBeginResponse, runs doGlobalBegin() inside
    // exceptionHandleTemplate(), and returns the response object.
    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // Begin the global transaction.
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    // A StoreException is rethrown as a TransactionException with code FailedStore,
                    // keeping the original exception as the cause.
                    // NOTE(review): in the DefaultCoordinator.doGlobalBegin shown in this file the xid is
                    // set only after core.begin() returns, so response.getXid() is presumably still
                    // unset when begin fails - confirm.
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }
    
    //Do global begin.
    // Implemented by subclasses; DefaultCoordinator provides the implementation shown in this file.
    protected abstract void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException;
    ...
}

// DefaultCoordinator excerpt showing the global-begin implementation (other members are elided with "...").
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    // Core component that actually begins the global transaction.
    private final DefaultCore core;
    ...
    // Begins a global transaction for the request and stores the resulting xid in the response.
    // The application id and transaction service group are read from rpcContext; the
    // transaction name and timeout are read from the request.
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
        // This is where the begin-global-transaction logic is actually carried out:
        // DefaultCore begins the global transaction, and the xid it returns is set on the response.
        response.setXid(core.begin(
            rpcContext.getApplicationId(),// application id
            rpcContext.getTransactionServiceGroup(),// transaction service group
            request.getTransactionName(),// transaction name
            request.getTimeout())// timeout
        );
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }
    ...
}

// Core implementation excerpt showing how a global transaction is begun (other members are elided with "...").
public class DefaultCore implements Core {
    ...
    // Begins a global transaction and returns its xid.
    // Steps, in order: create the GlobalSession, put its xid into the SLF4J MDC, register the
    // root session manager as a lifecycle listener, begin the session, publish the
    // session-doing metrics event.
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        // Create a global transaction session.
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        // Put the xid into the SLF4J MDC under RootContext.MDC_KEY_XID.
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        // Add a lifecycle listener for the global session: the root session manager from SessionHolder.
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Begin (open) the session.
        session.begin();

        //transaction start event: publish the session-doing event
        MetricsPublisher.postSessionDoingEvent(session, false);
       
        // Return the xid of the global session.
        return session.getXid();
    }
    ...
}

// Global transaction session excerpt showing creation and xid generation (other members are elided with "...").
public class GlobalSession implements SessionLifecycle, SessionStorable {
    ...
    // Factory method: creates a GlobalSession with lazyLoadBranch set to false.
    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
        GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
        return session;
    }
    
    // Creates a session in status GlobalStatus.Begin with a newly generated transactionId and xid.
    // When lazyLoadBranch is false the branch session list is initialized to an empty list;
    // when it is true this constructor leaves branchSessions unassigned.
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
        // The global transaction id is generated by UUIDGenerator.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.lazyLoadBranch = lazyLoadBranch;
        if (!lazyLoadBranch) {
            this.branchSessions = new ArrayList<>();
        }
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
        // The final xid is produced by the XID utility from the transactionId generated above.
        this.xid = XID.generateXID(transactionId);
    }
    ...
}

 

From:https://www.cnblogs.com/mjunz/p/18880714
东阳马生架构
100+评论
captcha