From 5c3ba2282cb04842859358f66f149262dbc40b07 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Sun, 23 May 2010 23:51:06 +0100 Subject: [PATCH] * further fixed up getting & setting of file attributes * added a TransferListener for monitoring file transfer --- .../net/schmizz/sshj/common/StreamCopier.java | 14 +++- .../schmizz/sshj/sftp/SFTPFileTransfer.java | 45 ++++++---- .../sshj/xfer/AbstractFileTransfer.java | 25 +++--- .../net/schmizz/sshj/xfer/FileTransfer.java | 4 +- .../sshj/xfer/LoggingTransferListener.java | 67 +++++++++++++++ .../schmizz/sshj/xfer/ProgressListener.java | 11 --- .../schmizz/sshj/xfer/TransferListener.java | 16 ++++ .../sshj/xfer/scp/SCPDownloadClient.java | 84 ++++++++++--------- .../net/schmizz/sshj/xfer/scp/SCPEngine.java | 6 +- .../sshj/xfer/scp/SCPFileTransfer.java | 5 +- .../sshj/xfer/scp/SCPUploadClient.java | 20 ++--- 11 files changed, 201 insertions(+), 96 deletions(-) create mode 100644 src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java delete mode 100644 src/main/java/net/schmizz/sshj/xfer/ProgressListener.java create mode 100644 src/main/java/net/schmizz/sshj/xfer/TransferListener.java diff --git a/src/main/java/net/schmizz/sshj/common/StreamCopier.java b/src/main/java/net/schmizz/sshj/common/StreamCopier.java index af7ba9f9..ca7965a5 100644 --- a/src/main/java/net/schmizz/sshj/common/StreamCopier.java +++ b/src/main/java/net/schmizz/sshj/common/StreamCopier.java @@ -41,10 +41,15 @@ public class StreamCopier }; } - public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing) + public interface Listener { + void reportProgress(long transferred); + } + + public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing, Listener listener) throws IOException { long count = 0; + final boolean reportProgress = listener != null; final long startTime = System.currentTimeMillis(); final byte[] buf = new byte[bufSize]; @@ -54,6 +59,8 @@ public class StreamCopier count += read; if (keepFlushing) out.flush(); + if (reportProgress) + listener.reportProgress(count); } if (!keepFlushing) out.flush(); @@ -65,6 +72,11 @@ public class StreamCopier return count; } + public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing) + throws IOException { + return copy(in, out, bufSize, keepFlushing, null); + } + public static String copyStreamToString(InputStream stream) throws IOException { final StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java index 29f3ff10..491e247c 100644 --- a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java @@ -20,6 +20,7 @@ import net.schmizz.sshj.sftp.Response.StatusCode; import net.schmizz.sshj.xfer.AbstractFileTransfer; import net.schmizz.sshj.xfer.FileTransfer; import net.schmizz.sshj.xfer.FileTransferUtil; +import net.schmizz.sshj.xfer.TransferListener; import java.io.File; import java.io.FileFilter; @@ -89,27 +90,35 @@ public class SFTPFileTransfer private class Downloader { + private final TransferListener listener = getTransferListener(); + private void download(final RemoteResourceInfo remote, final File local) throws IOException { - log.info("Downloading [{}] to [{}]", remote, local); + final File adjustedFile; switch (remote.getAttributes().getType()) { case DIRECTORY: - downloadDir(remote, local); + listener.startedDir(remote.getName()); + adjustedFile = downloadDir(remote, local); + listener.finishedDir(); break; case UNKNOWN: - log.warn("Server did not supply information about the type of file at `{}` -- assuming it is a regular file!"); + log.warn("Server did not supply information about the type of file at `{}` " + + "-- assuming it is a regular file!", remote.getPath()); case REGULAR: - downloadFile(remote, local); + listener.startedFile(remote.getName(), remote.getAttributes().getSize()); + adjustedFile = downloadFile(remote, local); + listener.finishedFile(); break; default: throw new IOException(remote + " is not a regular file or directory"); } + copyAttributes(remote, adjustedFile); + } - private void downloadDir(final RemoteResourceInfo remote, final File local) + private File downloadDir(final RemoteResourceInfo remote, final File local) throws IOException { final File adjusted = FileTransferUtil.getTargetDirectory(local, remote.getName()); - copyAttributes(remote, adjusted); final RemoteDirectory rd = sftp.openDir(remote.getPath()); try { for (RemoteResourceInfo rri : rd.scan(getDownloadFilter())) @@ -117,24 +126,25 @@ public class SFTPFileTransfer } finally { rd.close(); } + return adjusted; } - private void downloadFile(final RemoteResourceInfo remote, final File local) + private File downloadFile(final RemoteResourceInfo remote, final File local) throws IOException { final File adjusted = FileTransferUtil.getTargetFile(local, remote.getName()); - copyAttributes(remote, adjusted); final RemoteFile rf = sftp.open(remote.getPath()); try { final FileOutputStream fos = new FileOutputStream(adjusted); try { StreamCopier.copy(rf.getInputStream(), fos, sftp.getSubsystem() - .getLocalMaxPacketSize(), false); + .getLocalMaxPacketSize(), false, listener); } finally { fos.close(); } } finally { rf.close(); } + return adjusted; } private void copyAttributes(final RemoteResourceInfo remote, final File local) @@ -151,15 +161,20 @@ public class SFTPFileTransfer private class Uploader { + private final TransferListener listener = getTransferListener(); + private void upload(File local, String remote) throws IOException { - log.info("Uploading [{}] to [{}]", local, remote); final String adjustedPath; - if (local.isDirectory()) + if (local.isDirectory()) { + listener.startedDir(local.getName()); adjustedPath = uploadDir(local, remote); - else if (local.isFile()) + listener.finishedDir(); + } else if (local.isFile()) { + listener.startedFile(local.getName(), local.length()); adjustedPath = uploadFile(local, remote); - else + listener.finishedFile(); + } else throw new IOException(local + " is not a file or directory"); sftp.setAttributes(adjustedPath, getAttributes(local)); } @@ -181,8 +196,8 @@ public class SFTPFileTransfer try { final FileInputStream fis = new FileInputStream(local); try { - StreamCopier.copy(fis, rf.getOutputStream(), sftp.getSubsystem().getRemoteMaxPacketSize() - - rf.getOutgoingPacketOverhead(), false); + final int bufSize = sftp.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead(); + StreamCopier.copy(fis, rf.getOutputStream(), bufSize, false, listener); } finally { fis.close(); } diff --git a/src/main/java/net/schmizz/sshj/xfer/AbstractFileTransfer.java b/src/main/java/net/schmizz/sshj/xfer/AbstractFileTransfer.java index e3f51cd1..979c0e38 100644 --- a/src/main/java/net/schmizz/sshj/xfer/AbstractFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/xfer/AbstractFileTransfer.java @@ -20,19 +20,18 @@ import org.slf4j.LoggerFactory; public abstract class AbstractFileTransfer { - /** Logger */ protected final Logger log = LoggerFactory.getLogger(getClass()); - public static final ModeGetter defaultModeGetter = new DefaultModeGetter(); - public static final ModeSetter defaultModeSetter = new DefaultModeSetter(); + public static final ModeGetter DEFAULT_MODE_SETTER = new DefaultModeGetter(); + public static final ModeSetter DEFAULT_MODE_GETTER = new DefaultModeSetter(); + public static final LoggingTransferListener LOGGING_TRANSFER_LISTENER = new LoggingTransferListener(); - private volatile ModeGetter modeGetter = defaultModeGetter; - private volatile ModeSetter modeSetter = defaultModeSetter; - - private volatile ProgressListener progressListener; + private volatile ModeGetter modeGetter = DEFAULT_MODE_SETTER; + private volatile ModeSetter modeSetter = DEFAULT_MODE_GETTER; + private volatile TransferListener transferListener = LOGGING_TRANSFER_LISTENER; public void setModeGetter(ModeGetter modeGetter) { - this.modeGetter = (modeGetter == null) ? defaultModeGetter : modeGetter; + this.modeGetter = (modeGetter == null) ? DEFAULT_MODE_SETTER : modeGetter; } public ModeGetter getModeGetter() { @@ -40,19 +39,19 @@ public abstract class AbstractFileTransfer { } public void setModeSetter(ModeSetter modeSetter) { - this.modeSetter = (modeSetter == null) ? defaultModeSetter : modeSetter; + this.modeSetter = (modeSetter == null) ? DEFAULT_MODE_GETTER : modeSetter; } public ModeSetter getModeSetter() { return this.modeSetter; } - public ProgressListener getProgressListener() { - return progressListener; + public TransferListener getTransferListener() { + return transferListener; } - public void setProgressListener(ProgressListener progressListener) { - this.progressListener = progressListener; + public void setTransferListener(TransferListener transferListener) { + this.transferListener = (transferListener == null) ? LOGGING_TRANSFER_LISTENER : transferListener; } } diff --git a/src/main/java/net/schmizz/sshj/xfer/FileTransfer.java b/src/main/java/net/schmizz/sshj/xfer/FileTransfer.java index 5aabe64a..43656fcd 100644 --- a/src/main/java/net/schmizz/sshj/xfer/FileTransfer.java +++ b/src/main/java/net/schmizz/sshj/xfer/FileTransfer.java @@ -33,8 +33,8 @@ public interface FileTransfer { void setModeSetter(ModeSetter modeSetter); - ProgressListener getProgressListener(); + TransferListener getTransferListener(); - void setProgressListener(ProgressListener listener); + void setTransferListener(TransferListener listener); } diff --git a/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java b/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java new file mode 100644 index 00000000..114f45e9 --- /dev/null +++ b/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java @@ -0,0 +1,67 @@ +package net.schmizz.sshj.xfer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class LoggingTransferListener + implements TransferListener { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final List dirNames = new LinkedList(); + private String base = ""; + private String name = ""; + private long size = -1; + + public void startedDir(String name) { + dirNames.add(name); + size = -1; + fixBase(); + log.info("started transferring directory `{}`", currentNode()); + } + + @Override + public void startedFile(String name, long size) { + this.name = name; + this.size = size; + log.info("started transferring file `{}` ({} bytes)", currentNode(), size); + } + + @Override + public void reportProgress(long transferred) { + if (log.isDebugEnabled()) { + log.debug("transferred {}% of `{}`", ((transferred * 100) / size), currentNode()); + } + } + + @Override + public void finishedFile() { + log.info("finished transferring file `{}`", currentNode()); + name = ""; + size = -1; + } + + @Override + public void finishedDir() { + log.info("finished transferring dir `{}`", currentNode()); + size = -1; + dirNames.remove(dirNames.size() - 1); + fixBase(); + } + + private void fixBase() { + final StringBuilder qualifier = new StringBuilder(); + for (String parent : dirNames) { + qualifier.append(parent).append("/"); + } + base = qualifier.toString(); + } + + private String currentNode() { + return base + name; + } + +} diff --git a/src/main/java/net/schmizz/sshj/xfer/ProgressListener.java b/src/main/java/net/schmizz/sshj/xfer/ProgressListener.java deleted file mode 100644 index f804bc9e..00000000 --- a/src/main/java/net/schmizz/sshj/xfer/ProgressListener.java +++ /dev/null @@ -1,11 +0,0 @@ -package net.schmizz.sshj.xfer; - -public interface ProgressListener { - - void started(int item, boolean isDir); - - void progressed(long done, long total); - - void completed(int item); - -} diff --git a/src/main/java/net/schmizz/sshj/xfer/TransferListener.java b/src/main/java/net/schmizz/sshj/xfer/TransferListener.java new file mode 100644 index 00000000..6595f23f --- /dev/null +++ b/src/main/java/net/schmizz/sshj/xfer/TransferListener.java @@ -0,0 +1,16 @@ +package net.schmizz.sshj.xfer; + +import net.schmizz.sshj.common.StreamCopier; + +public interface TransferListener + extends StreamCopier.Listener { + + void startedDir(String name); + + void startedFile(String name, long size); + + void finishedFile(); + + void finishedDir(); + +} \ No newline at end of file diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java index 47394e85..760cf65a 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java @@ -20,6 +20,7 @@ import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.xfer.FileTransferUtil; import net.schmizz.sshj.xfer.ModeSetter; +import net.schmizz.sshj.xfer.TransferListener; import java.io.File; import java.io.FileOutputStream; @@ -35,8 +36,8 @@ public final class SCPDownloadClient private boolean recursive = true; - SCPDownloadClient(SessionFactory host, ModeSetter modeSetter) { - super(host); + SCPDownloadClient(SessionFactory host, TransferListener listener, ModeSetter modeSetter) { + super(host, listener); this.modeSetter = modeSetter; } @@ -98,17 +99,6 @@ public final class SCPDownloadClient return Integer.parseInt(cmd.substring(1), 8); } - private void prepare(File f, int perms, String tMsg) - throws IOException { - modeSetter.setPermissions(f, perms); - - if (tMsg != null && modeSetter.preservesTimes()) { - String[] tMsgParts = tokenize(tMsg, 4); // e.g. T 0 0 - modeSetter.setLastModifiedTime(f, parseLong(tMsgParts[0].substring(1), "last modified time")); - modeSetter.setLastAccessedTime(f, parseLong(tMsgParts[2], "last access time")); - } - } - private boolean process(String bufferedTMsg, String msg, File f) throws IOException { if (msg.length() < 1) @@ -150,41 +140,53 @@ public final class SCPDownloadClient private void processDirectory(String dMsg, String tMsg, File f) throws IOException { - String[] dMsgParts = tokenize(dMsg, 3); // e.g. D0755 0 - - long length = parseLong(dMsgParts[1], "dir length"); + final String[] dMsgParts = tokenize(dMsg, 3); // D 0 + final long length = parseLong(dMsgParts[1], "dir length"); + final String dirname = dMsgParts[2]; if (length != 0) throw new IOException("Remote SCP command sent strange directory length: " + length); - - f = FileTransferUtil.getTargetDirectory(f, dMsgParts[2]); - prepare(f, parsePermissions(dMsgParts[0]), tMsg); - - signal("ACK: D"); - - do { - } while (!process(null, readMessage(), f)); - - signal("ACK: E"); + listener.startedDir(dirname); + { + f = FileTransferUtil.getTargetDirectory(f, dirname); + signal("ACK: D"); + do { + } while (!process(null, readMessage(), f)); + setAttributes(f, parsePermissions(dMsgParts[0]), tMsg); + signal("ACK: E"); + } + listener.finishedDir(); } private void processFile(String cMsg, String tMsg, File f) throws IOException { - String[] cMsgParts = tokenize(cMsg, 3); - - long length = parseLong(cMsgParts[1], "length"); - - f = FileTransferUtil.getTargetFile(f, cMsgParts[2]); - prepare(f, parsePermissions(cMsgParts[0]), tMsg); - - signal("Remote can start transfer"); - final FileOutputStream fos = new FileOutputStream(f); - try { - transfer(scp.getInputStream(), fos, scp.getLocalMaxPacketSize(), length); - } finally { - IOUtils.closeQuietly(fos); + final String[] cMsgParts = tokenize(cMsg, 3); // C + final long length = parseLong(cMsgParts[1], "length"); + final String filename = cMsgParts[2]; + listener.startedFile(filename, length); + { + f = FileTransferUtil.getTargetFile(f, filename); + signal("Remote can start transfer"); + final FileOutputStream fos = new FileOutputStream(f); + try { + transfer(scp.getInputStream(), fos, scp.getLocalMaxPacketSize(), length); + } finally { + IOUtils.closeQuietly(fos); + } + check("Remote agrees transfer done"); + setAttributes(f, parsePermissions(cMsgParts[0]), tMsg); + signal("Transfer done"); + } + listener.finishedFile(); + } + + private void setAttributes(File f, int perms, String tMsg) + throws IOException { + modeSetter.setPermissions(f, perms); + if (tMsg != null && modeSetter.preservesTimes()) { + String[] tMsgParts = tokenize(tMsg, 4); // e.g. T 0 0 + modeSetter.setLastModifiedTime(f, parseLong(tMsgParts[0].substring(1), "last modified time")); + modeSetter.setLastAccessedTime(f, parseLong(tMsgParts[2], "last access time")); } - check("Remote agrees transfer done"); - signal("Transfer done"); } private String[] tokenize(String msg, int numPartsExpected) diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java index 94ce405b..715208bb 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java @@ -19,6 +19,7 @@ import net.schmizz.sshj.common.IOUtils; import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.connection.channel.direct.Session.Command; import net.schmizz.sshj.connection.channel.direct.SessionFactory; +import net.schmizz.sshj.xfer.TransferListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +60,15 @@ abstract class SCPEngine { final Logger log = LoggerFactory.getLogger(getClass()); final SessionFactory host; + final TransferListener listener; final Queue warnings = new LinkedList(); Command scp; int exitStatus; - SCPEngine(SessionFactory host) { + SCPEngine(SessionFactory host, TransferListener listener) { this.host = host; + this.listener = listener; } public int copy(String sourcePath, String targetPath) @@ -201,6 +204,7 @@ abstract class SCPEngine { while (count < len && (read = in.read(buf, 0, (int) Math.min(bufSize, len - count))) != -1) { out.write(buf, 0, read); count += read; + listener.reportProgress(count); } out.flush(); diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPFileTransfer.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPFileTransfer.java index 8a9edaa5..cc0a671f 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPFileTransfer.java @@ -24,6 +24,7 @@ import java.io.IOException; public class SCPFileTransfer extends AbstractFileTransfer implements FileTransfer { + private final SessionFactory sessionFactory; public SCPFileTransfer(SessionFactory sessionFactory) { @@ -31,11 +32,11 @@ public class SCPFileTransfer } public SCPDownloadClient newSCPDownloadClient() { - return new SCPDownloadClient(sessionFactory, getModeSetter()); + return new SCPDownloadClient(sessionFactory, getTransferListener(), getModeSetter()); } public SCPUploadClient newSCPUploadClient() { - return new SCPUploadClient(sessionFactory, getModeGetter()); + return new SCPUploadClient(sessionFactory, getTransferListener(), getModeGetter()); } @Override diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java index f83b0ea2..6a60d883 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java @@ -19,6 +19,7 @@ import net.schmizz.sshj.common.IOUtils; import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.xfer.ModeGetter; +import net.schmizz.sshj.xfer.TransferListener; import java.io.File; import java.io.FileFilter; @@ -36,8 +37,8 @@ public final class SCPUploadClient private FileFilter fileFilter; - SCPUploadClient(SessionFactory host, ModeGetter modeGetter) { - super(host); + SCPUploadClient(SessionFactory host, TransferListener listener, ModeGetter modeGetter) { + super(host, listener); this.modeGetter = modeGetter; } @@ -80,30 +81,29 @@ public final class SCPUploadClient private void process(File f) throws IOException { - if (f.isDirectory()) + if (f.isDirectory()) { + listener.startedDir(f.getName()); sendDirectory(f); - else if (f.isFile()) + listener.finishedDir(); + } else if (f.isFile()) { + listener.startedFile(f.getName(), f.length()); sendFile(f); - else + listener.finishedFile(); + } else throw new IOException(f + " is not a regular file or directory"); } private void sendDirectory(File f) throws IOException { - log.info("Entering directory `{}`", f.getName()); preserveTimeIfPossible(f); sendMessage("D0" + getPermString(f) + " 0 " + f.getName()); - for (File child : getChildren(f)) process(child); - sendMessage("E"); - log.info("Exiting directory `{}`", f.getName()); } private void sendFile(File f) throws IOException { - log.info("Sending `{}`...", f.getName()); preserveTimeIfPossible(f); final InputStream src = new FileInputStream(f); try {