首頁 >Java >java教程 >SpringBoot怎麼整合SFTP客戶端實作檔案上傳下載

SpringBoot怎麼整合SFTP客戶端實作檔案上傳下載

王林
王林轉載
2023-05-16 14:40:061723瀏覽

背景

在專案開發中,一般檔案儲存很少再使用SFTP服務,但是也不排除合作夥伴使用SFTP來儲存專案中的檔案或透過SFTP來實現檔案資料的互動。

我遇到的專案中,就有銀行和保險公司等合作夥伴透過SFTP服務來實現與我們專案的檔案資料的互動。

為了能夠順利地完成與友商的SFTP服務的連通,我們需要在自己的專案中實作一套SFTP客戶端工具。一般我們會採用Jsch來實現SFTP客戶端。

依賴

<!--执行远程操作-->
<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>
     <!--链接池-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>

首先我們一定要引入jsch依賴,這個是我們實作SFTP客戶端的基石;其次我們引進了連結池工具,為了避免每次執行SFTP指令都要重新建立鏈接,我們使用池化的方式優化了比較消耗資源的建立操作。

建立工具類別

為了更好的使用SFTP工具,我們把jsch中關於SFTP的相關功能提煉出來,做了一次簡單的封裝,做成了我們可以直接使用的工具類別。

裡面只有兩類方法:

1.創建Session與開啟Session;

session創建好後,還不能創建channel,需要開啟session後才能創建channel;

2.創建channel與開啟channel;

channel也是一樣,創建好的channel需要開啟後才能真正地執行命令;

public class JschUtil {
  /**
   * 创建session
   *
   * @param userName       用户名
   * @param password       密码
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密钥文件
   * @param passphrase     口令
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
    return createSession(new JSch(), userName, password, host, port, privateKeyFile, passphrase);
  }
  /**
   * 创建session
   *
   * @param jSch
   * @param userName       用户名
   * @param password       密码
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密钥
   * @param passphrase     口令
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
    try {
      if (!StringUtils.isEmpty(privateKeyFile)) {
        // 使用密钥验证方式,密钥可以是有口令的密钥,也可以是没有口令的密钥
        if (!StringUtils.isEmpty(passphrase)) {
          jSch.addIdentity(privateKeyFile, passphrase);
        } else {
          jSch.addIdentity(privateKeyFile);
        }
      }
      // 获取session
      Session session = jSch.getSession(userName, host, port);
      if (!StringUtils.isEmpty(password)) {
        session.setPassword(password);
      }
      // 不校验域名
      session.setConfig("StrictHostKeyChecking", "no");
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "create session fail");
    }
  }
  /**
   * 创建session
   *
   * @param jSch
   * @param userName 用户名
   * @param password 密码
   * @param host     域名
   * @param port     端口
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(JSch jSch, String userName, String password, String host, int port) throws AwesomeException {
    return createSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
  }
  /**
   * 创建session
   *
   * @param jSch
   * @param userName 用户名
   * @param host     域名
   * @param port     端口
   * @return
   * @throws AwesomeException
   */
  private Session createSession(JSch jSch, String userName, String host, int port) throws AwesomeException {
    return createSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
  }
  /**
   * 开启session链接
   *
   * @param jSch
   * @param userName       用户名
   * @param password       密码
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密钥
   * @param passphrase     口令
   * @param timeout        链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
    Session session = createSession(jSch, userName, password, host, port, privateKeyFile, passphrase);
    try {
      if (timeout >= 0) {
        session.connect(timeout);
      } else {
        session.connect();
      }
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "session connect fail");
    }
  }
  /**
   * 开启session链接
   *
   * @param userName       用户名
   * @param password       密码
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密钥
   * @param passphrase     口令
   * @param timeout        链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
    Session session = createSession(userName, password, host, port, privateKeyFile, passphrase);
    try {
      if (timeout >= 0) {
        session.connect(timeout);
      } else {
        session.connect();
      }
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "session connect fail");
    }
  }
  /**
   * 开启session链接
   *
   * @param jSch
   * @param userName 用户名
   * @param password 密码
   * @param host     域名
   * @param port     端口
   * @param timeout  链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String password, String host, int port, int timeout) throws AwesomeException {
    return openSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 开启session链接
   *
   * @param userName 用户名
   * @param password 密码
   * @param host     域名
   * @param port     端口
   * @param timeout  链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String password, String host, int port, int timeout) throws AwesomeException {
    return openSession(userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 开启session链接
   *
   * @param jSch
   * @param userName 用户名
   * @param host     域名
   * @param port     端口
   * @param timeout  链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String host, int port, int timeout) throws AwesomeException {
    return openSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 开启session链接
   *
   * @param userName 用户名
   * @param host     域名
   * @param port     端口
   * @param timeout  链接超时时间
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String host, int port, int timeout) throws AwesomeException {
    return openSession(userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 创建指定通道
   *
   * @param session
   * @param channelType
   * @return
   * @throws AwesomeException
   */
  public static Channel createChannel(Session session, ChannelType channelType) throws AwesomeException {
    try {
      if (!session.isConnected()) {
        session.connect();
      }
      return session.openChannel(channelType.getValue());
    } catch (Exception e) {
      throw new AwesomeException(500, "open channel fail");
    }
  }
  /**
   * 创建sftp通道
   *
   * @param session
   * @return
   * @throws AwesomeException
   */
  public static ChannelSftp createSftp(Session session) throws AwesomeException {
    return (ChannelSftp) createChannel(session, ChannelType.SFTP);
  }
  /**
   * 创建shell通道
   *
   * @param session
   * @return
   * @throws AwesomeException
   */
  public static ChannelShell createShell(Session session) throws AwesomeException {
    return (ChannelShell) createChannel(session, ChannelType.SHELL);
  }
  /**
   * 开启通道
   *
   * @param session
   * @param channelType
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static Channel openChannel(Session session, ChannelType channelType, int timeout) throws AwesomeException {
    Channel channel = createChannel(session, channelType);
    try {
      if (timeout >= 0) {
        channel.connect(timeout);
      } else {
        channel.connect();
      }
      return channel;
    } catch (Exception e) {
      throw new AwesomeException(500, "connect channel fail");
    }
  }
  /**
   * 开启sftp通道
   *
   * @param session
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static ChannelSftp openSftpChannel(Session session, int timeout) throws AwesomeException {
    return (ChannelSftp) openChannel(session, ChannelType.SFTP, timeout);
  }
  /**
   * 开启shell通道
   *
   * @param session
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static ChannelShell openShellChannel(Session session, int timeout) throws AwesomeException {
    return (ChannelShell) openChannel(session, ChannelType.SHELL, timeout);
  }
  enum ChannelType {
    SESSION("session"),
    SHELL("shell"),
    EXEC("exec"),
    X11("x11"),
    AGENT_FORWARDING("auth-agent@openssh.com"),
    DIRECT_TCPIP("direct-tcpip"),
    FORWARDED_TCPIP("forwarded-tcpip"),
    SFTP("sftp"),
    SUBSYSTEM("subsystem");
    private final String value;
    ChannelType(String value) {
      this.value = value;
    }
    public String getValue() {
      return this.value;
    }
  }
}

SFTP連結池化

我們透過實作BasePooledObjectFactory類別來池化通道ChannelSftp。這不是真正池化的程式碼,下面的程式碼只是告知池化管理器如何建立物件和銷毀物件。

static class SftpFactory extends BasePooledObjectFactory<ChannelSftp> implements AutoCloseable {
    private Session session;
    private SftpProperties properties;
    // 初始化SftpFactory
    // 里面主要是创建目标session,后续可用通过这个session不断地创建ChannelSftp。
    SftpFactory(SftpProperties properties) throws AwesomeException {
      this.properties = properties;
      String username = properties.getUsername();
      String password = properties.getPassword();
      String host = properties.getHost();
      int port = properties.getPort();
      String privateKeyFile = properties.getPrivateKeyFile();
      String passphrase = properties.getPassphrase();
      session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase);
    }
    // 销毁对象,主要是销毁ChannelSftp
    @Override
    public void destroyObject(PooledObject<ChannelSftp> p) throws Exception {
      p.getObject().disconnect();
    }
    // 创建对象ChannelSftp
    @Override
    public ChannelSftp create() throws Exception {
      int timeout = properties.getTimeout();
      return JschUtil.openSftpChannel(this.session, timeout);
    }
    // 包装创建出来的对象
    @Override
    public PooledObject<ChannelSftp> wrap(ChannelSftp channelSftp) {
      return new DefaultPooledObject<>(channelSftp);
    }
    // 验证对象是否可用
    @Override
    public boolean validateObject(PooledObject<ChannelSftp> p) {
      return p.getObject().isConnected();
    }
    // 销毁资源,关闭session
    @Override
    public void close() throws Exception {
      if (Objects.nonNull(session)) {
        if (session.isConnected()) {
          session.disconnect();
        }
        session = null;
      }
    }
  }

為了實現真正的池化操作,我們還需要以下程式碼:

1.我們需要在SftpClient物件中建立一個GenericObjectPool物件池,這個才是真正的池子,它負責創建和儲存所有的物件。

2.我們還需要提供資源銷毀的功能,也就是實作AutoCloseable,在服務停止時,需要把相關的資源銷毀。

public class SftpClient implements AutoCloseable {
  private SftpFactory sftpFactory;
  GenericObjectPool<ChannelSftp> objectPool;
  // 构造方法1
  public SftpClient(SftpProperties properties, GenericObjectPoolConfig<ChannelSftp> poolConfig) throws AwesomeException {
    this.sftpFactory = new SftpFactory(properties);
    objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
  }
  // 构造方法2
  public SftpClient(SftpProperties properties) throws AwesomeException {
    this.sftpFactory = new SftpFactory(properties);
    SftpProperties.PoolConfig config = properties.getPool();
    // 默认池化配置
    if (Objects.isNull(config)) {
      objectPool = new GenericObjectPool<>(this.sftpFactory);
    } else {
      // 自定义池化配置
      GenericObjectPoolConfig<ChannelSftp> poolConfig = new GenericObjectPoolConfig<>();
      poolConfig.setMaxIdle(config.getMaxIdle());
      poolConfig.setMaxTotal(config.getMaxTotal());
      poolConfig.setMinIdle(config.getMinIdle());
      poolConfig.setTestOnBorrow(config.isTestOnBorrow());
      poolConfig.setTestOnCreate(config.isTestOnCreate());
      poolConfig.setTestOnReturn(config.isTestOnReturn());
      poolConfig.setTestWhileIdle(config.isTestWhileIdle());
      poolConfig.setBlockWhenExhausted(config.isBlockWhenExhausted());
      poolConfig.setMaxWait(Duration.ofMillis(config.getMaxWaitMillis()));
      poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(config.getTimeBetweenEvictionRunsMillis()));
      objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
    }
  }
  
  // 销毁资源
    @Override
  public void close() throws Exception {
    // 销毁链接池
    if (Objects.nonNull(this.objectPool)) {
      if (!this.objectPool.isClosed()) {
        this.objectPool.close();
      }
    }
    this.objectPool = null;
    // 销毁sftpFactory
    if (Objects.nonNull(this.sftpFactory)) {
      this.sftpFactory.close();
    }
  }
}

SFTP連結池的使用

我們已經對連結池進行了初始化,下面我們就可以從連結池中取得我們需要的ChannelSftp來實現文件的上傳下載了。

下面實作了多種檔案上傳下載的方式:

#1.直接把本機檔案上傳到SFTP伺服器的指定路徑;

2.把InputStream輸入流提交到SFTP伺服器指定路徑中;

3.可以針對以上兩種上傳方式進行進度的監控;

4.把SFTP伺服器中的指定檔案下載到本機上;

5.把SFTP伺服器˙中的檔案寫入指定的輸出流;

6.針對以上兩種下載方式,監控下載進度;

  /**
   * 上传文件
   *
   * @param srcFilePath
   * @param targetDir
   * @param targetFileName
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName) throws AwesomeException {
    return uploadFile(srcFilePath, targetDir, targetFileName, null);
  }
  /**
   * 上传文件
   *
   * @param srcFilePath
   * @param targetDir
   * @param targetFileName
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      // 从链接池获取对象
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目标文件夹
      if (!exist(channelSftp, targetDir)) {
        mkdirs(channelSftp, targetDir);
      }
      channelSftp.cd(targetDir);
      // 上传文件
      if (Objects.nonNull(monitor)) {
        channelSftp.put(srcFilePath, targetFileName, monitor);
      } else {
        channelSftp.put(srcFilePath, targetFileName);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        // 返还对象给链接池
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 上传文件到目标文件夹
   *
   * @param in
   * @param targetDir
   * @param targetFileName
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(InputStream in, String targetDir, String targetFileName) throws AwesomeException {
    return uploadFile(in, targetDir, targetFileName, null);
  }
  /**
   * 上传文件,添加进度监视器
   *
   * @param in
   * @param targetDir
   * @param targetFileName
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(InputStream in, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目标文件夹
      if (!exist(channelSftp, targetDir)) {
        mkdirs(channelSftp, targetDir);
      }
      channelSftp.cd(targetDir);
      if (Objects.nonNull(monitor)) {
        channelSftp.put(in, targetFileName, monitor);
      } else {
        channelSftp.put(in, targetFileName);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 下载文件
   *
   * @param remoteFile
   * @param targetFilePath
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, String targetFilePath) throws AwesomeException {
    return downloadFile(remoteFile, targetFilePath, null);
  }
  /**
   * 下载目标文件到本地
   *
   * @param remoteFile
   * @param targetFilePath
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, String targetFilePath, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目标文件夹
      if (!exist(channelSftp, remoteFile)) {
        // 不用下载了
        return false;
      }
      File targetFile = new File(targetFilePath);
      try (FileOutputStream outputStream = new FileOutputStream(targetFile)) {
        if (Objects.nonNull(monitor)) {
          channelSftp.get(remoteFile, outputStream, monitor);
        } else {
          channelSftp.get(remoteFile, outputStream);
        }
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 下载文件
   *
   * @param remoteFile
   * @param outputStream
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, OutputStream outputStream) throws AwesomeException {
    return downloadFile(remoteFile, outputStream, null);
  }
  /**
   * 下载文件
   *
   * @param remoteFile
   * @param outputStream
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, OutputStream outputStream, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目标文件夹
      if (!exist(channelSftp, remoteFile)) {
        // 不用下载了
        return false;
      }
      if (Objects.nonNull(monitor)) {
        channelSftp.get(remoteFile, outputStream, monitor);
      } else {
        channelSftp.get(remoteFile, outputStream);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 创建文件夹
   *
   * @param channelSftp
   * @param dir
   * @return
   */
  protected boolean mkdirs(ChannelSftp channelSftp, String dir) {
    try {
      String pwd = channelSftp.pwd();
      if (StringUtils.contains(pwd, dir)) {
        return true;
      }
      String relativePath = StringUtils.substringAfter(dir, pwd);
      String[] dirs = StringUtils.splitByWholeSeparatorPreserveAllTokens(relativePath, "/");
      for (String path : dirs) {
        if (StringUtils.isBlank(path)) {
          continue;
        }
        try {
          channelSftp.cd(path);
        } catch (SftpException e) {
          channelSftp.mkdir(path);
          channelSftp.cd(path);
        }
      }
      return true;
    } catch (Exception e) {
      return false;
    }
  }
  /**
   * 判断文件夹是否存在
   *
   * @param channelSftp
   * @param dir
   * @return
   */
  protected boolean exist(ChannelSftp channelSftp, String dir) {
    try {
      channelSftp.lstat(dir);
      return true;
    } catch (Exception e) {
      return false;
    }
  }

整合到SpringBoot中

我們可以透過java config的方式,把我們已經實現好的SftpClient類別實例化到Spring IOC容器中來管理,以便讓開發人員在整個專案中透過@Autowired的方式就可以直接使用。

設定

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * @author zouwei
 * @className SftpProperties
 * @date: 2022/8/19 下午12:12
 * @description:
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "sftp.config")
public class SftpProperties {
  // 用户名
  private String username;
  // 密码
  private String password;
  // 主机名
  private String host;
  // 端口
  private int port;
  // 密钥
  private String privateKeyFile;
  // 口令
  private String passphrase;
  // 通道链接超时时间
  private int timeout;
  // 链接池配置
  private PoolConfig pool;
  @Data
  public static class PoolConfig {
    //最大空闲实例数,空闲超过此值将会被销毁淘汰
    private int maxIdle;
    // 最小空闲实例数,对象池将至少保留2个空闲对象
    private int minIdle;
    //最大对象数量,包含借出去的和空闲的
    private int maxTotal;
    //对象池满了,是否阻塞获取(false则借不到直接抛异常)
    private boolean blockWhenExhausted;
    // BlockWhenExhausted为true时生效,对象池满了阻塞获取超时,不设置则阻塞获取不超时,也可在borrowObject方法传递第二个参数指定本次的超时时间
    private long maxWaitMillis;
    // 创建对象后是否验证对象,调用objectFactory#validateObject
    private boolean testOnCreate;
    // 借用对象后是否验证对象 validateObject
    private boolean testOnBorrow;
    // 归还对象后是否验证对象 validateObject
    private boolean testOnReturn;
    // 定时检查期间是否验证对象 validateObject
    private boolean testWhileIdle;
    //定时检查淘汰多余的对象, 启用单独的线程处理
    private long timeBetweenEvictionRunsMillis;
    //jmx监控,和springboot自带的jmx冲突,可以选择关闭此配置或关闭springboot的jmx配置
    private boolean jmxEnabled;
  }
}

java Bean注入

import com.example.awesomespring.exception.AwesomeException;
import com.example.awesomespring.sftp.SftpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author zouwei
 * @className SftpConfig
 * @date: 2022/8/19 下午12:12
 * @description:
 */
@Configuration
public class SftpConfig {
  @Autowired
  private SftpProperties properties;
  // 创建SftpClient对象
  @Bean(destroyMethod = "close")
  @ConditionalOnProperty(prefix = "sftp.config")
  public SftpClient sftpClient() throws AwesomeException {
    return new SftpClient(properties);
  }
}

透過以上程式碼,我們就可以在專案的任何地方直接使用SFTP客戶端來上傳和下載檔案了。

以上是SpringBoot怎麼整合SFTP客戶端實作檔案上傳下載的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除