设为首页 加入收藏

TOP

Spring 多线程的事务处理(六)
2023-09-23 15:44:13 】 浏览:178
Tags:Spring 程的事
ze(), executor.getKeepAliveTime(TimeUnit.SECONDS), TimeUnit.SECONDS, ReflectTool.createInstance(executor.getQueue().getClass()), executor.getThreadFactory(), ReflectTool.createInstance(executor.getRejectedExecutionHandler().getClass()) ); this.txManager = txManager; this.txStatus = txManager.getTransaction(definition); this.txResource = TransactionSynchronizationManager.getResource(txManager.getResourceFactory()); } public DataSourceTransactionExecutor(DataSourceTransactionManager txManager) { this(txManager, new DefaultTransactionDefinition()); } public void addTask(Runnable task) { callableList.add(DataSourceTransactionTask.Builder.aTask() .runnable(task).txManager(txManager) .resource(txResource).definition(new DefaultTransactionDefinition()) .build() ); } public void addTask(Runnable task, TransactionDefinition def) { callableList.add(DataSourceTransactionTask.Builder.aTask() .runnable(task).txManager(txManager) .resource(txResource).definition(def) .build() ); } public void execute() throws InterruptedException { List<Future<TransactionStatus>> futures = new ArrayList<>(); for (Callable<TransactionStatus> callable : callableList) { futures.add(executor.submit(callable)); } executor.shutdown(); List<TransactionStatus> statusList = new ArrayList<>(); for (Future<TransactionStatus> future : futures) { try { statusList.add(future.get()); } catch (ExecutionException e) { log.error("任务执行出现异常", e); statusList.add(null); } } Object[] statusArgs = new Object[statusList.size()]; statusList.toArray(statusArgs); mergeTaskResult(statusArgs); // 合并每个任务的事务信息 } /** * 以 Reactor 异步的方式执行这些任务,需要注意的是,当使用这个方法时,由于 * Reactor 的异步特性,如果业务方法使用了 @Transactional 注解修饰,Spring 的事务处理会发生在实际处理 * 事务之前,可能会导致数据库连接被释放,从而无法绑定对应的事务对象,使用时需要注意这一点 */ public void asyncExecute() { List<Mono<TransactionStatus>> monoList = new ArrayList<>(); Scheduler scheduler = Schedulers.fromExecutor(this.executor); for (Callable<TransactionStatus> callable : callableList) { monoList.add(Mono.fromCallable(callable) .subscribeOn(scheduler)); } Flux.zip(monoList, Tuples::fromArray) .single() .flatMap(tuple2 -> Mono.fromRunnable(() -> { TransactionSynchronizationManager.bindResource(txManager.getResourceFactory(), txResource); mergeTaskResult(tuple2.toArray()); })) .subscribeOn(scheduler) .doOnSubscribe(any -> log.info("开始执行事务的合并操作")) .doFinally(any -> { log.debug("合并事务处理执行完成"); scheduler.dispose(); executor.shutdown(); }) .subscribe(); } private void mergeTaskResult(Object... statusList) { boolean exFlag = false; for (Object obj : statusList) { if (obj == null) { exFlag = true; continue; } // 在当前上下文中一定是 TransactionStatus 类型的对象 TransactionStatus status = (TransactionStatus) obj; if (status.isRollbackOnly()) exFlag = true; } if (exFlag) { log.debug("由于任务执行时出现异常,因此会将整个业务进操作进行回滚"); txManager.rollback(txStatus); /* 这里抛出异常的原因是因为相关的业务方法可能被 @Transactional 修饰过, 从而导致提交只能回滚的事务而导致的提交异常,具体使用时可以考虑替换掉这个异常类型 */ throw new RuntimeException("需要回滚的异常"); } else { txManager.commit(txStatus); } } }
首页 上一页 3 4 5 6 下一页 尾页 6/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇支持JDK19虚拟线程的web框架,之.. 下一篇Redis面试题

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目