ホームページ >Java >&#&チュートリアル >Java APIを使用してHDFSを操作するにはどうすればよいですか?

Java APIを使用してHDFSを操作するにはどうすればよいですか?

王林
王林転載
2023-04-19 14:28:131323ブラウズ

1. 現在のディレクトリ内のすべてのファイルとフォルダーをスキャンする

listStatus メソッドを使用すると、上記の要件を達成できます。
listStatus メソッドのシグネチャは次のとおりです。

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * 
   * @param f given path
   * @return the statuses of the files/directories in the given patch
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                         IOException;

listStatus はパラメータ Path を渡すだけでよく、FileStatus の配列が返されることがわかります。
FileStatus には次の情報が含まれています

/** Interface that represents the client side information for a file.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;
  ....

FileStatus から、ファイル パス、サイズ、ディレクトリかどうか、block_replication、blocksize などの情報を確認することは難しくありません。

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HdfsOperation {
	
	val logger = LoggerFactory.getLogger(this.getClass)
	
	def tree(sc: SparkContext, path: String) : Unit = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val fsPath = new Path(path)
		val status = fs.listStatus(fsPath)
		for(filestatus:FileStatus <- status) {
			logger.error("getPermission is: {}", filestatus.getPermission)
			logger.error("getOwner is: {}", filestatus.getOwner)
			logger.error("getGroup is: {}", filestatus.getGroup)
			logger.error("getLen is: {}", filestatus.getLen)
			logger.error("getModificationTime is: {}", filestatus.getModificationTime)
			logger.error("getReplication is: {}", filestatus.getReplication)
			logger.error("getBlockSize is: {}", filestatus.getBlockSize)
			if (filestatus.isDirectory) {
				val dirpath = filestatus.getPath.toString
				logger.error("文件夹名字为: {}", dirpath)
				tree(sc, dirpath)
			} else {
				val fullname = filestatus.getPath.toString
				val filename = filestatus.getPath.getName
				logger.error("全部文件名为: {}", fullname)
				logger.error("文件名为: {}", filename)
			}
		}
	}
}

fileStatus がフォルダーであると判断された場合は、すべてを走査するという目的を達成するために、ツリー メソッドが再帰的に呼び出されます。

2. すべてのファイルを走査する

上記の方法は、すべてのファイルとフォルダーを走査することです。ファイルを反復処理するだけの場合は、listFiles メソッドを使用できます。

	def findFiles(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val fsPath = new Path(path)
		val files = fs.listFiles(fsPath, true)
		while(files.hasNext) {
			val filestatus = files.next()
			val fullname = filestatus.getPath.toString
			val filename = filestatus.getPath.getName
			logger.error("全部文件名为: {}", fullname)
			logger.error("文件名为: {}", filename)
			logger.error("文件大小为: {}", filestatus.getLen)
		}
	}
  /**
   * List the statuses and block locations of the files in the given path.
   * 
   * If the path is a directory, 
   *   if recursive is false, returns files in the directory;
   *   if recursive is true, return files in the subtree rooted at the path.
   * If the path is a file, return the file&#39;s status and block locations.
   * 
   * @param f is the path
   * @param recursive if the subdirectories need to be traversed recursively
   *
   * @return an iterator that traverses statuses of the files
   *
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public RemoteIterator<LocatedFileStatus> listFiles(
      final Path f, final boolean recursive)
  throws FileNotFoundException, IOException {
  ...

ソース コードからわかるように、listFiles は反復可能なオブジェクト RemoteIteratorc9e85c60fcfb1f1d8c4d5e1d7b95ff0b を返し、listStatus は配列を返します。同時に、listFiles はすべてのファイルを返します。

3.フォルダーの作成

	def mkdirToHdfs(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val result = fs.mkdirs(new Path(path))
		if (result) {
			logger.error("mkdirs already success!")
		} else {
			logger.error("mkdirs had failed!")
		}
	}

4.フォルダーの削除

	def deleteOnHdfs(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val result = fs.delete(new Path(path), true)
		if (result) {
			logger.error("delete already success!")
		} else {
			logger.error("delete had failed!")
		}
	}

5.ファイルのアップロード

	def uploadToHdfs(sc: SparkContext, localPath: String, hdfsPath: String): Unit = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath))
		fs.close()
	}

6.ファイルのダウンロード

	def downloadFromHdfs(sc: SparkContext, localPath: String, hdfsPath: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath))
		fs.close()
	}

以上がJava APIを使用してHDFSを操作するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。