mport org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* 用于 {@link DataSourceTransactionManager} 事务管理对应的任务线程,这个类的存在是为了使得
* Spring 中的事务管理能够在多线程的环境下依旧能够有效地工作.
* 对于要要执行的任务,可以将其封装成此任务对象,该任务对象在执行时将会绑定与 {@link DataSourceTransactionManager#getResourceFactory()}
* 对应的 {@link TransactionSynchronizationManager#getResourceMap()} 中关联的事务对象,以使得要执行的任务包含在已有的事务中
* (至少能保证存在一种可行的方式能够得到父线程的所处事务上下文),从而使得当前待执行的任务能够被现有统领事务进行管理
*
* @see DataSourceTransactionManager
* @see TransactionSynchronizationManager
*@author lxh
*/
public class DataSourceTransactionTask
implements Callable<TransactionStatus> {
private final static Logger log = LoggerFactory.getLogger(DataSourceTransactionTask.class);
/*
与 TransactionSynchronizationManager.resources 关联的事务属性对象的 Value 值,
在当前上下文中,为了保存与原有事务的完整性,这里的 resource 存储的是 DataSourceTransactionObject
*/
private final Object resource;
// 当前 Spring 平台的事务管理对象
private final DataSourceTransactionManager txManager;
// 实际需要运行的任务
private final Runnable runnable;
// 与事务有关的描述信息
private final TransactionDefinition definition;
public DataSourceTransactionTask(Object resource,
DataSourceTransactionManager txManager,
Runnable runnable,
TransactionDefinition definition) {
this.resource = resource;
this.txManager = txManager;
this.runnable = runnable;
this.definition = definition;
}
@Override
public TransactionStatus call() {
// 通过源码分析可知,对于 JDBC 事务的处理,key 为对应的 DataSource 对象
Object key = txManager.getResourceFactory();
/*
resource 是在启动这个线程之前就已经被主线程开启的事务对象,
我们可以知道它实际上就是 DataSourceTransactionObject,我们将他绑定到
当前线程,即可使得当前线程能够感知到这个事务的存在
*/
TransactionSynchronizationManager.bindResource(key, resource);
TransactionStatus status = txManager.getTransaction(definition);
try {
runnable.run();
} catch (Throwable t) {
log.debug("任务执行出现异常", t);
status.setRollbackOnly(); // 出现了异常,需要将整个事务进行回滚
} finally {
// 移除与当前线程执行的关联关系,避免任务执行过程中的资源混乱
TransactionSynchronizationManager.unbindResource(key);
}
return status;
}
}
更进一步,我们可以使用线程池来进一步封装,从而避免自己手动创建线程或者其它的线程管理容器:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 用于需要执行并行任务的事务管理线程池,由于现有的 Spr