在您的個人或專業專案中,您有時會處理大量資料。資料批次是處理大量資料的有效方法,其中資料被收集、處理,然後產生批次結果。批次處理可以應用於許多用例。批次的一個常見用例是將大量 CSV 或 JSON 檔案轉換為可供進一步處理的結構化格式。
在本教程中,我們將嘗試了解如何使用 Spring Boot 來設定此架構,Spring Boot 是一個促進基於 Spring 的應用程式開發的框架。
Spring Batch 是一個用於批次的開源框架。它是一個輕量級的綜合解決方案,旨在支援現代企業系統中常見的強大批次應用程式的開發。它的發展是 SpringSource 和 Accenture 合作的結果。
它可以克服批量開發過程中反覆出現的問題:
注意:在 IT 中,批次是一個獨立運行的程序,對大量資料執行一組處理操作。
為了管理大量數據,我們主要使用以下三個工具:
JobLauncher:這是負責啟動/啟動批次程序的元件。它可以配置為自行觸發或由外部事件觸發(手動啟動)。在 Spring Batch 工作流程中,JobLauncher 負責執行 Job。
作業:這是代表任務的元件,該任務被委派負責程序中解決的業務需求。它負責順序啟動一個或多個步驟。
步驟:這是包含需要解決的業務核心的元件。它負責定義結構如下的三個子組件:
ItemReader:這是負責讀取要處理的輸入資料的元件。它們可以來自各種來源(資料庫、平面檔案(csv、xml、xls 等)、佇列);
ItemProcessor:這是負責轉換讀取的資料的元件。所有管理規則都在其中實施。
ItemWriter:此元件將處理器轉換後的資料保存在一個或多個所需容器(資料庫、平面檔案(csv、xml、xls 等)、雲端)中。
JobRepository:這是負責記錄每次執行時對 JobLauncher、Job 和 Step(s) 的監視統計資料的元件。它提供了兩種可能的技術來儲存這些統計數據:使用資料庫或使用映射。當統計資訊儲存在資料庫中並因此以持久的方式保留時,這允許隨著時間的推移連續監控批次,以便分析發生故障時可能出現的問題。相反,當它在 Map 中時,持久化的統計資料將在每個 Batch 執行實例結束時遺失。在所有情況下,都必須配置其中之一。
欲了解更多信息,我建議您查閱 Spring 網站。
在簡要解釋了 Spring Batch 架構之後,現在讓我們嘗試展示如何設定 Spring Batch 作業,該作業將從 CSV 檔案中讀取數據,隨後將其插入資料庫中。 」讓我們開始編碼」。
產生 Spring Boot 專案最簡單的方法是使用 Spring Boot Tool,步驟如下:
專案產生後,您必須將其解壓縮然後將其匯入到您的 IDE 中。
使用的技術:
所有專案相依性都在 pom.xml 檔案中。 POM 三個字母是項目物件模型的縮寫。它的 XML 表示形式由 Maven 轉換為表示專案模型的資料結構。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
項目架構如下:
要啟用批次處理,我們需要在配置類別上新增@EnableBatchProcessing註解。然後,我們必須建立一個讀取器來讀取 CSV 文件,並建立一個處理器來在寫入之前處理輸入數據,建立一個寫入器來寫入資料庫。
import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import com.pathus90.springbatchexample.batch.StudentProcessor; import com.pathus90.springbatchexample.batch.StudentWriter; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.model.StudentFieldSetMapper; @Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfig { private static final String FILE_NAME = "results.csv"; private static final String JOB_NAME = "listStudentsJob"; private static final String STEP_NAME = "processingStep"; private static final String READER_NAME = "studentItemReader"; @Value("${header.names}") private String names; @Value("${line.delimiter}") private String delimiter; @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); } @Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; } @Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; } @Bean public ItemProcessor<Student, Student> studentItemProcessor() { return new StudentProcessor(); } @Bean public ItemWriter<Student> studentItemWriter() { return new StudentWriter(); } }
第一個方法定義作業,第二個方法定義單一步驟。作業是透過步驟建立的,其中每個步驟都可以涉及讀取器、處理器和寫入器。 在步驟定義中,我們定義一次寫入的資料量,在我們的範例中,一次最多寫入 5 筆記錄。然後,我們使用先前註入的 beans 來配置讀取器、處理器和寫入器。在定義我們的工作時,它將能夠透過精確的順序定義我們執行中的不同步驟。步驟 StudentStep 將由作業 listStudentsJob 執行。
@Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); }
在我們的批次配置中,Reader 讀取資料來源並在一個步驟中連續呼叫並傳回為其定義的物件(在我們的範例中為學生)。
@Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; }
FlatFileItemReader 類別使用 DefaultLineMapper 類,而後者又使用 DelimitedLineTokenizer 類別。 DelimitedLineTokenizer 的作用是將每一行分解為 FieldSet 對象,names 屬性給出檔案頭的格式並允許識別每一行的資料。資料轉換實作類別使用此名稱屬性透過 FieldSet 物件將其轉換為業務物件。這是由 fieldSetMapper (StudentFieldSetMapper) 屬性指示的類別。
import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; public class StudentFieldSetMapper implements FieldSetMapper<Student> { @Override public Student mapFieldSet(FieldSet fieldSet) { return Student.builder() .rank(fieldSet.readString(0)) .firstName(fieldSet.readString(1)) .lastName(fieldSet.readString(2)) .center(fieldSet.readString(3)) .pv(fieldSet.readString(4)) .origin(fieldSet.readString(5)) .mention(fieldSet.readString(6)) .build(); } }
LineMapper 介面用於將行(字串)對應到通常用於映射從檔案讀取的行的物件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
與 Reader 不同,Processor 的實作更多的是為了功能需求。它不是強制性的,如果在我們的處理中沒有預見到功能需要,我們可以不使用它。在我們的範例中,我們編寫了一個簡單的處理器,它僅將學生物件的一些屬性轉換為大寫,我們可以使用以下程式碼超越此範例。更具體的功能案例。
import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import com.pathus90.springbatchexample.batch.StudentProcessor; import com.pathus90.springbatchexample.batch.StudentWriter; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.model.StudentFieldSetMapper; @Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfig { private static final String FILE_NAME = "results.csv"; private static final String JOB_NAME = "listStudentsJob"; private static final String STEP_NAME = "processingStep"; private static final String READER_NAME = "studentItemReader"; @Value("${header.names}") private String names; @Value("${line.delimiter}") private String delimiter; @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); } @Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; } @Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; } @Bean public ItemProcessor<Student, Student> studentItemProcessor() { return new StudentProcessor(); } @Bean public ItemWriter<Student> studentItemWriter() { return new StudentWriter(); } }
寫入器寫入來自處理器的資料(或直接由讀取器讀取)。在我們的例子中,它從處理器接收轉換後的對象,每個對象隨後將保存在我們的資料庫中,並且交易將被驗證。
@Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); }
@Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; }
import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; public class StudentFieldSetMapper implements FieldSetMapper<Student> { @Override public Student mapFieldSet(FieldSet fieldSet) { return Student.builder() .rank(fieldSet.readString(0)) .firstName(fieldSet.readString(1)) .lastName(fieldSet.readString(2)) .center(fieldSet.readString(3)) .pv(fieldSet.readString(4)) .origin(fieldSet.readString(5)) .mention(fieldSet.readString(6)) .build(); } }
完成批次配置設定後,讓我們看看上面所說的一切是否有效
要運行應用程序,您需要查找包含註釋@SpringBootApplication的文件,這是我們應用程式的主要部分。
@Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; }
啟動上面的 main 將開始我們的工作,批次啟動器如下所示:
import org.springframework.batch.item.ItemProcessor; import com.pathus90.springbatchexample.model.Student; public class StudentProcessor implements ItemProcessor<Student, Student> { @Override public Student process(Student student) { student.setFirstName(student.getFirstName().toUpperCase()); student.setLastName(student.getLastName().toUpperCase()); student.setCenter(student.getCenter().toUpperCase()); student.setOrigin(student.getOrigin().toUpperCase()); student.setMention(student.getMention().toUpperCase()); return student; } }
已設定排程器以允許自動觸發批次。在此範例中,批次啟動後將每 8 秒執行一次。您可以透過更改 fixedDelay 值(以毫秒為單位)來使用它。
import java.util.List; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.service.IStudentService; import lombok.extern.slf4j.Slf4j; @Slf4j public class StudentWriter implements ItemWriter<Student> { @Autowired private IStudentService studentService; @Override public void write(List<? extends Student> students) { students.stream().forEach(student -> { log.info("Enregistrement en base de l'objet {}", student); studentService.insertStudent(student); }); } }
除了執行上面的主檔案來啟動批次之外,您還可以在使用命令提示字元時執行命令 mvn spring-boot:run。
您也可以使用 JAR 存檔檔案啟動應用程序,在這種情況下,您必須:
使用命令提示字元前往專案的父資料夾並執行命令 mvn clean package 這將打包我們的專案。
在目標資料夾中,將會建立一個 jar 檔案。
要執行應用程序,請使用指令java -jar target/ generated_file_name-0.0.1-SNAPSHOT.jar
也要確保在啟動我們的 Spring Batch 應用程式時 H2控制台已經啟動,並且自動產生資料庫以及建立表 Student .
我們可以清楚地看到我們的文件已經很好地整合到我們的資料庫中。
注意:如果我們還想手動啟動批次而不傳遞將根據我們的設定觸發的調度程序,我已經公開了一個使用控制器來調用的API春季工作批次。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
只要啟動 URL:http://localhost:8080/load,批次就會啟動
我們第一次學習使用 Spring 框架的批次程式設計已經結束了。如果您有任何意見或問題,請留言!
祝大家學習愉快,希望第一篇教學對您有幫助。
您可以在這裡找到可用的原始碼
以上是使用 SPRING BATCH 開始編程的詳細內容。更多資訊請關注PHP中文網其他相關文章!