From bc59c81dbccb3a02793edef82d9754f150e73d73 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Sun, 29 Jan 2012 22:45:58 +0000 Subject: [PATCH] Refactor TransferListener interface to support thread-safe, immutable implementation. Fix #56 --- .../schmizz/sshj/sftp/SFTPFileTransfer.java | 62 ++++++++-------- .../sshj/xfer/LoggingTransferListener.java | 71 +++++++------------ .../schmizz/sshj/xfer/TransferListener.java | 11 +-- .../sshj/xfer/scp/SCPDownloadClient.java | 25 ++++--- .../net/schmizz/sshj/xfer/scp/SCPEngine.java | 37 ++++------ .../sshj/xfer/scp/SCPUploadClient.java | 22 +++--- .../sshj/xfer/scp/SCPUploadClientTest.java | 54 -------------- 7 files changed, 92 insertions(+), 190 deletions(-) delete mode 100644 src/test/java/net/schmizz/sshj/xfer/scp/SCPUploadClientTest.java diff --git a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java index 589642f2..0eb45dfb 100644 --- a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java @@ -46,7 +46,7 @@ public class SFTPFileTransfer @Override public void upload(String source, String dest) throws IOException { - new Uploader().upload(new FileSystemFile(source), dest); + upload(new FileSystemFile(source), dest); } @Override @@ -58,7 +58,7 @@ public class SFTPFileTransfer @Override public void upload(LocalSourceFile localFile, String remotePath) throws IOException { - new Uploader().upload(localFile, remotePath); + new Uploader().upload(getTransferListener(), localFile, remotePath); } @Override @@ -66,7 +66,7 @@ public class SFTPFileTransfer throws IOException { final PathComponents pathComponents = engine.getPathHelper().getComponents(source); final FileAttributes attributes = engine.stat(source); - new Downloader().download(new RemoteResourceInfo(pathComponents, attributes), dest); + new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest); } public void setUploadFilter(LocalFileFilter uploadFilter) { @@ -87,24 +87,21 @@ public class SFTPFileTransfer private class Downloader { - private final TransferListener listener = getTransferListener(); - - private void download(final RemoteResourceInfo remote, final LocalDestFile local) + private void download(final TransferListener listener, + final RemoteResourceInfo remote, + final LocalDestFile local) throws IOException { final LocalDestFile adjustedFile; switch (remote.getAttributes().getType()) { case DIRECTORY: - listener.startedDir(remote.getName()); - adjustedFile = downloadDir(remote, local); - listener.finishedDir(); + adjustedFile = downloadDir(listener.directory(remote.getName()), remote, local); break; case UNKNOWN: log.warn("Server did not supply information about the type of file at `{}` " + "-- assuming it is a regular file!", remote.getPath()); case REGULAR: - listener.startedFile(remote.getName(), remote.getAttributes().getSize()); - adjustedFile = downloadFile(remote, local); - listener.finishedFile(); + adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()), + remote, local); break; default: throw new IOException(remote + " is not a regular file or directory"); @@ -113,20 +110,24 @@ public class SFTPFileTransfer } - private LocalDestFile downloadDir(final RemoteResourceInfo remote, final LocalDestFile local) + private LocalDestFile downloadDir(final TransferListener listener, + final RemoteResourceInfo remote, + final LocalDestFile local) throws IOException { final LocalDestFile adjusted = local.getTargetDirectory(remote.getName()); final RemoteDirectory rd = engine.openDir(remote.getPath()); try { for (RemoteResourceInfo rri : rd.scan(getDownloadFilter())) - download(rri, adjusted.getChild(rri.getName())); + download(listener, rri, adjusted.getChild(rri.getName())); } finally { rd.close(); } return adjusted; } - private LocalDestFile downloadFile(final RemoteResourceInfo remote, final LocalDestFile local) + private LocalDestFile downloadFile(final StreamCopier.Listener listener, + final RemoteResourceInfo remote, + final LocalDestFile local) throws IOException { final LocalDestFile adjusted = local.getTargetFile(remote.getName()); final RemoteFile rf = engine.open(remote.getPath()); @@ -161,33 +162,33 @@ public class SFTPFileTransfer private class Uploader { - private final TransferListener listener = getTransferListener(); - - private void upload(LocalSourceFile local, String remote) + private void upload(final TransferListener listener, + final LocalSourceFile local, + final String remote) throws IOException { final String adjustedPath; if (local.isDirectory()) { - listener.startedDir(local.getName()); - adjustedPath = uploadDir(local, remote); - listener.finishedDir(); + adjustedPath = uploadDir(listener.directory(local.getName()), local, remote); } else if (local.isFile()) { - listener.startedFile(local.getName(), local.getLength()); - adjustedPath = uploadFile(local, remote); - listener.finishedFile(); + adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote); } else throw new IOException(local + " is not a file or directory"); engine.setAttributes(adjustedPath, getAttributes(local)); } - private String uploadDir(LocalSourceFile local, String remote) + private String uploadDir(final TransferListener listener, + final LocalSourceFile local, + final String remote) throws IOException { final String adjusted = prepareDir(local, remote); for (LocalSourceFile f : local.getChildren(getUploadFilter())) - upload(f, adjusted); + upload(listener, f, adjusted); return adjusted; } - private String uploadFile(LocalSourceFile local, String remote) + private String uploadFile(final StreamCopier.Listener listener, + final LocalSourceFile local, + final String remote) throws IOException { final String adjusted = prepareFile(local, remote); final RemoteFile rf = engine.open(adjusted, EnumSet.of(OpenMode.WRITE, @@ -210,7 +211,7 @@ public class SFTPFileTransfer return adjusted; } - private String prepareDir(LocalSourceFile local, String remote) + private String prepareDir(final LocalSourceFile local, final String remote) throws IOException { final FileAttributes attrs; try { @@ -236,7 +237,7 @@ public class SFTPFileTransfer throw new IOException(attrs.getMode().getType() + " file already exists at " + remote); } - private String prepareFile(LocalSourceFile local, String remote) + private String prepareFile(final LocalSourceFile local, final String remote) throws IOException { final FileAttributes attrs; try { @@ -250,8 +251,7 @@ public class SFTPFileTransfer } if (attrs.getMode().getType() == FileMode.Type.DIRECTORY) { log.debug("probeFile: {} was directory, path adjusted for {}", remote, local.getName()); - remote = engine.getPathHelper().adjustForParent(remote, local.getName()); - return remote; + return engine.getPathHelper().adjustForParent(remote, local.getName()); } else { log.debug("probeFile: {} is a {} file that will be replaced", remote, attrs.getMode().getType()); return remote; diff --git a/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java b/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java index 69f6d50f..c98d68db 100644 --- a/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java +++ b/src/main/java/net/schmizz/sshj/xfer/LoggingTransferListener.java @@ -1,68 +1,45 @@ package net.schmizz.sshj.xfer; +import net.schmizz.sshj.common.StreamCopier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.io.IOException; public class LoggingTransferListener implements TransferListener { private final Logger log = LoggerFactory.getLogger(getClass()); - private final List dirNames = new ArrayList(); - private String base = ""; - private String name = ""; - private long size = -1; + private final String relPath; - @Override - public void startedDir(String name) { - dirNames.add(name); - size = -1; - fixBase(); - log.info("started transferring directory `{}`", currentNode()); + public LoggingTransferListener() { + this(""); + } + + private LoggingTransferListener(String relPath) { + this.relPath = relPath; } @Override - public void startedFile(String name, long size) { - this.name = name; - this.size = size; - log.info("started transferring file `{}` ({} bytes)", currentNode(), size); + public TransferListener directory(String name) { + log.info("started transferring directory `{}`", name); + return new LoggingTransferListener(relPath + name + "/"); } @Override - public void reportProgress(long transferred) { - if (log.isDebugEnabled()) { - log.debug("transferred {}% of `{}`", ((transferred * 100) / size), currentNode()); - } - } - - @Override - public void finishedFile() { - log.info("finished transferring file `{}`", currentNode()); - name = ""; - size = -1; - } - - @Override - public void finishedDir() { - log.info("finished transferring dir `{}`", currentNode()); - size = -1; - dirNames.remove(dirNames.size() - 1); - fixBase(); - } - - private void fixBase() { - final StringBuilder qualifier = new StringBuilder(); - for (String parent : dirNames) { - qualifier.append(parent).append("/"); - } - base = qualifier.toString(); - } - - private String currentNode() { - return base + name; + public StreamCopier.Listener file(final String name, final long size) { + final String path = relPath + name; + log.info("started transferring file `{}` ({} bytes)", path, size); + return new StreamCopier.Listener() { + @Override + public void reportProgress(long transferred) + throws IOException { + if (log.isDebugEnabled()) { + log.debug("transferred {}% of `{}`", ((transferred * 100) / size), path); + } + } + }; } } diff --git a/src/main/java/net/schmizz/sshj/xfer/TransferListener.java b/src/main/java/net/schmizz/sshj/xfer/TransferListener.java index 6595f23f..d67370a0 100644 --- a/src/main/java/net/schmizz/sshj/xfer/TransferListener.java +++ b/src/main/java/net/schmizz/sshj/xfer/TransferListener.java @@ -2,15 +2,10 @@ package net.schmizz.sshj.xfer; import net.schmizz.sshj.common.StreamCopier; -public interface TransferListener - extends StreamCopier.Listener { +public interface TransferListener { - void startedDir(String name); + TransferListener directory(String name); - void startedFile(String name, long size); - - void finishedFile(); - - void finishedDir(); + StreamCopier.Listener file(String name, long size); } \ No newline at end of file diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java index 92255197..47fc4bcc 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPDownloadClient.java @@ -17,6 +17,7 @@ package net.schmizz.sshj.xfer.scp; import net.schmizz.sshj.common.IOUtils; import net.schmizz.sshj.xfer.LocalDestFile; +import net.schmizz.sshj.xfer.TransferListener; import net.schmizz.sshj.xfer.scp.SCPEngine.Arg; import java.io.IOException; @@ -71,7 +72,7 @@ public final class SCPDownloadClient { String msg = engine.readMessage(); do - process(null, msg, targetFile); + process(engine.getTransferListener(), null, msg, targetFile); while (!(msg = engine.readMessage()).isEmpty()); } @@ -93,7 +94,7 @@ public final class SCPDownloadClient { return Integer.parseInt(cmd.substring(1), 8); } - private boolean process(String bufferedTMsg, String msg, LocalDestFile f) + private boolean process(TransferListener listener, String bufferedTMsg, String msg, LocalDestFile f) throws IOException { if (msg.length() < 1) throw new SCPException("Could not parse message `" + msg + "`"); @@ -102,15 +103,15 @@ public final class SCPDownloadClient { case 'T': engine.signal("ACK: T"); - process(msg, engine.readMessage(), f); + process(listener, msg, engine.readMessage(), f); break; case 'C': - processFile(msg, bufferedTMsg, f); + processFile(listener, msg, bufferedTMsg, f); break; case 'D': - processDirectory(msg, bufferedTMsg, f); + processDirectory(listener, msg, bufferedTMsg, f); break; case 'E': @@ -129,37 +130,36 @@ public final class SCPDownloadClient { return false; } - private void processDirectory(String dMsg, String tMsg, LocalDestFile f) + private void processDirectory(TransferListener listener, String dMsg, String tMsg, LocalDestFile f) throws IOException { final List dMsgParts = tokenize(dMsg, 3, true); // D 0 final long length = parseLong(dMsgParts.get(1), "dir length"); final String dirname = dMsgParts.get(2); if (length != 0) throw new IOException("Remote SCP command sent strange directory length: " + length); - engine.startedDir(dirname); + + final TransferListener dirListener = listener.directory(dirname); { f = f.getTargetDirectory(dirname); engine.signal("ACK: D"); do { - } while (!process(null, engine.readMessage(), f)); + } while (!process(dirListener, null, engine.readMessage(), f)); setAttributes(f, parsePermissions(dMsgParts.get(0)), tMsg); engine.signal("ACK: E"); } - engine.finishedDir(); } - private void processFile(String cMsg, String tMsg, LocalDestFile f) + private void processFile(TransferListener listener, String cMsg, String tMsg, LocalDestFile f) throws IOException { final List cMsgParts = tokenize(cMsg, 3, true); // C final long length = parseLong(cMsgParts.get(1), "length"); final String filename = cMsgParts.get(2); - engine.startedFile(filename, length); { f = f.getTargetFile(filename); engine.signal("Remote can start transfer"); final OutputStream dest = f.getOutputStream(); try { - engine.transferFromRemote(dest, length); + engine.transferFromRemote(listener.file(filename, length), dest, length); } finally { IOUtils.closeQuietly(dest); } @@ -167,7 +167,6 @@ public final class SCPDownloadClient { setAttributes(f, parsePermissions(cMsgParts.get(0)), tMsg); engine.signal("Transfer done"); } - engine.finishedFile(); } private void setAttributes(LocalDestFile f, int perms, String tMsg) diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java index b7fc94e0..f1b4d900 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPEngine.java @@ -161,39 +161,26 @@ class SCPEngine { scp.getOutputStream().flush(); } - long transferToRemote(InputStream src, long length) + long transferToRemote(StreamCopier.Listener listener, InputStream src, long length) throws IOException { - return transfer(src, scp.getOutputStream(), scp.getRemoteMaxPacketSize(), length); - } - - long transferFromRemote(OutputStream dest, long length) - throws IOException { - return transfer(scp.getInputStream(), dest, scp.getLocalMaxPacketSize(), length); - } - - private long transfer(InputStream in, OutputStream out, int bufSize, long len) - throws IOException { - return new StreamCopier(in, out) - .bufSize(bufSize).length(len) + return new StreamCopier(src, scp.getOutputStream()) + .bufSize(scp.getRemoteMaxPacketSize()).length(length) .keepFlushing(false) .listener(listener) .copy(); } - void startedDir(String dirname) { - listener.startedDir(dirname); + long transferFromRemote(StreamCopier.Listener listener, OutputStream dest, long length) + throws IOException { + return new StreamCopier(scp.getInputStream(), dest) + .bufSize(scp.getLocalMaxPacketSize()).length(length) + .keepFlushing(false) + .listener(listener) + .copy(); } - void finishedDir() { - listener.finishedDir(); - } - - void startedFile(String filename, long length) { - listener.startedFile(filename, length); - } - - void finishedFile() { - listener.finishedFile(); + TransferListener getTransferListener() { + return listener; } } diff --git a/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java b/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java index 9a5321da..4c756343 100644 --- a/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java +++ b/src/main/java/net/schmizz/sshj/xfer/scp/SCPUploadClient.java @@ -16,8 +16,10 @@ package net.schmizz.sshj.xfer.scp; import net.schmizz.sshj.common.IOUtils; +import net.schmizz.sshj.common.StreamCopier; import net.schmizz.sshj.xfer.LocalFileFilter; import net.schmizz.sshj.xfer.LocalSourceFile; +import net.schmizz.sshj.xfer.TransferListener; import net.schmizz.sshj.xfer.scp.SCPEngine.Arg; import java.io.IOException; @@ -60,39 +62,35 @@ public final class SCPUploadClient { args.add(Arg.PRESERVE_TIMES); engine.execSCPWith(args, targetPath); engine.check("Start status OK"); - process(sourceFile); + process(engine.getTransferListener(), sourceFile); } - private void process(LocalSourceFile f) + private void process(TransferListener listener, LocalSourceFile f) throws IOException { if (f.isDirectory()) { - engine.startedDir(f.getName()); - sendDirectory(f); - engine.finishedDir(); + sendDirectory(listener.directory(f.getName()), f); } else if (f.isFile()) { - engine.startedFile(f.getName(), f.getLength()); - sendFile(f); - engine.finishedFile(); + sendFile(listener.file(f.getName(), f.getLength()), f); } else throw new IOException(f + " is not a regular file or directory"); } - private void sendDirectory(LocalSourceFile f) + private void sendDirectory(TransferListener listener, LocalSourceFile f) throws IOException { preserveTimeIfPossible(f); engine.sendMessage("D0" + getPermString(f) + " 0 " + f.getName()); for (LocalSourceFile child : f.getChildren(uploadFilter)) - process(child); + process(listener, child); engine.sendMessage("E"); } - private void sendFile(LocalSourceFile f) + private void sendFile(StreamCopier.Listener listener, LocalSourceFile f) throws IOException { preserveTimeIfPossible(f); final InputStream src = f.getInputStream(); try { engine.sendMessage("C0" + getPermString(f) + " " + f.getLength() + " " + f.getName()); - engine.transferToRemote(src, f.getLength()); + engine.transferToRemote(listener, src, f.getLength()); engine.signal("Transfer done"); engine.check("Remote agrees transfer done"); } finally { diff --git a/src/test/java/net/schmizz/sshj/xfer/scp/SCPUploadClientTest.java b/src/test/java/net/schmizz/sshj/xfer/scp/SCPUploadClientTest.java deleted file mode 100644 index eb1b11b6..00000000 --- a/src/test/java/net/schmizz/sshj/xfer/scp/SCPUploadClientTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package net.schmizz.sshj.xfer.scp; - -import net.schmizz.sshj.xfer.FileSystemFile; -import net.schmizz.sshj.xfer.LocalFileFilter; -import net.schmizz.sshj.xfer.LocalSourceFile; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.mockito.verification.VerificationMode; - -import java.io.File; -import java.io.IOException; - -import static org.mockito.Matchers.endsWith; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class SCPUploadClientTest { - - private SCPEngine engine; - private SCPUploadClient scpUploadClient; - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - @Before - public void init() { - engine = mock(SCPEngine.class); - scpUploadClient = new SCPUploadClient(engine); - } - - @Test - public void shouldOnlySendFilterAcceptedFilesFromDirectory() throws IOException { - scpUploadClient.setUploadFilter(new LocalFileFilter() { - @Override - public boolean accept(LocalSourceFile file) { - return !file.getName().contains("not-"); - } - }); - - File dir = temp.newFolder("filtered-scp-upload"); - new File(dir, "not-sent.txt").createNewFile(); - new File(dir, "sent.txt").createNewFile(); - - int copy = scpUploadClient.copy(new FileSystemFile(dir), "/tmp"); - verify(engine).startedDir("filtered-scp-upload"); - verify(engine).startedFile(eq("sent.txt"), isA(Long.class)); - verify(engine, times(1)).startedFile(isA(String.class), isA(Long.class)); - } -}