首页 >Java >尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误:ORA-08177: 无法序列化此事务的访问

尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误:ORA-08177: 无法序列化此事务的访问

王林
王林转载
2024-02-09 17:00:10 628浏览

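php小编西瓜在使用 Spring Batch 时,可能会遇到一个问题:当尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误:ORA-08177: 无法序列化此事务的访问。这个错误可能让人困惑,但实际上它与事务隔离级别有关:JobRepository 在创建作业实例时默认使用 SERIALIZABLE 隔离级别,多个作业并发写入 batch_job_instance 表时,Oracle 无法将这些事务串行化,于是抛出该错误。在解决这个问题之前,我们需要了解一些关于 Spring Batch 和数据库事务的背景知识。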

问题内容

尝试使用 CompletableFuture 并行运行两个 Spring Batch 作业时遇到错误。错误信息如下:

originalsql = insert into batch_job_instance(job_instance_id, job_name, job_key, version)
    values (?, ?, ?, ?)
, error msg = ORA-08177: can't serialize access for this transaction

我正在使用 Spring Batch 5.x、Spring Boot 3.1.6、JDK 17

应用程序属性

spring.batch.repository.isolationlevelforcreate=isolation_read_committed
spring.batch.isolationlevel=read_committed
spring.batch.jdbc.table-prefix=batch_
spring.batch.job.enabled=false

BatchConfig.java

/**
 * Shared Spring Batch infrastructure beans: the task executor, a job launcher that
 * runs jobs on that executor, the JPA transaction manager and the job/step listeners.
 */
@Configuration
public class BatchConfig {

    /**
     * Concurrency limit applied to the shared task executor.
     * NOTE(review): the original snippet used this value without declaring it; it is
     * set to "unbounded" (the executor's own default) here — replace with the real limit.
     */
    private static final int CONCURRENCY_COUNT = SimpleAsyncTaskExecutor.UNBOUNDED_CONCURRENCY;

    /**
     * Creates the task executor shared by the job launcher and the steps.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose threads are prefixed with
     *         {@code simpletaskexecutor}
     */
    @Bean("simpletaskexecutor")
    public TaskExecutor simpleTaskExecutor() {
        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("simpletaskexecutor");
        asyncTaskExecutor.setConcurrencyLimit(CONCURRENCY_COUNT);
        return asyncTaskExecutor;
    }

    /**
     * Creates a job launcher that hands job execution to the shared task executor.
     * Prototype-scoped, so every injection point receives its own instance.
     *
     * @param dataSource    not used by this method; kept so the bean signature is unchanged
     * @param jobRepository repository the launcher uses to persist job metadata
     * @return the initialised launcher
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean("joblauncherasync")
    @Scope("prototype")
    public JobLauncher jobLauncherAsync(DataSource dataSource, JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(simpleTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Creates the JPA transaction manager.
     *
     * @param entityManagerFactory factory the transaction manager is bound to
     * @return the transaction manager
     */
    @Bean
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }

    /** @return the listener attached to the jobs */
    @Bean
    public BatchJobExecutionListener batchJobExecutionListener() {
        return new BatchJobExecutionListener();
    }

    /** @return the listener attached to the steps */
    @Bean
    public BatchJobStepExecutionListner batchJobStepExecutionListner() {
        return new BatchJobStepExecutionListner();
    }

}

EmployeeJobConfig.java

@configuration
@import(batchconfig.class)
public class employeejobconfig{
    
        
    @autowired
    employeestatuswritter employeestatuswritter;
    
    @autowired
    employeependingprocessor employeependingprocessor;
    
    
    
    @bean("employeependingreader")
    public jpapagingitemreader<employee> employeependingreader(datasource ds,entitymanagerfactory entitymanagerfactory) {
        
        jpapagingitemreader<employee> jpareader = new jpapagingitemreader<>();
        jpareader.setentitymanagerfactory(entitymanagerfactory);
        jpareader.setquerystring("select e from employee e");
        jpareader.setpagesize(5000);
        return jpareader;

    }
    
    @bean("employeespinvokestep")
    public step employeespinvokestep(@qualifier("employeependingreader") itemreader<employee> reader,
            @qualifier("employeestatuswritter") itemwriter<employee> writer,
            @qualifier("employeependingprocessor") itemprocessor<employee, employee> processor,jobrepository jobrepository,
            platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) {

        return new stepbuilder("employeespinvokestep",jobrepository)
                .<employee, employee>chunk(50,transactionmanager).reader(reader)
                .processor(processor).writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }
    
    @bean("employeespjob")
    public job employeespjob(@qualifier("employeespinvokestep") step employeespinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) {
        return new jobbuilder("employeespjob",jobrepository)
                .incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(employeespinvokestep)
                .build();
    }

}

ManagerConfig.java

@configuration
@import(batchconfig.class)
public class managerconfig {
    
        

    @autowired
    managerstatuswritter managerstatuswritter;
    
    
    @autowired
    managerpendingprocessor managerpendingprocessor;
    
    @bean("managerpendingreader")
    public jpapagingitemreader<manager> managerpendingreader(datasource ds,entitymanagerfactory entitymanagerfactory) {
        
        jpapagingitemreader<manager> jpareader = new jpapagingitemreader<>();
        jpareader.setentitymanagerfactory(entitymanagerfactory);
        jpareader.setquerystring("select m from manager m");
        jpareader.setpagesize(5000);
        return jpareader;

    }
    
    @bean("managerspinvokestep")
    public step indvinvoiceconsctlspinvokestep(@qualifier("managerpendingreader") itemreader<manager> reader,
            @qualifier("managerstatuswritter") itemwriter<manager> writer,
            @qualifier("managerpendingprocessor") itemprocessor<manager, manager> processor,jobrepository jobrepository,
            platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) {

        return new stepbuilder("managerspinvokestep",jobrepository)
                .<manager, manager>chunk(5000,transactionmanager).reader(reader)
                .processor(processor).writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }
    
    @bean("managerspjob")
    public job managerspjob(@qualifier("managerspinvokestep") step indvinvoiceconsctlspinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) {
        return new jobbuilder("managerspjob",jobrepository)
                .incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(indvinvoiceconsctlspinvokestep)
                .build();
    }
    
}

BatchJobManager.java

/**
 * Entry point that builds the list of jobs to run and submits them in parallel through
 * {@link BatchExecutorService}, then prints each job's completion flag.
 */
@Service
public class BatchJobManager {

    @Autowired
    ApplicationContext context;

    @Autowired
    BatchExecutorService batchExecutorService;

    @Autowired
    BatchJobRunner batchJobRunner;

    /**
     * Executor the {@code CompletableFuture}s are scheduled on. The original snippet
     * referenced {@code asyncTaskExecutor} without declaring it; the only executor bean
     * shown in the configuration is injected here.
     */
    @Autowired
    @Qualifier("simpletaskexecutor")
    TaskExecutor asyncTaskExecutor;

    /**
     * Launches the employee and manager jobs concurrently. Failures are reported on
     * standard output and not propagated to the caller.
     */
    public void startJob() {
        try {
            System.out.println("batchjobmanager called .. " + new Date());

            List<String> invoiceNamesList = Arrays.asList("employeespjob", "managerspjob");
            launchAsyn(getBatchJobList(invoiceNamesList));
        } catch (Exception e) {
            System.out.println("while loading job..");
            e.printStackTrace();
        }
    }

    /**
     * Wraps every job name in a {@link BatchJob} descriptor.
     *
     * @param jobNames names of the job beans to run
     * @return one descriptor per name, in the same order
     * @throws Exception declared for backward compatibility with existing callers
     */
    public List<BatchJob> getBatchJobList(List<String> jobNames) throws Exception {
        List<BatchJob> batchJobList = new ArrayList<>();
        for (String job : jobNames) {
            // The original passed an undefined variable here; the loop variable is the job name.
            batchJobList.add(BatchJob.builder().jobName(job).build());
        }
        return batchJobList;
    }

    /**
     * Submits every job to the executor service, waits for all of them to finish and
     * prints each job's completion flag.
     *
     * @param batchJobList jobs to run
     * @throws Exception if waiting for the combined future fails or is interrupted
     */
    public void launchAsyn(List<BatchJob> batchJobList) throws Exception {
        List<CompletableFuture<BatchJob>> batchJobFutureList = new ArrayList<>();
        for (BatchJob batchJob : batchJobList) {
            batchJobFutureList.add(batchExecutorService.execute(batchJob, asyncTaskExecutor));
        }

        CompletableFuture<Void> jobFutureResult =
                CompletableFuture.allOf(batchJobFutureList.toArray(new CompletableFuture<?>[0]));

        // Every future is already complete once allOf fires, so join() does not block here.
        CompletableFuture<List<BatchJob>> allCompletableFuture = jobFutureResult.thenApply(
                ignored -> batchJobFutureList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        List<BatchJob> resultFutureList = allCompletableFuture.get();
        for (BatchJob batch : resultFutureList) {
            System.out.println("status " + batch.getIsCompleted());
        }
    }

}

BatchExecutorService.java

/**
 * Schedules a job run on a task executor and exposes the result as a
 * {@link CompletableFuture}.
 */
@Service
public class BatchExecutorService {

    @Autowired
    BatchJobRunner batchJobRunner;

    /**
     * Runs the given job through {@link BatchJobRunner} on the supplied executor.
     *
     * @param batchJob               descriptor of the job to run
     * @param threadPoolTaskExecutor executor the run is scheduled on
     * @return a future completing with the descriptor returned by the runner
     */
    public CompletableFuture<BatchJob> execute(BatchJob batchJob, TaskExecutor threadPoolTaskExecutor) {
        return CompletableFuture.supplyAsync(() -> batchJobRunner.execute(batchJob), threadPoolTaskExecutor);
    }

}

BatchJobRunner.java

/**
 * Looks up a job bean by name, launches it with unique parameters and records the
 * outcome on the {@link BatchJob} descriptor.
 */
@Service
public class BatchJobRunner {

    @Autowired
    ApplicationContext context;

    @Autowired
    @Qualifier("joblauncherasync")
    JobLauncher jobLauncherAsync;

    /**
     * Launches the job named by the descriptor and returns that same descriptor.
     * On success the descriptor is flagged as completed; on failure the error text is
     * stored on it and the exception is not propagated.
     *
     * @param batchJob descriptor holding the name of the job bean to run
     * @return the descriptor that was passed in, updated with the outcome
     */
    public BatchJob execute(BatchJob batchJob) {
        try {
            System.out.println(" batchjob" + batchJob.getJobName() + " called ...");
            jobLauncherAsync.run(getJob(batchJob.getJobName()), getJobParameters(batchJob.getJobName()));
            // Fixed 15 s pause kept from the original; the completed flag is set after it
            // without checking the actual job status.
            Thread.sleep(15000);
            batchJob.setIsCompleted(true);
            System.out.println(" batchjob" + batchJob.getJobName() + " completed ...");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            recordFailure(batchJob, e);
        } catch (Exception e) {
            recordFailure(batchJob, e);
        }

        return batchJob;
    }

    /** Prints the failure and stores its description on the descriptor. */
    private void recordFailure(BatchJob batchJob, Exception e) {
        System.out.println("exception " + e.getMessage());
        // getMessage() may be null; fall back to toString() instead of throwing an NPE.
        batchJob.setErrorDesc(e.getMessage() != null ? e.getMessage() : e.toString());
        e.printStackTrace();
    }

    /**
     * Fetches the job bean registered under the given name.
     *
     * @param jobName bean name of the job
     * @return the job bean
     */
    public Job getJob(String jobName) {
        return context.getBean(jobName, Job.class);
    }

    /**
     * Builds identifying parameters for one run: a random UUID, the job name and the
     * current time.
     *
     * @param jobName name stored under the {@code job_name} parameter
     * @return the job parameters
     */
    public JobParameters getJobParameters(String jobName) {
        return new JobParametersBuilder()
                .addString("unique_id", UUID.randomUUID().toString(), true)
                .addString("job_name", jobName, true)
                .addDate("execution_start_date", Date.from(Instant.now()), true)
                .toJobParameters();
    }

}

BatchJob.java

/**
 * Descriptor of one batch job run: the job's bean name, whether the run was flagged as
 * completed, and an error description if launching failed.
 *
 * <p>The accessors and builder are written out by hand so the class compiles without
 * any annotation processor.
 */
public class BatchJob {

    private String jobName;
    /** {@code null} until a run sets it. */
    private Boolean isCompleted;
    private String errorDesc;

    /** Creates an empty descriptor. */
    public BatchJob() {
    }

    private BatchJob(Builder builder) {
        this.jobName = builder.jobName;
        this.isCompleted = builder.isCompleted;
        this.errorDesc = builder.errorDesc;
    }

    /** @return a new builder with all fields unset */
    public static Builder builder() {
        return new Builder();
    }

    /** @return the job's bean name */
    public String getJobName() {
        return jobName;
    }

    /** @param jobName the job's bean name */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the completion flag, or {@code null} if never set */
    public Boolean getIsCompleted() {
        return isCompleted;
    }

    /** @param isCompleted the completion flag */
    public void setIsCompleted(Boolean isCompleted) {
        this.isCompleted = isCompleted;
    }

    /** @return the error description, or {@code null} if none was recorded */
    public String getErrorDesc() {
        return errorDesc;
    }

    /** @param errorDesc the error description */
    public void setErrorDesc(String errorDesc) {
        this.errorDesc = errorDesc;
    }

    /** Fluent builder for {@link BatchJob}. */
    public static final class Builder {

        private String jobName;
        private Boolean isCompleted;
        private String errorDesc;

        private Builder() {
        }

        /** @param jobName the job's bean name
         *  @return this builder */
        public Builder jobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        /** @param isCompleted the completion flag
         *  @return this builder */
        public Builder isCompleted(Boolean isCompleted) {
            this.isCompleted = isCompleted;
            return this;
        }

        /** @param errorDesc the error description
         *  @return this builder */
        public Builder errorDesc(String errorDesc) {
            this.errorDesc = errorDesc;
            return this;
        }

        /** @return a descriptor carrying the values set on this builder */
        public BatchJob build() {
            return new BatchJob(this);
        }
    }

}

作业在逐一或按顺序执行时成功运行。但是,在使用 CompletableFuture 时,遇到了问题。同时启动 Spring Batch 作业是正确的方法吗?

解决方法

1.将以下属性添加到application.properties

spring.main.allow-bean-definition-overriding=true

2. 更新 BatchConfig.java 中的 jobLauncher(),如下所示

/**
 * Creates a job launcher that hands job execution to the shared task executor.
 * It is registered under the name {@code jobLauncher}.
 * NOTE(review): that name is presumably chosen so that, with bean-definition overriding
 * enabled, this bean replaces the launcher the framework would otherwise register — confirm.
 *
 * @param dataSource    not used by this method; kept so the bean signature is unchanged
 * @param jobRepository repository the launcher uses to persist job metadata
 * @return the initialised launcher
 * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
 */
@Bean("jobLauncher")
public JobLauncher jobLauncher(DataSource dataSource, JobRepository jobRepository) throws Exception {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(simpleTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}
  3. 删除了 CompletableFuture。

  4. 删除了 BatchExecutorService.java

  5. 在 BatchJobManager.java 中添加了以下方法来调用作业。

    /**
     * Runs the given jobs one after another, in list order, through the job runner.
     *
     * @param batchJobList jobs to run
     * @throws Exception declared for callers; the runner call itself declares none
     */
    public void launchSync(List<BatchJob> batchJobList) throws Exception {
        batchJobList.forEach(job -> batchJobRunner.execute(job));
    }

以上是尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误:ORA-08177: 无法序列化此事务的访问的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文转载于:stackoverflow.com。如有侵权,请联系admin@php.cn删除

相关文章

查看更多