From f354fd66615ddf70401e8a7191a15ffc4dd11f45 Mon Sep 17 00:00:00 2001 From: David Kocher Date: Fri, 9 May 2014 13:43:52 +0200 Subject: [PATCH] Implement read ahead to speed up transfer rates for downloads by a magnitude. --- .../net/schmizz/sshj/sftp/RemoteFile.java | 96 ++++++++++++++----- 1 file changed, 73 insertions(+), 23 deletions(-) diff --git a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java index 588fd081..e77157f3 100644 --- a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java +++ b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java @@ -15,9 +15,6 @@ */ package net.schmizz.sshj.sftp; -import net.schmizz.concurrent.Promise; -import net.schmizz.sshj.sftp.Response.StatusCode; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -25,6 +22,10 @@ import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.TimeUnit; +import net.schmizz.concurrent.Promise; +import net.schmizz.sshj.common.Buffer; +import net.schmizz.sshj.sftp.Response.StatusCode; + public class RemoteFile extends RemoteResource { @@ -52,10 +53,17 @@ public class RemoteFile public int read(long fileOffset, byte[] to, int offset, int len) throws IOException { - final Response res = requester.request( - newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len) - ).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); - switch (res.getType()) { + final Response res = this.asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); + return this.checkReadResponse(res, to, offset); + } + + protected Promise asyncRead(long fileOffset, int len) + throws IOException { + return requester.request(newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len)); + } + + protected int checkReadResponse(Response res, byte[] to, int offset) throws Buffer.BufferException, SFTPException { + switch(res.getType()) { case DATA: int recvLen = res.readUInt32AsInt(); System.arraycopy(res.array(), res.rpos(), to, offset, recvLen); @@ -72,19 +80,19 @@ public class RemoteFile public void write(long fileOffset, byte[] data, int off, int len) throws IOException { - checkResponse(asyncWrite(fileOffset, data, off, len)); + checkWriteResponse(asyncWrite(fileOffset, data, off, len)); } protected Promise asyncWrite(long fileOffset, byte[] data, int off, int len) throws IOException { return requester.request(newRequest(PacketType.WRITE) - .putUInt64(fileOffset) - .putUInt32(len - off) - .putRawBytes(data, off, len) + .putUInt64(fileOffset) + .putUInt32(len - off) + .putRawBytes(data, off, len) ); } - private void checkResponse(Promise responsePromise) + private void checkWriteResponse(Promise responsePromise) throws SFTPException { responsePromise.retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK(); } @@ -139,8 +147,8 @@ public class RemoteFile @Override public void write(byte[] buf, int off, int len) throws IOException { - if (unconfirmedWrites.size() > maxUnconfirmedWrites) { - checkResponse(unconfirmedWrites.remove()); + if(unconfirmedWrites.size() > maxUnconfirmedWrites) { + checkWriteResponse(unconfirmedWrites.remove()); } unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len)); fileOffset += len; @@ -149,8 +157,8 @@ public class RemoteFile @Override public void flush() throws IOException { - while (!unconfirmedWrites.isEmpty()) { - checkResponse(unconfirmedWrites.remove()); + while(!unconfirmedWrites.isEmpty()) { + checkWriteResponse(unconfirmedWrites.remove()); } } @@ -167,6 +175,11 @@ public class RemoteFile private final byte[] b = new byte[1]; + private final int maxUnconfirmedReads; + private final Queue> unconfirmedReads; + + private boolean eof; + private long fileOffset; private long markPos; private long readLimit; @@ -176,7 +189,13 @@ public class RemoteFile } public RemoteFileInputStream(long fileOffset) { + this(fileOffset, 0); + } + + public RemoteFileInputStream(long fileOffset, int maxUnconfirmedReads) { this.fileOffset = fileOffset; + this.maxUnconfirmedReads = maxUnconfirmedReads; + this.unconfirmedReads = new LinkedList>(); } @Override @@ -187,7 +206,7 @@ public class RemoteFile @Override public void mark(int readLimit) { this.readLimit = readLimit; - markPos = fileOffset; + this.markPos = fileOffset; } @Override @@ -211,14 +230,45 @@ public class RemoteFile @Override public int read(byte[] into, int off, int len) throws IOException { - int read = RemoteFile.this.read(fileOffset, into, off, len); - if (read != -1) { - fileOffset += read; - if (markPos != 0 && read > readLimit) // Invalidate mark position - markPos = 0; + while(!eof && unconfirmedReads.size() <= maxUnconfirmedReads) { + // Send read requests as long as there is no EOF and we have not reached the maximum parallelism + unconfirmedReads.add(asyncRead(fileOffset, len)); + fileOffset += len; } - return read; + if(unconfirmedReads.isEmpty()) { + // Attempted to read while status was already received + return -1; + } + // Retrieve first in + final Response res = unconfirmedReads.remove().retrieve( + requester.getTimeoutMs(), TimeUnit.MILLISECONDS); + final int recvLen = checkReadResponse(res, into, off); + if(markPos != 0 && recvLen > readLimit) // Invalidate mark position + { + markPos = 0; + } + if(-1 == recvLen) { + eof = true; + } + return recvLen; } + @Override + public void close() throws IOException { + while(!unconfirmedReads.isEmpty()) { + final Response res = unconfirmedReads.remove().retrieve( + requester.getTimeoutMs(), TimeUnit.MILLISECONDS); + switch(res.getType()) { + case STATUS: + res.ensureStatusIs(StatusCode.EOF); + break; + case DATA: + log.warn("Pending data packet from read response discarded"); + continue; + default: + throw new SFTPException("Unexpected packet: " + res.getType()); + } + } + } } }