* further fixed up getting & setting of file attributes

* added a TransferListener for monitoring file transfer
This commit is contained in:
Shikhar Bhushan
2010-05-23 23:51:06 +01:00
parent e765536fa1
commit 5c3ba2282c
11 changed files with 201 additions and 96 deletions

View File

@@ -41,10 +41,15 @@ public class StreamCopier
}; };
} }
public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing) public interface Listener {
void reportProgress(long transferred);
}
public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing, Listener listener)
throws IOException { throws IOException {
long count = 0; long count = 0;
final boolean reportProgress = listener != null;
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
final byte[] buf = new byte[bufSize]; final byte[] buf = new byte[bufSize];
@@ -54,6 +59,8 @@ public class StreamCopier
count += read; count += read;
if (keepFlushing) if (keepFlushing)
out.flush(); out.flush();
if (reportProgress)
listener.reportProgress(count);
} }
if (!keepFlushing) if (!keepFlushing)
out.flush(); out.flush();
@@ -65,6 +72,11 @@ public class StreamCopier
return count; return count;
} }
public static long copy(InputStream in, OutputStream out, int bufSize, boolean keepFlushing)
throws IOException {
return copy(in, out, bufSize, keepFlushing, null);
}
public static String copyStreamToString(InputStream stream) public static String copyStreamToString(InputStream stream)
throws IOException { throws IOException {
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();

View File

@@ -20,6 +20,7 @@ import net.schmizz.sshj.sftp.Response.StatusCode;
import net.schmizz.sshj.xfer.AbstractFileTransfer; import net.schmizz.sshj.xfer.AbstractFileTransfer;
import net.schmizz.sshj.xfer.FileTransfer; import net.schmizz.sshj.xfer.FileTransfer;
import net.schmizz.sshj.xfer.FileTransferUtil; import net.schmizz.sshj.xfer.FileTransferUtil;
import net.schmizz.sshj.xfer.TransferListener;
import java.io.File; import java.io.File;
import java.io.FileFilter; import java.io.FileFilter;
@@ -89,27 +90,35 @@ public class SFTPFileTransfer
private class Downloader { private class Downloader {
private final TransferListener listener = getTransferListener();
private void download(final RemoteResourceInfo remote, final File local) private void download(final RemoteResourceInfo remote, final File local)
throws IOException { throws IOException {
log.info("Downloading [{}] to [{}]", remote, local); final File adjustedFile;
switch (remote.getAttributes().getType()) { switch (remote.getAttributes().getType()) {
case DIRECTORY: case DIRECTORY:
downloadDir(remote, local); listener.startedDir(remote.getName());
adjustedFile = downloadDir(remote, local);
listener.finishedDir();
break; break;
case UNKNOWN: case UNKNOWN:
log.warn("Server did not supply information about the type of file at `{}` -- assuming it is a regular file!"); log.warn("Server did not supply information about the type of file at `{}` " +
"-- assuming it is a regular file!", remote.getPath());
case REGULAR: case REGULAR:
downloadFile(remote, local); listener.startedFile(remote.getName(), remote.getAttributes().getSize());
adjustedFile = downloadFile(remote, local);
listener.finishedFile();
break; break;
default: default:
throw new IOException(remote + " is not a regular file or directory"); throw new IOException(remote + " is not a regular file or directory");
} }
copyAttributes(remote, adjustedFile);
} }
private void downloadDir(final RemoteResourceInfo remote, final File local) private File downloadDir(final RemoteResourceInfo remote, final File local)
throws IOException { throws IOException {
final File adjusted = FileTransferUtil.getTargetDirectory(local, remote.getName()); final File adjusted = FileTransferUtil.getTargetDirectory(local, remote.getName());
copyAttributes(remote, adjusted);
final RemoteDirectory rd = sftp.openDir(remote.getPath()); final RemoteDirectory rd = sftp.openDir(remote.getPath());
try { try {
for (RemoteResourceInfo rri : rd.scan(getDownloadFilter())) for (RemoteResourceInfo rri : rd.scan(getDownloadFilter()))
@@ -117,24 +126,25 @@ public class SFTPFileTransfer
} finally { } finally {
rd.close(); rd.close();
} }
return adjusted;
} }
private void downloadFile(final RemoteResourceInfo remote, final File local) private File downloadFile(final RemoteResourceInfo remote, final File local)
throws IOException { throws IOException {
final File adjusted = FileTransferUtil.getTargetFile(local, remote.getName()); final File adjusted = FileTransferUtil.getTargetFile(local, remote.getName());
copyAttributes(remote, adjusted);
final RemoteFile rf = sftp.open(remote.getPath()); final RemoteFile rf = sftp.open(remote.getPath());
try { try {
final FileOutputStream fos = new FileOutputStream(adjusted); final FileOutputStream fos = new FileOutputStream(adjusted);
try { try {
StreamCopier.copy(rf.getInputStream(), fos, sftp.getSubsystem() StreamCopier.copy(rf.getInputStream(), fos, sftp.getSubsystem()
.getLocalMaxPacketSize(), false); .getLocalMaxPacketSize(), false, listener);
} finally { } finally {
fos.close(); fos.close();
} }
} finally { } finally {
rf.close(); rf.close();
} }
return adjusted;
} }
private void copyAttributes(final RemoteResourceInfo remote, final File local) private void copyAttributes(final RemoteResourceInfo remote, final File local)
@@ -151,15 +161,20 @@ public class SFTPFileTransfer
private class Uploader { private class Uploader {
private final TransferListener listener = getTransferListener();
private void upload(File local, String remote) private void upload(File local, String remote)
throws IOException { throws IOException {
log.info("Uploading [{}] to [{}]", local, remote);
final String adjustedPath; final String adjustedPath;
if (local.isDirectory()) if (local.isDirectory()) {
listener.startedDir(local.getName());
adjustedPath = uploadDir(local, remote); adjustedPath = uploadDir(local, remote);
else if (local.isFile()) listener.finishedDir();
} else if (local.isFile()) {
listener.startedFile(local.getName(), local.length());
adjustedPath = uploadFile(local, remote); adjustedPath = uploadFile(local, remote);
else listener.finishedFile();
} else
throw new IOException(local + " is not a file or directory"); throw new IOException(local + " is not a file or directory");
sftp.setAttributes(adjustedPath, getAttributes(local)); sftp.setAttributes(adjustedPath, getAttributes(local));
} }
@@ -181,8 +196,8 @@ public class SFTPFileTransfer
try { try {
final FileInputStream fis = new FileInputStream(local); final FileInputStream fis = new FileInputStream(local);
try { try {
StreamCopier.copy(fis, rf.getOutputStream(), sftp.getSubsystem().getRemoteMaxPacketSize() final int bufSize = sftp.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead();
- rf.getOutgoingPacketOverhead(), false); StreamCopier.copy(fis, rf.getOutputStream(), bufSize, false, listener);
} finally { } finally {
fis.close(); fis.close();
} }

View File

@@ -20,19 +20,18 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractFileTransfer { public abstract class AbstractFileTransfer {
/** Logger */
protected final Logger log = LoggerFactory.getLogger(getClass()); protected final Logger log = LoggerFactory.getLogger(getClass());
public static final ModeGetter defaultModeGetter = new DefaultModeGetter(); public static final ModeGetter DEFAULT_MODE_SETTER = new DefaultModeGetter();
public static final ModeSetter defaultModeSetter = new DefaultModeSetter(); public static final ModeSetter DEFAULT_MODE_GETTER = new DefaultModeSetter();
public static final LoggingTransferListener LOGGING_TRANSFER_LISTENER = new LoggingTransferListener();
private volatile ModeGetter modeGetter = defaultModeGetter; private volatile ModeGetter modeGetter = DEFAULT_MODE_SETTER;
private volatile ModeSetter modeSetter = defaultModeSetter; private volatile ModeSetter modeSetter = DEFAULT_MODE_GETTER;
private volatile TransferListener transferListener = LOGGING_TRANSFER_LISTENER;
private volatile ProgressListener progressListener;
public void setModeGetter(ModeGetter modeGetter) { public void setModeGetter(ModeGetter modeGetter) {
this.modeGetter = (modeGetter == null) ? defaultModeGetter : modeGetter; this.modeGetter = (modeGetter == null) ? DEFAULT_MODE_SETTER : modeGetter;
} }
public ModeGetter getModeGetter() { public ModeGetter getModeGetter() {
@@ -40,19 +39,19 @@ public abstract class AbstractFileTransfer {
} }
public void setModeSetter(ModeSetter modeSetter) { public void setModeSetter(ModeSetter modeSetter) {
this.modeSetter = (modeSetter == null) ? defaultModeSetter : modeSetter; this.modeSetter = (modeSetter == null) ? DEFAULT_MODE_GETTER : modeSetter;
} }
public ModeSetter getModeSetter() { public ModeSetter getModeSetter() {
return this.modeSetter; return this.modeSetter;
} }
public ProgressListener getProgressListener() { public TransferListener getTransferListener() {
return progressListener; return transferListener;
} }
public void setProgressListener(ProgressListener progressListener) { public void setTransferListener(TransferListener transferListener) {
this.progressListener = progressListener; this.transferListener = (transferListener == null) ? LOGGING_TRANSFER_LISTENER : transferListener;
} }
} }

View File

@@ -33,8 +33,8 @@ public interface FileTransfer {
void setModeSetter(ModeSetter modeSetter); void setModeSetter(ModeSetter modeSetter);
ProgressListener getProgressListener(); TransferListener getTransferListener();
void setProgressListener(ProgressListener listener); void setTransferListener(TransferListener listener);
} }

View File

@@ -0,0 +1,67 @@
package net.schmizz.sshj.xfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
public class LoggingTransferListener
implements TransferListener {
private final Logger log = LoggerFactory.getLogger(getClass());
private final List<String> dirNames = new LinkedList<String>();
private String base = "";
private String name = "";
private long size = -1;
public void startedDir(String name) {
dirNames.add(name);
size = -1;
fixBase();
log.info("started transferring directory `{}`", currentNode());
}
@Override
public void startedFile(String name, long size) {
this.name = name;
this.size = size;
log.info("started transferring file `{}` ({} bytes)", currentNode(), size);
}
@Override
public void reportProgress(long transferred) {
if (log.isDebugEnabled()) {
log.debug("transferred {}% of `{}`", ((transferred * 100) / size), currentNode());
}
}
@Override
public void finishedFile() {
log.info("finished transferring file `{}`", currentNode());
name = "";
size = -1;
}
@Override
public void finishedDir() {
log.info("finished transferring dir `{}`", currentNode());
size = -1;
dirNames.remove(dirNames.size() - 1);
fixBase();
}
private void fixBase() {
final StringBuilder qualifier = new StringBuilder();
for (String parent : dirNames) {
qualifier.append(parent).append("/");
}
base = qualifier.toString();
}
private String currentNode() {
return base + name;
}
}

View File

@@ -1,11 +0,0 @@
package net.schmizz.sshj.xfer;
public interface ProgressListener {
void started(int item, boolean isDir);
void progressed(long done, long total);
void completed(int item);
}

View File

@@ -0,0 +1,16 @@
package net.schmizz.sshj.xfer;
import net.schmizz.sshj.common.StreamCopier;
public interface TransferListener
extends StreamCopier.Listener {
void startedDir(String name);
void startedFile(String name, long size);
void finishedFile();
void finishedDir();
}

View File

@@ -20,6 +20,7 @@ import net.schmizz.sshj.common.SSHException;
import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.connection.channel.direct.SessionFactory;
import net.schmizz.sshj.xfer.FileTransferUtil; import net.schmizz.sshj.xfer.FileTransferUtil;
import net.schmizz.sshj.xfer.ModeSetter; import net.schmizz.sshj.xfer.ModeSetter;
import net.schmizz.sshj.xfer.TransferListener;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@@ -35,8 +36,8 @@ public final class SCPDownloadClient
private boolean recursive = true; private boolean recursive = true;
SCPDownloadClient(SessionFactory host, ModeSetter modeSetter) { SCPDownloadClient(SessionFactory host, TransferListener listener, ModeSetter modeSetter) {
super(host); super(host, listener);
this.modeSetter = modeSetter; this.modeSetter = modeSetter;
} }
@@ -98,17 +99,6 @@ public final class SCPDownloadClient
return Integer.parseInt(cmd.substring(1), 8); return Integer.parseInt(cmd.substring(1), 8);
} }
private void prepare(File f, int perms, String tMsg)
throws IOException {
modeSetter.setPermissions(f, perms);
if (tMsg != null && modeSetter.preservesTimes()) {
String[] tMsgParts = tokenize(tMsg, 4); // e.g. T<mtime> 0 <atime> 0
modeSetter.setLastModifiedTime(f, parseLong(tMsgParts[0].substring(1), "last modified time"));
modeSetter.setLastAccessedTime(f, parseLong(tMsgParts[2], "last access time"));
}
}
private boolean process(String bufferedTMsg, String msg, File f) private boolean process(String bufferedTMsg, String msg, File f)
throws IOException { throws IOException {
if (msg.length() < 1) if (msg.length() < 1)
@@ -150,41 +140,53 @@ public final class SCPDownloadClient
private void processDirectory(String dMsg, String tMsg, File f) private void processDirectory(String dMsg, String tMsg, File f)
throws IOException { throws IOException {
String[] dMsgParts = tokenize(dMsg, 3); // e.g. D0755 0 <dirname> final String[] dMsgParts = tokenize(dMsg, 3); // D<perms> 0 <dirname>
final long length = parseLong(dMsgParts[1], "dir length");
long length = parseLong(dMsgParts[1], "dir length"); final String dirname = dMsgParts[2];
if (length != 0) if (length != 0)
throw new IOException("Remote SCP command sent strange directory length: " + length); throw new IOException("Remote SCP command sent strange directory length: " + length);
listener.startedDir(dirname);
f = FileTransferUtil.getTargetDirectory(f, dMsgParts[2]); {
prepare(f, parsePermissions(dMsgParts[0]), tMsg); f = FileTransferUtil.getTargetDirectory(f, dirname);
signal("ACK: D");
signal("ACK: D"); do {
} while (!process(null, readMessage(), f));
do { setAttributes(f, parsePermissions(dMsgParts[0]), tMsg);
} while (!process(null, readMessage(), f)); signal("ACK: E");
}
signal("ACK: E"); listener.finishedDir();
} }
private void processFile(String cMsg, String tMsg, File f) private void processFile(String cMsg, String tMsg, File f)
throws IOException { throws IOException {
String[] cMsgParts = tokenize(cMsg, 3); final String[] cMsgParts = tokenize(cMsg, 3); // C<perms> <size> <filename>
final long length = parseLong(cMsgParts[1], "length");
long length = parseLong(cMsgParts[1], "length"); final String filename = cMsgParts[2];
listener.startedFile(filename, length);
f = FileTransferUtil.getTargetFile(f, cMsgParts[2]); {
prepare(f, parsePermissions(cMsgParts[0]), tMsg); f = FileTransferUtil.getTargetFile(f, filename);
signal("Remote can start transfer");
signal("Remote can start transfer"); final FileOutputStream fos = new FileOutputStream(f);
final FileOutputStream fos = new FileOutputStream(f); try {
try { transfer(scp.getInputStream(), fos, scp.getLocalMaxPacketSize(), length);
transfer(scp.getInputStream(), fos, scp.getLocalMaxPacketSize(), length); } finally {
} finally { IOUtils.closeQuietly(fos);
IOUtils.closeQuietly(fos); }
check("Remote agrees transfer done");
setAttributes(f, parsePermissions(cMsgParts[0]), tMsg);
signal("Transfer done");
}
listener.finishedFile();
}
private void setAttributes(File f, int perms, String tMsg)
throws IOException {
modeSetter.setPermissions(f, perms);
if (tMsg != null && modeSetter.preservesTimes()) {
String[] tMsgParts = tokenize(tMsg, 4); // e.g. T<mtime> 0 <atime> 0
modeSetter.setLastModifiedTime(f, parseLong(tMsgParts[0].substring(1), "last modified time"));
modeSetter.setLastAccessedTime(f, parseLong(tMsgParts[2], "last access time"));
} }
check("Remote agrees transfer done");
signal("Transfer done");
} }
private String[] tokenize(String msg, int numPartsExpected) private String[] tokenize(String msg, int numPartsExpected)

View File

@@ -19,6 +19,7 @@ import net.schmizz.sshj.common.IOUtils;
import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.common.SSHException;
import net.schmizz.sshj.connection.channel.direct.Session.Command; import net.schmizz.sshj.connection.channel.direct.Session.Command;
import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.connection.channel.direct.SessionFactory;
import net.schmizz.sshj.xfer.TransferListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -59,13 +60,15 @@ abstract class SCPEngine {
final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());
final SessionFactory host; final SessionFactory host;
final TransferListener listener;
final Queue<String> warnings = new LinkedList<String>(); final Queue<String> warnings = new LinkedList<String>();
Command scp; Command scp;
int exitStatus; int exitStatus;
SCPEngine(SessionFactory host) { SCPEngine(SessionFactory host, TransferListener listener) {
this.host = host; this.host = host;
this.listener = listener;
} }
public int copy(String sourcePath, String targetPath) public int copy(String sourcePath, String targetPath)
@@ -201,6 +204,7 @@ abstract class SCPEngine {
while (count < len && (read = in.read(buf, 0, (int) Math.min(bufSize, len - count))) != -1) { while (count < len && (read = in.read(buf, 0, (int) Math.min(bufSize, len - count))) != -1) {
out.write(buf, 0, read); out.write(buf, 0, read);
count += read; count += read;
listener.reportProgress(count);
} }
out.flush(); out.flush();

View File

@@ -24,6 +24,7 @@ import java.io.IOException;
public class SCPFileTransfer public class SCPFileTransfer
extends AbstractFileTransfer extends AbstractFileTransfer
implements FileTransfer { implements FileTransfer {
private final SessionFactory sessionFactory; private final SessionFactory sessionFactory;
public SCPFileTransfer(SessionFactory sessionFactory) { public SCPFileTransfer(SessionFactory sessionFactory) {
@@ -31,11 +32,11 @@ public class SCPFileTransfer
} }
public SCPDownloadClient newSCPDownloadClient() { public SCPDownloadClient newSCPDownloadClient() {
return new SCPDownloadClient(sessionFactory, getModeSetter()); return new SCPDownloadClient(sessionFactory, getTransferListener(), getModeSetter());
} }
public SCPUploadClient newSCPUploadClient() { public SCPUploadClient newSCPUploadClient() {
return new SCPUploadClient(sessionFactory, getModeGetter()); return new SCPUploadClient(sessionFactory, getTransferListener(), getModeGetter());
} }
@Override @Override

View File

@@ -19,6 +19,7 @@ import net.schmizz.sshj.common.IOUtils;
import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.common.SSHException;
import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.connection.channel.direct.SessionFactory;
import net.schmizz.sshj.xfer.ModeGetter; import net.schmizz.sshj.xfer.ModeGetter;
import net.schmizz.sshj.xfer.TransferListener;
import java.io.File; import java.io.File;
import java.io.FileFilter; import java.io.FileFilter;
@@ -36,8 +37,8 @@ public final class SCPUploadClient
private FileFilter fileFilter; private FileFilter fileFilter;
SCPUploadClient(SessionFactory host, ModeGetter modeGetter) { SCPUploadClient(SessionFactory host, TransferListener listener, ModeGetter modeGetter) {
super(host); super(host, listener);
this.modeGetter = modeGetter; this.modeGetter = modeGetter;
} }
@@ -80,30 +81,29 @@ public final class SCPUploadClient
private void process(File f) private void process(File f)
throws IOException { throws IOException {
if (f.isDirectory()) if (f.isDirectory()) {
listener.startedDir(f.getName());
sendDirectory(f); sendDirectory(f);
else if (f.isFile()) listener.finishedDir();
} else if (f.isFile()) {
listener.startedFile(f.getName(), f.length());
sendFile(f); sendFile(f);
else listener.finishedFile();
} else
throw new IOException(f + " is not a regular file or directory"); throw new IOException(f + " is not a regular file or directory");
} }
private void sendDirectory(File f) private void sendDirectory(File f)
throws IOException { throws IOException {
log.info("Entering directory `{}`", f.getName());
preserveTimeIfPossible(f); preserveTimeIfPossible(f);
sendMessage("D0" + getPermString(f) + " 0 " + f.getName()); sendMessage("D0" + getPermString(f) + " 0 " + f.getName());
for (File child : getChildren(f)) for (File child : getChildren(f))
process(child); process(child);
sendMessage("E"); sendMessage("E");
log.info("Exiting directory `{}`", f.getName());
} }
private void sendFile(File f) private void sendFile(File f)
throws IOException { throws IOException {
log.info("Sending `{}`...", f.getName());
preserveTimeIfPossible(f); preserveTimeIfPossible(f);
final InputStream src = new FileInputStream(f); final InputStream src = new FileInputStream(f);
try { try {