Maison  >  Article  >  Lorsque vous essayez de démarrer deux tâches Spring Batch ou plus en même temps, une erreur est générée : ORA-08177 : Impossible de sérialiser l'accès pour cette transaction

Lorsque vous essayez de démarrer deux tâches Spring Batch ou plus en même temps, une erreur est générée : ORA-08177 : Impossible de sérialiser l'accès pour cette transaction

王林
王林avant
2024-02-09 17:00:10580parcourir

L'éditeur php Xigua peut rencontrer un problème lors de l'utilisation de Spring Batch : lorsque vous essayez de démarrer deux ou plusieurs tâches Spring Batch en même temps, une erreur sera générée : ORA-08177 : Impossible de sérialiser l'accès à cette transaction. Cette erreur peut prêter à confusion, mais elle est en réalité causée par un problème de verrouillage de la base de données. Avant de résoudre ce problème, nous devons comprendre certaines connaissances de base sur Spring Batch et les transactions de base de données.

Contenu de la question

J'ai rencontré une erreur en essayant d'utiliser completablefuture pour exécuter deux tâches Spring Batch en parallèle. Le message d'erreur est le suivant :

originalsql = insert into batch_job_instance(job_instance_id, job_name, job_key, version)
    values (?, ?, ?, ?)
, error msg = ora-08177: can't serialize access for this transaction

J'utilise Spring Batch 5.x, Spring Boot 3.1.6, JDK 17

Propriétés de l'application

spring.batch.repository.isolationlevelforcreate=isolation_read_committed
spring.batch.isolationlevel=read_committed
spring.batch.jdbc.table-prefix=batch_
spring.batch.job.enabled=false

batchconfig.java

@configuration
public class batchconfig {

@bean("simpletaskexecutor")
    public taskexecutor simpletaskexecutor() {
        simpleasynctaskexecutor asynctaskexecutor = new simpleasynctaskexecutor("simpletaskexecutor");
        asynctaskexecutor.setconcurrencylimit(concurrencycount);
        return asynctaskexecutor;
    }


@bean("joblauncherasync")
    @scope("prototype")
    public joblauncher joblauncherasync(datasource datasource, jobrepository jobrepository) throws exception {

        taskexecutorjoblauncher joblauncher = new taskexecutorjoblauncher();
        joblauncher.setjobrepository(jobrepository);
        joblauncher.settaskexecutor(simpletaskexecutor());
        joblauncher.afterpropertiesset();
        return joblauncher;
    }
    
    @bean
    public jpatransactionmanager transactionmanager(entitymanagerfactory entitymanagerfactory) {
        return new jpatransactionmanager(entitymanagerfactory);
    }
    
    @bean
    public batchjobexecutionlistener batchjobexecutionlistener() {
        return new batchjobexecutionlistener();
    } 
    
    @bean
    public batchjobstepexecutionlistner batchjobstepexecutionlistner() {
        return new batchjobstepexecutionlistner();
    }

}

employeejobconfig.java

@configuration
@import(batchconfig.class)
public class employeejobconfig{
    
        
    @autowired
    employeestatuswritter employeestatuswritter;
    
    @autowired
    employeependingprocessor employeependingprocessor;
    
    
    
    @bean("employeependingreader")
    public jpapagingitemreader<employee> employeependingreader(datasource ds,entitymanagerfactory entitymanagerfactory) {
        
        jpapagingitemreader<employee> jpareader = new jpapagingitemreader<>();
        jpareader.setentitymanagerfactory(entitymanagerfactory);
        jpareader.setquerystring("select e from employee e");
        jpareader.setpagesize(5000);
        return jpareader;

    }
    
    @bean("employeespinvokestep")
    public step employeespinvokestep(@qualifier("employeependingreader") itemreader<employee> reader,
            @qualifier("employeestatuswritter") itemwriter<employee> writer,
            @qualifier("employeependingprocessor") itemprocessor<employee, employee> processor,jobrepository jobrepository,
            platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) {

        return new stepbuilder("employeespinvokestep",jobrepository)
                .<employee, employee>chunk(50,transactionmanager).reader(reader)
                .processor(processor).writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }
    
    @bean("employeespjob")
    public job employeespjob(@qualifier("employeespinvokestep") step employeespinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) {
        return new jobbuilder("employeespjob",jobrepository)
                .incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(employeespinvokestep)
                .build();
    }

}

managerconfig.java

@configuration
@import(batchconfig.class)
public class managerconfig {
    
        

    @autowired
    managerstatuswritter managerstatuswritter;
    
    
    @autowired
    managerpendingprocessor managerpendingprocessor;
    
    @bean("managerpendingreader")
    public jpapagingitemreader<manager> managerpendingreader(datasource ds,entitymanagerfactory entitymanagerfactory) {
        
        jpapagingitemreader<manager> jpareader = new jpapagingitemreader<>();
        jpareader.setentitymanagerfactory(entitymanagerfactory);
        jpareader.setquerystring("select m from manager m");
        jpareader.setpagesize(5000);
        return jpareader;

    }
    
    @bean("managerspinvokestep")
    public step indvinvoiceconsctlspinvokestep(@qualifier("managerpendingreader") itemreader<manager> reader,
            @qualifier("managerstatuswritter") itemwriter<manager> writer,
            @qualifier("managerpendingprocessor") itemprocessor<manager, manager> processor,jobrepository jobrepository,
            platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) {

        return new stepbuilder("managerspinvokestep",jobrepository)
                .<manager, manager>chunk(5000,transactionmanager).reader(reader)
                .processor(processor).writer(writer)
                .listener(batchjobstepexecutionlistner)
                .taskexecutor(simpletaskexecutor)
                .build();
    }
    
    @bean("managerspjob")
    public job managerspjob(@qualifier("managerspinvokestep") step indvinvoiceconsctlspinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) {
        return new jobbuilder("managerspjob",jobrepository)
                .incrementer(new runidincrementer())
                .listener(batchjobexecutionlistener)
                .start(indvinvoiceconsctlspinvokestep)
                .build();
    }
    
}

batchjobmanager.java

@service
public class batchjobmanager {

    
    
      @autowired applicationcontext context;
     
      @autowired batchexecutorservice batchexecutorservice;
      
      @autowired
      batchjobrunner batchjobrunner;
      
       
    
    public void startjob() {
        
         try {
               system.out.println("batchjobmanager called .. "+new date());
                
               string[] invoicenames={"employeespjob","managerspjob"};
               
               list<string> invoicenameslist = arrays.aslist(invoicenames);
               launchasyn(getbatchjoblist(invoicenameslist));
                
                 
         } catch (exception e) {
             system.out.println("while loading job..");
             e.printstacktrace();
         }
        
    }
      

    public list<batchjob>  getbatchjoblist(list<string> jobnames) throws exception{
        list<batchjob> batchjoblist=new arraylist<batchjob>();
        for(string job:jobnames) {
             batchjob batchjob= batchjob.builder().jobname(invoicejob).build();
             batchjoblist.add(batchjob);
        }
        return batchjoblist;
    }
      
    
       
    public void launchasyn( list<batchjob> batchjoblist) throws exception{
         list<completablefuture<batchjob>> batchjobfuturelist = new arraylist<completablefuture<batchjob>>();
          
          for(batchjob batchjob:batchjoblist) {
          completablefuture<batchjob> jobfuture = batchexecutorservice.execute(batchjob, asynctaskexecutor);
          batchjobfuturelist.add(jobfuture);
          }
          
          completablefuture<void> jobfutureresult = completablefuture
                    .allof(batchjobfuturelist.toarray(new completablefuture[batchjobfuturelist.size()]));
          
          
          completablefuture<list<canbatchjob>> allcompletablefuture = jobfutureresult.thenapply(future -> {
                return batchjobfuturelist.stream().map(completablefuture -> completablefuture.join())
                        .collect(collectors.tolist());
            });
        
          list<batchjob> resultfuturelist=allcompletablefuture.get();
          
          for(batchjob batch:resultfuturelist) {
              system.out.println("status "+batch.getiscompleted());
          }
    }
   
    
}

batchexecutorservice.java

@service
public class batchexecutorservice {
    
        
    @autowired 
    batchjobrunner batchjobrunner;
        
    public completablefuture<canbatchjob> execute(canbatchjob canbatchjob,taskexecutor threadpooltaskexecutor){
        return completablefuture.supplyasync(() -> batchjobrunner.execute(canbatchjob),threadpooltaskexecutor);
    }
    
    
}

batchjobrunner.java

@service
public class batchjobrunner {

    @autowired applicationcontext context;
      
    
     @autowired 
     @qualifier("joblauncherasync")
     joblauncher joblauncherasync;
    
    
    /*
     * @autowired joblauncher joblauncherasync;
     */
    
    public batchjob execute(batchjob batchjob) {
        try {
            system.out.println(" batchjob"+batchjob.getjobname()+" called ...");
            joblauncherasync.run(getjob(batchjob.getjobname()), getjobparameters(batchjob.getjobname()));
            thread.sleep(15000);
            batchjob.setiscompleted(true);
            system.out.println(" batchjob"+batchjob.getjobname()+" completed ...");
        }
        catch(exception e) {
            system.out.println("exception "+e.getmessage());
            batchjob.seterrordesc(e.getmessage().tostring());
            e.printstacktrace();
        }
        
        return canbatchjob;
    }
    
    public job getjob(string jobname) {
        
         return  (job) context.getbean(jobname); 
    }
    
    public jobparameters getjobparameters(string jobname) {
    
    jobparameters  jobparameters = new jobparametersbuilder() .addstring("unique_id",
              uuid.randomuuid().tostring(), true) .addstring("job_name", jobname,
              true) .adddate("execution_start_date", date.from(instant.now()),
              true).tojobparameters();
    return jobparameters;
    }
    
}

batchjob.java

public class BatchJob {

    private String jobName;
    private Boolean isCompleted;
    private String errorDesc;
    
    
}

Les tâches s'exécutent avec succès lorsqu'elles sont exécutées une par une ou dans l'ordre. Cependant, lors de l'utilisation de completeablefuture, j'ai rencontré un problème. Est-ce que démarrer les travaux par lots de printemps en même temps est la bonne manière ?

Solution

1. Ajoutez les propriétés suivantes à application.properties

.

spring.main.allow-bean-definition-overriding=true

2. Joblauncher() mis à jour, comme indiqué dans batchconfig.java ci-dessous

@bean("joblauncher")
    public joblauncher joblauncher(datasource datasource, jobrepository jobrepository) throws exception {
        taskexecutorjoblauncher joblauncher = new taskexecutorjoblauncher();
        joblauncher.setjobrepository(jobrepository);
        joblauncher.settaskexecutor(simpletaskexecutor());
        joblauncher.afterpropertiesset();
        return joblauncher;
    }
  1. Suppression de completeablefuture.

  2. batchexecutorservice.java supprimé

  3. Ajout des méthodes suivantes dans batchjobmanager.java pour appeler des tâches.

    public void launchSync(List<BatchJob> batchJobList) throws Exception {
             for(BatchJob batchJob:batchJobList) {
                  batchJobRunner.execute(batchJob);
                  }  
         }

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer