Refactor TransferListener interface to support thread-safe, immutable implementation.

Fix #56
This commit is contained in:
Shikhar Bhushan
2012-01-29 22:45:58 +00:00
parent d70d37cf4e
commit bc59c81dbc
7 changed files with 92 additions and 190 deletions

View File

@@ -46,7 +46,7 @@ public class SFTPFileTransfer
@Override @Override
public void upload(String source, String dest) public void upload(String source, String dest)
throws IOException { throws IOException {
new Uploader().upload(new FileSystemFile(source), dest); upload(new FileSystemFile(source), dest);
} }
@Override @Override
@@ -58,7 +58,7 @@ public class SFTPFileTransfer
@Override @Override
public void upload(LocalSourceFile localFile, String remotePath) public void upload(LocalSourceFile localFile, String remotePath)
throws IOException { throws IOException {
new Uploader().upload(localFile, remotePath); new Uploader().upload(getTransferListener(), localFile, remotePath);
} }
@Override @Override
@@ -66,7 +66,7 @@ public class SFTPFileTransfer
throws IOException { throws IOException {
final PathComponents pathComponents = engine.getPathHelper().getComponents(source); final PathComponents pathComponents = engine.getPathHelper().getComponents(source);
final FileAttributes attributes = engine.stat(source); final FileAttributes attributes = engine.stat(source);
new Downloader().download(new RemoteResourceInfo(pathComponents, attributes), dest); new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest);
} }
public void setUploadFilter(LocalFileFilter uploadFilter) { public void setUploadFilter(LocalFileFilter uploadFilter) {
@@ -87,24 +87,21 @@ public class SFTPFileTransfer
private class Downloader { private class Downloader {
private final TransferListener listener = getTransferListener(); private void download(final TransferListener listener,
final RemoteResourceInfo remote,
private void download(final RemoteResourceInfo remote, final LocalDestFile local) final LocalDestFile local)
throws IOException { throws IOException {
final LocalDestFile adjustedFile; final LocalDestFile adjustedFile;
switch (remote.getAttributes().getType()) { switch (remote.getAttributes().getType()) {
case DIRECTORY: case DIRECTORY:
listener.startedDir(remote.getName()); adjustedFile = downloadDir(listener.directory(remote.getName()), remote, local);
adjustedFile = downloadDir(remote, local);
listener.finishedDir();
break; break;
case UNKNOWN: case UNKNOWN:
log.warn("Server did not supply information about the type of file at `{}` " + log.warn("Server did not supply information about the type of file at `{}` " +
"-- assuming it is a regular file!", remote.getPath()); "-- assuming it is a regular file!", remote.getPath());
case REGULAR: case REGULAR:
listener.startedFile(remote.getName(), remote.getAttributes().getSize()); adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()),
adjustedFile = downloadFile(remote, local); remote, local);
listener.finishedFile();
break; break;
default: default:
throw new IOException(remote + " is not a regular file or directory"); throw new IOException(remote + " is not a regular file or directory");
@@ -113,20 +110,24 @@ public class SFTPFileTransfer
} }
private LocalDestFile downloadDir(final RemoteResourceInfo remote, final LocalDestFile local) private LocalDestFile downloadDir(final TransferListener listener,
final RemoteResourceInfo remote,
final LocalDestFile local)
throws IOException { throws IOException {
final LocalDestFile adjusted = local.getTargetDirectory(remote.getName()); final LocalDestFile adjusted = local.getTargetDirectory(remote.getName());
final RemoteDirectory rd = engine.openDir(remote.getPath()); final RemoteDirectory rd = engine.openDir(remote.getPath());
try { try {
for (RemoteResourceInfo rri : rd.scan(getDownloadFilter())) for (RemoteResourceInfo rri : rd.scan(getDownloadFilter()))
download(rri, adjusted.getChild(rri.getName())); download(listener, rri, adjusted.getChild(rri.getName()));
} finally { } finally {
rd.close(); rd.close();
} }
return adjusted; return adjusted;
} }
private LocalDestFile downloadFile(final RemoteResourceInfo remote, final LocalDestFile local) private LocalDestFile downloadFile(final StreamCopier.Listener listener,
final RemoteResourceInfo remote,
final LocalDestFile local)
throws IOException { throws IOException {
final LocalDestFile adjusted = local.getTargetFile(remote.getName()); final LocalDestFile adjusted = local.getTargetFile(remote.getName());
final RemoteFile rf = engine.open(remote.getPath()); final RemoteFile rf = engine.open(remote.getPath());
@@ -161,33 +162,33 @@ public class SFTPFileTransfer
private class Uploader { private class Uploader {
private final TransferListener listener = getTransferListener(); private void upload(final TransferListener listener,
final LocalSourceFile local,
private void upload(LocalSourceFile local, String remote) final String remote)
throws IOException { throws IOException {
final String adjustedPath; final String adjustedPath;
if (local.isDirectory()) { if (local.isDirectory()) {
listener.startedDir(local.getName()); adjustedPath = uploadDir(listener.directory(local.getName()), local, remote);
adjustedPath = uploadDir(local, remote);
listener.finishedDir();
} else if (local.isFile()) { } else if (local.isFile()) {
listener.startedFile(local.getName(), local.getLength()); adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote);
adjustedPath = uploadFile(local, remote);
listener.finishedFile();
} else } else
throw new IOException(local + " is not a file or directory"); throw new IOException(local + " is not a file or directory");
engine.setAttributes(adjustedPath, getAttributes(local)); engine.setAttributes(adjustedPath, getAttributes(local));
} }
private String uploadDir(LocalSourceFile local, String remote) private String uploadDir(final TransferListener listener,
final LocalSourceFile local,
final String remote)
throws IOException { throws IOException {
final String adjusted = prepareDir(local, remote); final String adjusted = prepareDir(local, remote);
for (LocalSourceFile f : local.getChildren(getUploadFilter())) for (LocalSourceFile f : local.getChildren(getUploadFilter()))
upload(f, adjusted); upload(listener, f, adjusted);
return adjusted; return adjusted;
} }
private String uploadFile(LocalSourceFile local, String remote) private String uploadFile(final StreamCopier.Listener listener,
final LocalSourceFile local,
final String remote)
throws IOException { throws IOException {
final String adjusted = prepareFile(local, remote); final String adjusted = prepareFile(local, remote);
final RemoteFile rf = engine.open(adjusted, EnumSet.of(OpenMode.WRITE, final RemoteFile rf = engine.open(adjusted, EnumSet.of(OpenMode.WRITE,
@@ -210,7 +211,7 @@ public class SFTPFileTransfer
return adjusted; return adjusted;
} }
private String prepareDir(LocalSourceFile local, String remote) private String prepareDir(final LocalSourceFile local, final String remote)
throws IOException { throws IOException {
final FileAttributes attrs; final FileAttributes attrs;
try { try {
@@ -236,7 +237,7 @@ public class SFTPFileTransfer
throw new IOException(attrs.getMode().getType() + " file already exists at " + remote); throw new IOException(attrs.getMode().getType() + " file already exists at " + remote);
} }
private String prepareFile(LocalSourceFile local, String remote) private String prepareFile(final LocalSourceFile local, final String remote)
throws IOException { throws IOException {
final FileAttributes attrs; final FileAttributes attrs;
try { try {
@@ -250,8 +251,7 @@ public class SFTPFileTransfer
} }
if (attrs.getMode().getType() == FileMode.Type.DIRECTORY) { if (attrs.getMode().getType() == FileMode.Type.DIRECTORY) {
log.debug("probeFile: {} was directory, path adjusted for {}", remote, local.getName()); log.debug("probeFile: {} was directory, path adjusted for {}", remote, local.getName());
remote = engine.getPathHelper().adjustForParent(remote, local.getName()); return engine.getPathHelper().adjustForParent(remote, local.getName());
return remote;
} else { } else {
log.debug("probeFile: {} is a {} file that will be replaced", remote, attrs.getMode().getType()); log.debug("probeFile: {} is a {} file that will be replaced", remote, attrs.getMode().getType());
return remote; return remote;

View File

@@ -1,68 +1,45 @@
package net.schmizz.sshj.xfer; package net.schmizz.sshj.xfer;
import net.schmizz.sshj.common.StreamCopier;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.io.IOException;
import java.util.List;
public class LoggingTransferListener public class LoggingTransferListener
implements TransferListener { implements TransferListener {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private final List<String> dirNames = new ArrayList<String>(); private final String relPath;
private String base = "";
private String name = "";
private long size = -1;
@Override public LoggingTransferListener() {
public void startedDir(String name) { this("");
dirNames.add(name); }
size = -1;
fixBase(); private LoggingTransferListener(String relPath) {
log.info("started transferring directory `{}`", currentNode()); this.relPath = relPath;
} }
@Override @Override
public void startedFile(String name, long size) { public TransferListener directory(String name) {
this.name = name; log.info("started transferring directory `{}`", name);
this.size = size; return new LoggingTransferListener(relPath + name + "/");
log.info("started transferring file `{}` ({} bytes)", currentNode(), size);
} }
@Override @Override
public void reportProgress(long transferred) { public StreamCopier.Listener file(final String name, final long size) {
final String path = relPath + name;
log.info("started transferring file `{}` ({} bytes)", path, size);
return new StreamCopier.Listener() {
@Override
public void reportProgress(long transferred)
throws IOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("transferred {}% of `{}`", ((transferred * 100) / size), currentNode()); log.debug("transferred {}% of `{}`", ((transferred * 100) / size), path);
} }
} }
};
@Override
public void finishedFile() {
log.info("finished transferring file `{}`", currentNode());
name = "";
size = -1;
}
@Override
public void finishedDir() {
log.info("finished transferring dir `{}`", currentNode());
size = -1;
dirNames.remove(dirNames.size() - 1);
fixBase();
}
private void fixBase() {
final StringBuilder qualifier = new StringBuilder();
for (String parent : dirNames) {
qualifier.append(parent).append("/");
}
base = qualifier.toString();
}
private String currentNode() {
return base + name;
} }
} }

View File

@@ -2,15 +2,10 @@ package net.schmizz.sshj.xfer;
import net.schmizz.sshj.common.StreamCopier; import net.schmizz.sshj.common.StreamCopier;
public interface TransferListener public interface TransferListener {
extends StreamCopier.Listener {
void startedDir(String name); TransferListener directory(String name);
void startedFile(String name, long size); StreamCopier.Listener file(String name, long size);
void finishedFile();
void finishedDir();
} }

View File

@@ -17,6 +17,7 @@ package net.schmizz.sshj.xfer.scp;
import net.schmizz.sshj.common.IOUtils; import net.schmizz.sshj.common.IOUtils;
import net.schmizz.sshj.xfer.LocalDestFile; import net.schmizz.sshj.xfer.LocalDestFile;
import net.schmizz.sshj.xfer.TransferListener;
import net.schmizz.sshj.xfer.scp.SCPEngine.Arg; import net.schmizz.sshj.xfer.scp.SCPEngine.Arg;
import java.io.IOException; import java.io.IOException;
@@ -71,7 +72,7 @@ public final class SCPDownloadClient {
String msg = engine.readMessage(); String msg = engine.readMessage();
do do
process(null, msg, targetFile); process(engine.getTransferListener(), null, msg, targetFile);
while (!(msg = engine.readMessage()).isEmpty()); while (!(msg = engine.readMessage()).isEmpty());
} }
@@ -93,7 +94,7 @@ public final class SCPDownloadClient {
return Integer.parseInt(cmd.substring(1), 8); return Integer.parseInt(cmd.substring(1), 8);
} }
private boolean process(String bufferedTMsg, String msg, LocalDestFile f) private boolean process(TransferListener listener, String bufferedTMsg, String msg, LocalDestFile f)
throws IOException { throws IOException {
if (msg.length() < 1) if (msg.length() < 1)
throw new SCPException("Could not parse message `" + msg + "`"); throw new SCPException("Could not parse message `" + msg + "`");
@@ -102,15 +103,15 @@ public final class SCPDownloadClient {
case 'T': case 'T':
engine.signal("ACK: T"); engine.signal("ACK: T");
process(msg, engine.readMessage(), f); process(listener, msg, engine.readMessage(), f);
break; break;
case 'C': case 'C':
processFile(msg, bufferedTMsg, f); processFile(listener, msg, bufferedTMsg, f);
break; break;
case 'D': case 'D':
processDirectory(msg, bufferedTMsg, f); processDirectory(listener, msg, bufferedTMsg, f);
break; break;
case 'E': case 'E':
@@ -129,37 +130,36 @@ public final class SCPDownloadClient {
return false; return false;
} }
private void processDirectory(String dMsg, String tMsg, LocalDestFile f) private void processDirectory(TransferListener listener, String dMsg, String tMsg, LocalDestFile f)
throws IOException { throws IOException {
final List<String> dMsgParts = tokenize(dMsg, 3, true); // D<perms> 0 <dirname> final List<String> dMsgParts = tokenize(dMsg, 3, true); // D<perms> 0 <dirname>
final long length = parseLong(dMsgParts.get(1), "dir length"); final long length = parseLong(dMsgParts.get(1), "dir length");
final String dirname = dMsgParts.get(2); final String dirname = dMsgParts.get(2);
if (length != 0) if (length != 0)
throw new IOException("Remote SCP command sent strange directory length: " + length); throw new IOException("Remote SCP command sent strange directory length: " + length);
engine.startedDir(dirname);
final TransferListener dirListener = listener.directory(dirname);
{ {
f = f.getTargetDirectory(dirname); f = f.getTargetDirectory(dirname);
engine.signal("ACK: D"); engine.signal("ACK: D");
do { do {
} while (!process(null, engine.readMessage(), f)); } while (!process(dirListener, null, engine.readMessage(), f));
setAttributes(f, parsePermissions(dMsgParts.get(0)), tMsg); setAttributes(f, parsePermissions(dMsgParts.get(0)), tMsg);
engine.signal("ACK: E"); engine.signal("ACK: E");
} }
engine.finishedDir();
} }
private void processFile(String cMsg, String tMsg, LocalDestFile f) private void processFile(TransferListener listener, String cMsg, String tMsg, LocalDestFile f)
throws IOException { throws IOException {
final List<String> cMsgParts = tokenize(cMsg, 3, true); // C<perms> <size> <filename> final List<String> cMsgParts = tokenize(cMsg, 3, true); // C<perms> <size> <filename>
final long length = parseLong(cMsgParts.get(1), "length"); final long length = parseLong(cMsgParts.get(1), "length");
final String filename = cMsgParts.get(2); final String filename = cMsgParts.get(2);
engine.startedFile(filename, length);
{ {
f = f.getTargetFile(filename); f = f.getTargetFile(filename);
engine.signal("Remote can start transfer"); engine.signal("Remote can start transfer");
final OutputStream dest = f.getOutputStream(); final OutputStream dest = f.getOutputStream();
try { try {
engine.transferFromRemote(dest, length); engine.transferFromRemote(listener.file(filename, length), dest, length);
} finally { } finally {
IOUtils.closeQuietly(dest); IOUtils.closeQuietly(dest);
} }
@@ -167,7 +167,6 @@ public final class SCPDownloadClient {
setAttributes(f, parsePermissions(cMsgParts.get(0)), tMsg); setAttributes(f, parsePermissions(cMsgParts.get(0)), tMsg);
engine.signal("Transfer done"); engine.signal("Transfer done");
} }
engine.finishedFile();
} }
private void setAttributes(LocalDestFile f, int perms, String tMsg) private void setAttributes(LocalDestFile f, int perms, String tMsg)

View File

@@ -161,39 +161,26 @@ class SCPEngine {
scp.getOutputStream().flush(); scp.getOutputStream().flush();
} }
long transferToRemote(InputStream src, long length) long transferToRemote(StreamCopier.Listener listener, InputStream src, long length)
throws IOException { throws IOException {
return transfer(src, scp.getOutputStream(), scp.getRemoteMaxPacketSize(), length); return new StreamCopier(src, scp.getOutputStream())
} .bufSize(scp.getRemoteMaxPacketSize()).length(length)
long transferFromRemote(OutputStream dest, long length)
throws IOException {
return transfer(scp.getInputStream(), dest, scp.getLocalMaxPacketSize(), length);
}
private long transfer(InputStream in, OutputStream out, int bufSize, long len)
throws IOException {
return new StreamCopier(in, out)
.bufSize(bufSize).length(len)
.keepFlushing(false) .keepFlushing(false)
.listener(listener) .listener(listener)
.copy(); .copy();
} }
void startedDir(String dirname) { long transferFromRemote(StreamCopier.Listener listener, OutputStream dest, long length)
listener.startedDir(dirname); throws IOException {
return new StreamCopier(scp.getInputStream(), dest)
.bufSize(scp.getLocalMaxPacketSize()).length(length)
.keepFlushing(false)
.listener(listener)
.copy();
} }
void finishedDir() { TransferListener getTransferListener() {
listener.finishedDir(); return listener;
}
void startedFile(String filename, long length) {
listener.startedFile(filename, length);
}
void finishedFile() {
listener.finishedFile();
} }
} }

View File

@@ -16,8 +16,10 @@
package net.schmizz.sshj.xfer.scp; package net.schmizz.sshj.xfer.scp;
import net.schmizz.sshj.common.IOUtils; import net.schmizz.sshj.common.IOUtils;
import net.schmizz.sshj.common.StreamCopier;
import net.schmizz.sshj.xfer.LocalFileFilter; import net.schmizz.sshj.xfer.LocalFileFilter;
import net.schmizz.sshj.xfer.LocalSourceFile; import net.schmizz.sshj.xfer.LocalSourceFile;
import net.schmizz.sshj.xfer.TransferListener;
import net.schmizz.sshj.xfer.scp.SCPEngine.Arg; import net.schmizz.sshj.xfer.scp.SCPEngine.Arg;
import java.io.IOException; import java.io.IOException;
@@ -60,39 +62,35 @@ public final class SCPUploadClient {
args.add(Arg.PRESERVE_TIMES); args.add(Arg.PRESERVE_TIMES);
engine.execSCPWith(args, targetPath); engine.execSCPWith(args, targetPath);
engine.check("Start status OK"); engine.check("Start status OK");
process(sourceFile); process(engine.getTransferListener(), sourceFile);
} }
private void process(LocalSourceFile f) private void process(TransferListener listener, LocalSourceFile f)
throws IOException { throws IOException {
if (f.isDirectory()) { if (f.isDirectory()) {
engine.startedDir(f.getName()); sendDirectory(listener.directory(f.getName()), f);
sendDirectory(f);
engine.finishedDir();
} else if (f.isFile()) { } else if (f.isFile()) {
engine.startedFile(f.getName(), f.getLength()); sendFile(listener.file(f.getName(), f.getLength()), f);
sendFile(f);
engine.finishedFile();
} else } else
throw new IOException(f + " is not a regular file or directory"); throw new IOException(f + " is not a regular file or directory");
} }
private void sendDirectory(LocalSourceFile f) private void sendDirectory(TransferListener listener, LocalSourceFile f)
throws IOException { throws IOException {
preserveTimeIfPossible(f); preserveTimeIfPossible(f);
engine.sendMessage("D0" + getPermString(f) + " 0 " + f.getName()); engine.sendMessage("D0" + getPermString(f) + " 0 " + f.getName());
for (LocalSourceFile child : f.getChildren(uploadFilter)) for (LocalSourceFile child : f.getChildren(uploadFilter))
process(child); process(listener, child);
engine.sendMessage("E"); engine.sendMessage("E");
} }
private void sendFile(LocalSourceFile f) private void sendFile(StreamCopier.Listener listener, LocalSourceFile f)
throws IOException { throws IOException {
preserveTimeIfPossible(f); preserveTimeIfPossible(f);
final InputStream src = f.getInputStream(); final InputStream src = f.getInputStream();
try { try {
engine.sendMessage("C0" + getPermString(f) + " " + f.getLength() + " " + f.getName()); engine.sendMessage("C0" + getPermString(f) + " " + f.getLength() + " " + f.getName());
engine.transferToRemote(src, f.getLength()); engine.transferToRemote(listener, src, f.getLength());
engine.signal("Transfer done"); engine.signal("Transfer done");
engine.check("Remote agrees transfer done"); engine.check("Remote agrees transfer done");
} finally { } finally {

View File

@@ -1,54 +0,0 @@
package net.schmizz.sshj.xfer.scp;
import net.schmizz.sshj.xfer.FileSystemFile;
import net.schmizz.sshj.xfer.LocalFileFilter;
import net.schmizz.sshj.xfer.LocalSourceFile;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.verification.VerificationMode;
import java.io.File;
import java.io.IOException;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class SCPUploadClientTest {
private SCPEngine engine;
private SCPUploadClient scpUploadClient;
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@Before
public void init() {
engine = mock(SCPEngine.class);
scpUploadClient = new SCPUploadClient(engine);
}
@Test
public void shouldOnlySendFilterAcceptedFilesFromDirectory() throws IOException {
scpUploadClient.setUploadFilter(new LocalFileFilter() {
@Override
public boolean accept(LocalSourceFile file) {
return !file.getName().contains("not-");
}
});
File dir = temp.newFolder("filtered-scp-upload");
new File(dir, "not-sent.txt").createNewFile();
new File(dir, "sent.txt").createNewFile();
int copy = scpUploadClient.copy(new FileSystemFile(dir), "/tmp");
verify(engine).startedDir("filtered-scp-upload");
verify(engine).startedFile(eq("sent.txt"), isA(Long.class));
verify(engine, times(1)).startedFile(isA(String.class), isA(Long.class));
}
}