首頁 >Java >嘗試同時啟動兩個或多個 Spring Batch 作業時,會拋出錯誤:ORA-08177: 無法序列化此事務的訪問

嘗試同時啟動兩個或多個 Spring Batch 作業時,會拋出錯誤:ORA-08177: 無法序列化此事務的訪問

王林
王林轉載
2024-02-09 17:00:10 710瀏覽

php小編西瓜在使用Spring Batch時,可能會遇到一個問題:當嘗試同時啟動兩個或多個Spring Batch作業時,會拋出錯誤:ORA-08177: 無法序列化此事務的訪問。這個錯誤可能會讓人感到困惑,但實際上它是由於多個事務在可序列化(SERIALIZABLE)隔離級別下並發寫入同一張資料表所導致的。在解決這個問題之前,我們需要先了解一些關於Spring Batch和資料庫事務的背景知識。

問題內容

嘗試使用 CompletableFuture 並行執行兩個 Spring Batch 作業時遇到錯誤。錯誤訊息如下:

originalsql = insert into batch_job_instance(job_instance_id, job_name, job_key, version)
    values (?, ?, ?, ?)
, error msg = ora-08177: can't serialize access for this transaction

我正在使用 Spring Batch 5.x、Spring Boot 3.1.6、JDK 17

應用程式屬性

# Isolation-level settings intended for the Spring Batch job repository.
# NOTE(review): neither key below matches the key documented for Spring Boot 3.x
# (spring.batch.jdbc.isolation-level-for-create) — confirm they are actually bound;
# if they are ignored, the job repository keeps its default isolation level.
spring.batch.repository.isolationlevelforcreate=isolation_read_committed
spring.batch.isolationlevel=read_committed
# Prefix applied to the Spring Batch metadata table names.
spring.batch.jdbc.table-prefix=batch_
# Jobs are started explicitly by the application code rather than at startup.
spring.batch.job.enabled=false

batchconfig.java

/**
 * Shared batch infrastructure: the task executor, the asynchronous job launcher,
 * the JPA transaction manager and the job/step execution listeners.
 */
@configuration
public class batchconfig {

    /**
     * Creates the executor registered under the bean name "simpletaskexecutor".
     * Its concurrency limit is taken from concurrencycount, which is not declared
     * in this snippet — presumably a field or constant defined elsewhere; verify.
     */
    @bean("simpletaskexecutor")
    public taskexecutor simpletaskexecutor() {
        simpleasynctaskexecutor executor = new simpleasynctaskexecutor("simpletaskexecutor");
        executor.setconcurrencylimit(concurrencycount);
        return executor;
    }

    /**
     * Creates a prototype-scoped launcher that uses the given job repository and the
     * executor returned by simpletaskexecutor(). The datasource parameter is not used
     * in the body.
     */
    @bean("joblauncherasync")
    @scope("prototype")
    public joblauncher joblauncherasync(datasource datasource, jobrepository jobrepository) throws exception {
        taskexecutorjoblauncher launcher = new taskexecutorjoblauncher();
        launcher.settaskexecutor(simpletaskexecutor());
        launcher.setjobrepository(jobrepository);
        // Must run after both setters so the launcher validates a complete configuration.
        launcher.afterpropertiesset();
        return launcher;
    }

    /** Creates the JPA transaction manager bound to the supplied entity manager factory. */
    @bean
    public jpatransactionmanager transactionmanager(entitymanagerfactory entitymanagerfactory) {
        jpatransactionmanager manager = new jpatransactionmanager(entitymanagerfactory);
        return manager;
    }

    /** Exposes the job-level execution listener as a bean. */
    @bean
    public batchjobexecutionlistener batchjobexecutionlistener() {
        batchjobexecutionlistener listener = new batchjobexecutionlistener();
        return listener;
    }

    /** Exposes the step-level execution listener as a bean. */
    @bean
    public batchjobstepexecutionlistner batchjobstepexecutionlistner() {
        batchjobstepexecutionlistner listener = new batchjobstepexecutionlistner();
        return listener;
    }

}

employeejobconfig.java

#
/**
 * Defines the reader, step and job beans for the "employeespjob" job and imports
 * the shared batchconfig.
 */
@configuration
@import(batchconfig.class)
public class employeejobconfig {

    // Injected writer; not referenced by the bean methods visible in this class.
    @autowired
    employeestatuswritter employeestatuswritter;

    // Injected processor; not referenced by the bean methods visible in this class.
    @autowired
    employeependingprocessor employeependingprocessor;

    /**
     * Builds the paging reader for the query "select e from employee e" with a
     * page size of 5000. The ds parameter is not used in the body.
     */
    @bean("employeependingreader")
    public jpapagingitemreader<employee> employeependingreader(datasource ds, entitymanagerfactory entitymanagerfactory) {
        jpapagingitemreader<employee> pendingreader = new jpapagingitemreader<>();
        pendingreader.setquerystring("select e from employee e");
        pendingreader.setpagesize(5000);
        pendingreader.setentitymanagerfactory(entitymanagerfactory);
        return pendingreader;
    }

    /**
     * Builds the chunk-oriented step "employeespinvokestep": chunks of 50 items,
     * wired with the qualified reader, processor and writer, the step listener and
     * the supplied task executor.
     */
    @bean("employeespinvokestep")
    public step employeespinvokestep(@qualifier("employeependingreader") itemreader<employee> reader,
            @qualifier("employeestatuswritter") itemwriter<employee> writer,
            @qualifier("employeependingprocessor") itemprocessor<employee, employee> processor,
            jobrepository jobrepository,
            platformtransactionmanager transactionmanager,
            batchjobstepexecutionlistner batchjobstepexecutionlistner,
            taskexecutor simpletaskexecutor) {

        stepbuilder builder = new stepbuilder("employeespinvokestep", jobrepository);
        return builder.<employee, employee>chunk(50, transactionmanager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }

    /**
     * Builds the job "employeespjob": a run-id incrementer, the job listener and the
     * single step above as its start step.
     */
    @bean("employeespjob")
    public job employeespjob(@qualifier("employeespinvokestep") step employeespinvokestep,
            jobrepository jobrepository,
            batchjobexecutionlistener batchjobexecutionlistener) {
        jobbuilder builder = new jobbuilder("employeespjob", jobrepository);
        return builder.incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(employeespinvokestep)
                .build();
    }

}

managerconfig.java

/**
 * Defines the reader, step and job beans for the "managerspjob" job and imports
 * the shared batchconfig.
 */
@configuration
@import(batchconfig.class)
public class managerconfig {

    // Injected writer; not referenced by the bean methods visible in this class.
    @autowired
    managerstatuswritter managerstatuswritter;

    // Injected processor; not referenced by the bean methods visible in this class.
    @autowired
    managerpendingprocessor managerpendingprocessor;

    /**
     * Builds the paging reader for the query "select m from manager m" with a
     * page size of 5000. The ds parameter is not used in the body.
     */
    @bean("managerpendingreader")
    public jpapagingitemreader<manager> managerpendingreader(datasource ds, entitymanagerfactory entitymanagerfactory) {
        jpapagingitemreader<manager> pendingreader = new jpapagingitemreader<>();
        pendingreader.setquerystring("select m from manager m");
        pendingreader.setpagesize(5000);
        pendingreader.setentitymanagerfactory(entitymanagerfactory);
        return pendingreader;
    }

    /**
     * Builds the chunk-oriented step "managerspinvokestep": chunks of 5000 items,
     * wired with the qualified reader, processor and writer, the step listener and
     * the supplied task executor.
     */
    @bean("managerspinvokestep")
    public step indvinvoiceconsctlspinvokestep(@qualifier("managerpendingreader") itemreader<manager> reader,
            @qualifier("managerstatuswritter") itemwriter<manager> writer,
            @qualifier("managerpendingprocessor") itemprocessor<manager, manager> processor,
            jobrepository jobrepository,
            platformtransactionmanager transactionmanager,
            batchjobstepexecutionlistner batchjobstepexecutionlistner,
            taskexecutor simpletaskexecutor) {

        stepbuilder builder = new stepbuilder("managerspinvokestep", jobrepository);
        return builder.<manager, manager>chunk(5000, transactionmanager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }

    /**
     * Builds the job "managerspjob": a run-id incrementer, the job listener and the
     * single step above as its start step.
     */
    @bean("managerspjob")
    public job managerspjob(@qualifier("managerspinvokestep") step indvinvoiceconsctlspinvokestep,
            jobrepository jobrepository,
            batchjobexecutionlistener batchjobexecutionlistener) {
        jobbuilder builder = new jobbuilder("managerspjob", jobrepository);
        return builder.incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(indvinvoiceconsctlspinvokestep)
                .build();
    }

}

batchjobmanager.java

#
/**
 * Builds the list of batch jobs to run and launches them in parallel, one
 * CompletableFuture per job, then prints each job's completion flag.
 */
@service
public class batchjobmanager {

    @autowired applicationcontext context;

    @autowired batchexecutorservice batchexecutorservice;

    @autowired
    batchjobrunner batchjobrunner;

    // Executor passed to batchexecutorservice.execute for the asynchronous calls.
    // NOTE(review): the original snippet used asynctaskexecutor without declaring it;
    // wiring it to the "simpletaskexecutor" bean is an assumption — confirm.
    @autowired
    @qualifier("simpletaskexecutor")
    taskexecutor asynctaskexecutor;

    /**
     * Launches the jobs "employeespjob" and "managerspjob". Any exception is printed
     * to standard output and not rethrown.
     */
    public void startjob() {
        try {
            system.out.println("batchjobmanager called .. "+new date());

            string[] invoicenames={"employeespjob","managerspjob"};
            list<string> invoicenameslist = arrays.aslist(invoicenames);
            launchasyn(getbatchjoblist(invoicenameslist));
        } catch (exception e) {
            system.out.println("while loading job..");
            e.printstacktrace();
        }
    }

    /**
     * Creates one batchjob descriptor per job name.
     *
     * @param jobnames bean names of the jobs to run
     * @return descriptors in the same order as jobnames
     */
    public list<batchjob> getbatchjoblist(list<string> jobnames) throws exception {
        list<batchjob> batchjoblist = new arraylist<batchjob>();
        for (string job : jobnames) {
            // Use the loop variable: the original passed an undeclared name here.
            // The local is not called "batchjob" so it cannot shadow the type in its own initializer.
            batchjob descriptor = batchjob.builder().jobname(job).build();
            batchjoblist.add(descriptor);
        }
        return batchjoblist;
    }

    /**
     * Submits every job through batchexecutorservice, waits for all futures to
     * finish and prints each job's completion flag.
     *
     * @param batchjoblist jobs to launch
     * @throws exception if waiting on the combined future fails
     */
    public void launchasyn(list<batchjob> batchjoblist) throws exception {
        list<completablefuture<batchjob>> batchjobfuturelist = new arraylist<completablefuture<batchjob>>();

        for (batchjob batchjob : batchjoblist) {
            completablefuture<batchjob> jobfuture = batchexecutorservice.execute(batchjob, asynctaskexecutor);
            batchjobfuturelist.add(jobfuture);
        }

        completablefuture<void> jobfutureresult = completablefuture
                .allof(batchjobfuturelist.toarray(new completablefuture[batchjobfuturelist.size()]));

        // Element type is batchjob throughout; the original declared list<canbatchjob> here
        // and then assigned the result to list<batchjob>.
        completablefuture<list<batchjob>> allcompletablefuture = jobfutureresult.thenapply(ignored ->
                batchjobfuturelist.stream()
                        .map(jobfuture -> jobfuture.join())
                        .collect(collectors.tolist()));

        list<batchjob> resultfuturelist = allcompletablefuture.get();

        for (batchjob batch : resultfuturelist) {
            system.out.println("status "+batch.getiscompleted());
        }
    }

}

batchexecutorservice.java

/**
 * Wraps batchjobrunner.execute in a CompletableFuture so that several jobs can be
 * submitted without waiting for each other.
 */
@service
public class batchexecutorservice {

    @autowired
    batchjobrunner batchjobrunner;

    /**
     * Submits the given job to batchjobrunner on the supplied executor.
     * The job type is batchjob, matching batchjobrunner.execute and the caller in
     * batchjobmanager; the original declared it as canbatchjob.
     *
     * @param batchjob job descriptor to run
     * @param threadpooltaskexecutor executor on which the supplier is run
     * @return a future completed with the value returned by batchjobrunner.execute
     */
    public completablefuture<batchjob> execute(batchjob batchjob, taskexecutor threadpooltaskexecutor) {
        return completablefuture.supplyasync(() -> batchjobrunner.execute(batchjob), threadpooltaskexecutor);
    }

}

batchjobrunner.java

#
/**
 * Looks up a job bean by name and starts it through the "joblauncherasync" launcher.
 */
@service
public class batchjobrunner {

    @autowired applicationcontext context;

    @autowired
    @qualifier("joblauncherasync")
    joblauncher joblauncherasync;

    /**
     * Starts the job named by the descriptor, waits 15 seconds, then marks the
     * descriptor as completed. The value returned by the launcher is discarded, so
     * the completed flag is set after the wait regardless of the job's outcome.
     * On any exception the message is stored in the descriptor's error description.
     *
     * @param batchjob descriptor of the job to run; updated in place
     * @return the same descriptor instance that was passed in
     */
    public batchjob execute(batchjob batchjob) {
        try {
            system.out.println(" batchjob"+batchjob.getjobname()+" called ...");
            joblauncherasync.run(getjob(batchjob.getjobname()), getjobparameters(batchjob.getjobname()));
            thread.sleep(15000);
            batchjob.setiscompleted(true);
            system.out.println(" batchjob"+batchjob.getjobname()+" completed ...");
        }
        catch(exception e) {
            system.out.println("exception "+e.getmessage());
            // getmessage() can be null; store it as-is instead of dereferencing it,
            // which would throw from inside this handler.
            batchjob.seterrordesc(e.getmessage());
            e.printstacktrace();
        }

        // Return the parameter; the original returned an undeclared name.
        return batchjob;
    }

    /**
     * Fetches the job bean registered under the given name.
     *
     * @param jobname bean name of the job
     * @return the bean cast to job
     */
    public job getjob(string jobname) {
        return (job) context.getbean(jobname);
    }

    /**
     * Builds identifying parameters for one launch: a random UUID, the job name and
     * the current date.
     *
     * @param jobname value stored under "job_name"
     * @return the assembled parameters
     */
    public jobparameters getjobparameters(string jobname) {
        jobparameters jobparameters = new jobparametersbuilder()
                .addstring("unique_id", uuid.randomuuid().tostring(), true)
                .addstring("job_name", jobname, true)
                .adddate("execution_start_date", date.from(instant.now()), true)
                .tojobparameters();
        return jobparameters;
    }

}

batchjob.java

/**
 * Mutable descriptor of one batch job launch: the job's name, whether it has been
 * marked completed, and an error description if launching failed.
 *
 * <p>The accessors and builder below are the members the surrounding snippets call
 * on this type; the original listing declared only the three private fields.
 */
public class BatchJob {

    private String jobName;
    private Boolean isCompleted;
    private String errorDesc;

    /** Creates an empty descriptor; all fields start as {@code null}. */
    public BatchJob() {
    }

    /** Returns a new builder with all fields unset. */
    public static Builder builder() {
        return new Builder();
    }

    /** Returns the job name, or {@code null} if none was set. */
    public String getJobName() {
        return jobName;
    }

    /** Sets the job name. */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** Returns the completion flag, or {@code null} if it was never set. */
    public Boolean getIsCompleted() {
        return isCompleted;
    }

    /** Sets the completion flag. */
    public void setIsCompleted(Boolean isCompleted) {
        this.isCompleted = isCompleted;
    }

    /** Returns the error description, or {@code null} if none was recorded. */
    public String getErrorDesc() {
        return errorDesc;
    }

    /** Sets the error description; {@code null} is accepted. */
    public void setErrorDesc(String errorDesc) {
        this.errorDesc = errorDesc;
    }

    /** Fluent builder for {@link BatchJob}. */
    public static final class Builder {

        private String jobName;
        private Boolean isCompleted;
        private String errorDesc;

        private Builder() {
        }

        /** Sets the job name on the descriptor being built. */
        public Builder jobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        /** Sets the completion flag on the descriptor being built. */
        public Builder isCompleted(Boolean isCompleted) {
            this.isCompleted = isCompleted;
            return this;
        }

        /** Sets the error description on the descriptor being built. */
        public Builder errorDesc(String errorDesc) {
            this.errorDesc = errorDesc;
            return this;
        }

        /** Creates the descriptor with the values collected so far. */
        public BatchJob build() {
            BatchJob job = new BatchJob();
            job.jobName = jobName;
            job.isCompleted = isCompleted;
            job.errorDesc = errorDesc;
            return job;
        }
    }

}

作業在逐一或依序執行時成功執行。但是,在使用 CompletableFuture 時,遇到了問題。同時啟動 Spring Batch 作業是正確的方法嗎?

解決方法

1.將下列屬性新增至application.properties

spring.main.allow-bean-definition-overriding=true

2.更新了 joblauncher(),如下 batchconfig.java 所示

/**
 * Registers the launcher under the bean name "joblauncher", configured with the given
 * job repository and the executor returned by simpletaskexecutor(). The datasource
 * parameter is not used in the body.
 */
@bean("joblauncher")
    public joblauncher joblauncher(datasource datasource, jobrepository jobrepository) throws exception {
        taskexecutorjoblauncher launcher = new taskexecutorjoblauncher();
        launcher.settaskexecutor(simpletaskexecutor());
        launcher.setjobrepository(jobrepository);
        // Must run after both setters so the launcher validates a complete configuration.
        launcher.afterpropertiesset();
        return launcher;
    }
  3. 刪除了 CompletableFuture。

  4. 刪除了 batchexecutorservice.java

  5. 在 batchjobmanager.java 中新增了以下方法來呼叫作業。

    /**
     * Calls batchJobRunner.execute once for every job in the list, in list order.
     *
     * @param batchJobList jobs to run
     */
    public void launchSync(List<BatchJob> batchJobList) throws Exception {
        batchJobList.forEach(job -> batchJobRunner.execute(job));
    }

以上是嘗試同時啟動兩個或多個 Spring Batch 作業時,會拋出錯誤:ORA-08177: 無法序列化此事務的訪問的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除

相關文章

看更多