>  기사  >  Java  >  Java 다중 스레드 중단점 복사 방법은 무엇입니까?

Java 다중 스레드 중단점 복사 방법은 무엇입니까?

WBOY
WBOY앞으로
2023-04-13 17:31:19666검색

Details

Timer 클래스(java.util.Timer)를 사용하여 중단점 기능을 구현합니다. 기록된 내용은 각 스레드의 복사본 진행 상황을 기록하는 데 사용됩니다.

Timer 클래스 소개:

백그라운드 스레드에서 향후 실행을 위해 작업을 예약하는 기능입니다. 작업은 일회성 실행으로 예약되거나 정기적인 간격으로 반복 실행되도록 예약될 수 있습니다. 향후 실행을 위한 작업. 작업은 한 번 실행되거나 주기적으로 반복되도록 예약할 수 있습니다.

API 소개에 따르면 이 Timer 클래스는 작업을 한 번만 수행할 수도 있고, 주기적으로 작업을 수행할 수도 있음을 알 수 있습니다. (이 클래스는 javax 패키지의 클래스가 아닌 java.util.Timer 클래스입니다.)

이 클래스에는 여기에서 소개하지 않은 많은 시간 관련 메소드가 있습니다. 여기서만 소개하겠습니다. 방법을 사용해야 합니다.

public void schedule(TimerTask task, long delay, long period)

지정된 지연 이후에 시작되는 반복 고정 지연 실행을 위해 지정된 작업을 예약합니다.

지정된 지연 이후에 시작되는 반복 고정 지연 실행을 위해 지정된 작업을 예약합니다. . 후속 실행은 지정된 간격으로 대략적인 간격으로 발생합니다.

이 방법을 사용하면 각 스레드의 복사 진행 정보를 고정된 시간 간격으로 기록할 수 있습니다.

코드 부분

스케줄 태스크 클래스

package dragon.local;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class RecordTask extends TimerTask {
public static final String filename = "breakPointRecord.txt";
private Timer timer;
private List<FileCopyThread> copyThreads;
private String outputPath;

public RecordTask(Timer timer, List<FileCopyThread> copyThreads, String outputPath) {
this.timer = timer;
this.copyThreads = copyThreads;
this.outputPath = outputPath;
}

@Override
public void run() {
try {
this.breakPointRecord();
} catch (IOException e) {
e.printStackTrace();
}
}

public void breakPointRecord() throws FileNotFoundException, IOException {
int aliveThreadNum = 0;  //存活线程数目
//不使用追加方式,这里只需要最新的记录即可。
File recordFile = new File(outputPath, filename);
try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(recordFile))){
//每次记录一个线程的下载位置,但是取出来又需要进行转换,太麻烦了。
//我们直接使用序列化来进行操作,哈哈!
long[] curlen = new long[4];
int index = 0;
for (FileCopyThread copyThread : copyThreads) {
if (copyThread.isAlive()) {
aliveThreadNum++;
}
curlen[index++] = copyThread.getCurlen();
System.out.println(index+" curlen: "+copyThread.getCurlen());
}
//创建 Record 对象,并序列化。
oos.writeObject(new Record(curlen));
}
//当所有的线程都死亡时,关闭计时器,删除记录文件。(所有线程死亡的话,就是文件已经复制完成了!)
if (aliveThreadNum == 0) {
timer.cancel();
recordFile.delete();
}
System.out.println("线程数量: "+aliveThreadNum);
}
}

설명:

if (aliveThreadNum == 0) {
timer.cancel();
recordFile.delete();
}

스레드가 종료되면 프로그램이 정상적으로 종료되었음을 의미합니다.

이때 녹화파일을 삭제하세요. 여기서 기록 파일은 플래그입니다. 기록 파일이 존재한다면 프로그램이 정상적으로 종료되지 않았다는 뜻입니다. 다시 시작하면 중단점 복사가 수행됩니다.

참고: 복사 프로세스 중 IO 예외는 여기에서 고려되지 않습니다. 스레드가 IO 예외를 발생시키면 스레드 상태도 종료됩니다. 하지만 로컬 파일 복사에서는 아직 IO 예외가 상대적으로 적다는 점을 고려하면 인터넷을 통해 다운로드할 경우 이 프로그램의 기능을 개선해야 할 수도 있다는 점은 고려하지 않았습니다.

기록 정보 클래스

각 스레드의 정보를 매번 순서대로 작성해야 하는데, 읽은 후 변환해야 하는데 여전히 너무 번거롭게 느껴집니다. 여기서는 Java의 직렬화 메커니즘을 직접 사용합니다. 때로는 개체를 직접 조작하는 것이 편리할 때도 있습니다. 참고: 배열의 첨자는 각 스레드의 위치를 ​​나타냅니다.

package dragon.local;

import java.io.Serializable;

public class Record implements Serializable{
/**
 * 序列化 id
 */
private static final long serialVersionUID = 1L;
private long[] curlen;

public Record(long[] curlen) {
this.curlen = curlen;
} 

public long[] getCurlen() {
return this.curlen;
}
}

스레드 클래스 복사

package dragon.local;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileCopyThread extends Thread {
private int index;
private long position;
private long size;
private File targetFile;
private File outputFile;
private long curlen;      //当前下载的长度

public FileCopyThread(int index, long position, long size, File targetFile, File outputFile) {
this.index = index;
this.position = position;
this.size = size;
this.targetFile = targetFile;
this.outputFile = outputFile;
this.curlen = 0L;
}

@Override
public void run() {
try (
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(targetFile));
RandomAccessFile raf = new RandomAccessFile(outputFile, "rw")){
bis.skip(position);  //跳过不需要读取的字节数,注意只能先后跳
raf.seek(position);  //跳到需要写入的位置,没有这句话,会出错,但是很难改。
int hasRead = 0;
byte[] b = new byte[1024];
/**
 * 注意,每个线程只是读取一部分数据,不能只以 -1 作为循环结束的条件
 * 循环退出条件应该是两个,即写入的字节数大于需要读取的字节数 或者 文件读取结束(最后一个线程读取到文件末尾)
 */
while(curlen < size && (hasRead = bis.read(b)) != -1) {
raf.write(b, 0, hasRead);
curlen += (long)hasRead;
//强制停止程序。
//if (curlen > 17_000_000) {
//System.exit(0);
//}
}

System.out.println(index+" "+position+" "+curlen+" "+size);
} catch (IOException e) {
e.printStackTrace();
}
}

public long getCurlen() {   //获取当前的进度,用于记录,以便必要时恢复读取进度。
return position+this.curlen;
}
}

이 코드는 중단점 테스트를 위해 복사되었습니다. 테스트하고 싶다면 복사하려는 파일의 크기에 따라 if 판단의 조건을 적절하게 조정할 수 있습니다. 테스트하려면 먼저 이 코드의 주석을 해제한 다음 프로그램을 실행하고(그러면 프로그램이 종료되며 이때 파일은 복사되지 않습니다.) 이 코드에 주석을 달고 프로그램을 다시 실행하면 파일이 다음과 같이 됩니다. 성공적으로 복사되었습니다.

//强制停止程序。
//if (curlen > 17_000_000) {
//System.exit(0);
//}

복사 도구 클래스

package dragon.local;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;



/**
 * 设计思路:
 * 获取目标文件的大小,然后设置复制文件的大小(这样做是有好处的),
 * 然后使用将文件分为 n 分,使用 n 个线程同时进行复制(这里我将 n 取为 4)。
 * 
 * 进一步拓展:
 * 加强为断点复制功能,即程序中断以后,
 * 仍然可以继续从上次位置恢复复制,减少不必要的重复开销
 * */

public class FileCopyUtil {
//设置一个常量,复制线程的数量
private static final int THREAD_NUM = 4;

private FileCopyUtil() {}

/**
 * @param targetPath 目标文件的路径
 * @param outputPath 复制输出文件的路径
 * @throws IOException 
 * @throws ClassNotFoundException 
 * */
public static void transferFile(String targetPath, String outputPath) throws IOException, ClassNotFoundException {
File targetFile = new File(targetPath);
File outputFilePath = new File(outputPath);
if (!targetFile.exists() || targetFile.isDirectory()) {   //目标文件不存在,或者是一个文件夹,则抛出异常
throw new FileNotFoundException("目标文件不存在:"+targetPath);
}
if (!outputFilePath.exists()) {     //如果输出文件夹不存在,将会尝试创建,创建失败,则抛出异常。
if(!outputFilePath.mkdir()) {
throw new FileNotFoundException("无法创建输出文件:"+outputPath);
}
}

long len = targetFile.length();

File outputFile = new File(outputFilePath, "copy"+targetFile.getName());
createOutputFile(outputFile, len);    //创建输出文件,设置好大小。

//创建计时器 Timer 对象
Timer timer = new Timer();

long[] position = new long[4];
//每一个线程需要复制文件的起点
long size = len / FileCopyUtil.THREAD_NUM + 1;     //保存复制线程的集合
List<FileCopyThread> copyThreads = new ArrayList<>();
Record record = getRecord(outputPath);

for (int i = 0; i < FileCopyUtil.THREAD_NUM; i++) {
//如果已经有了 记录文件,就从使用记录数据,否则就是新的下载。
position[i] = record == null ? i*size : record.getCurlen()[i];
FileCopyThread copyThread = new FileCopyThread(i, position[i], size, targetFile, outputFile);
copyThread.start();     //启动复制线程
copyThreads.add(copyThread);   //将复制线程添加到集合中。
}

timer.schedule(new RecordTask(timer, copyThreads, outputPath), 0L, 100L);  //立即启动计时器,每隔10秒记录一次位置。
System.out.println("开始了!");
}

//创建输出文件,设置好大小。
private static void createOutputFile(File file, long length) throws IOException {
try (   
RandomAccessFile raf = new RandomAccessFile(file, "rw")){
raf.setLength(length);
}
}

//获取以及下载的位置
private static Record getRecord(String outputPath) throws FileNotFoundException, IOException, ClassNotFoundException {
File recordFile = new File(outputPath, RecordTask.filename);
if (recordFile.exists()) {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(recordFile))){
return (Record) ois.readObject();
}
}
return null;
}
}

지침: 복사된 디렉터리에 레코드 파일이 있는지 여부에 따라 중단점 복사를 시작할지 여부를 결정합니다.

private static Record getRecord(String outputPath) throws FileNotFoundException, IOException, ClassNotFoundException {
File recordFile = new File(outputPath, RecordTask.filename);
if (recordFile.exists()) {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(recordFile))){
return (Record) ois.readObject();
}
}
return null;
}

복사 시작 위치가 기록 위치가 된다는 점만 빼면 복사와 동일합니다.

아아아아

위 내용은 Java 다중 스레드 중단점 복사 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제