diff --git a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java index 5bb48362..d40136d9 100644 --- a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java +++ b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java @@ -19,6 +19,7 @@ import net.schmizz.concurrent.Promise; import net.schmizz.sshj.common.Buffer; import net.schmizz.sshj.sftp.Response.StatusCode; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -88,8 +89,7 @@ public class RemoteFile throws IOException { return requester.request(newRequest(PacketType.WRITE) .putUInt64(fileOffset) - // TODO The SFTP spec claims this field is unneeded...? See #187 - .putUInt32(len) + .putUInt32(len - off) .putRawBytes(data, off, len) ); } @@ -238,27 +238,56 @@ public class RemoteFile private final byte[] b = new byte[1]; private final int maxUnconfirmedReads; - private final Queue> unconfirmedReads; + private final Queue> unconfirmedReads = new LinkedList>(); + private final Queue unconfirmedReadOffsets = new LinkedList(); - private long fileOffset; + private long requestOffset; + private long responseOffset; private boolean eof; public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) { + assert 0 <= maxUnconfirmedReads; + this.maxUnconfirmedReads = maxUnconfirmedReads; - this.unconfirmedReads = new LinkedList>(); - this.fileOffset = 0; } public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) { + assert 0 <= maxUnconfirmedReads; + assert 0 <= fileOffset; + this.maxUnconfirmedReads = maxUnconfirmedReads; - this.unconfirmedReads = new LinkedList>(); - this.fileOffset = fileOffset; + this.requestOffset = this.responseOffset = fileOffset; } - @Override - public long skip(long n) - throws IOException { - throw new IOException("skip is not supported by ReadAheadFileInputStream, use RemoteFileInputStream instead"); + private ByteArrayInputStream pending = new ByteArrayInputStream( new byte[0] ); + + private boolean retrieveUnconfirmedRead(boolean blocking) throws IOException { + + if (unconfirmedReads.size() <= 0) + return false; + + if (!blocking && !unconfirmedReads.peek().isDelivered()) + return false; + + unconfirmedReadOffsets.remove(); + final Response res = unconfirmedReads.remove().retrieve( requester.getTimeoutMs(), TimeUnit.MILLISECONDS ); + switch (res.getType()) { + case DATA: + int recvLen = res.readUInt32AsInt(); + + responseOffset += recvLen; + pending = new ByteArrayInputStream( res.array(), res.rpos(), recvLen ); + break; + + case STATUS: + res.ensureStatusIs( Response.StatusCode.EOF ); + eof = true; + break; + + default: + throw new SFTPException( "Unexpected packet: " + res.getType() ); + } + return true; } @Override @@ -268,26 +297,64 @@ public class RemoteFile } @Override - public int read(byte[] into, int off, int len) - throws IOException { - while (!eof && unconfirmedReads.size() <= maxUnconfirmedReads) { - // Send read requests as long as there is no EOF and we have not reached the maximum parallelism - unconfirmedReads.add(asyncRead(fileOffset, len)); - fileOffset += len; + public int read(byte[] into, int off, int len) throws IOException { + + while (!eof && pending.available() <= 0) { + + // we also need to go here for len <= 0, because pending may be at + // EOF in which case it would return -1 instead of 0 + + while (unconfirmedReads.size() <= maxUnconfirmedReads) { + // Send read requests as long as there is no EOF and we have not reached the maximum parallelism + int reqLen = Math.max( 1024, len ); // don't be shy! + unconfirmedReads.add( RemoteFile.this.asyncRead( requestOffset, reqLen ) ); + unconfirmedReadOffsets.add( requestOffset ); + requestOffset += reqLen; + } + + long nextOffset = unconfirmedReadOffsets.peek(); + if (responseOffset != nextOffset) { + + // the server could not give us all the data we needed, so + // we try to fill the gap synchronously + + assert responseOffset < nextOffset; + assert 0 < (nextOffset - responseOffset); + assert (nextOffset - responseOffset) <= Integer.MAX_VALUE; + + byte[] buf = new byte[(int) (nextOffset - responseOffset)]; + int recvLen = RemoteFile.this.read( responseOffset, buf, 0, buf.length ); + + if (recvLen < 0) { + eof = true; + return -1; + } + + if (0 == recvLen) // avoid infinite loops + throw new SFTPException( "Unexpected response size (0), bailing out" ); + + responseOffset += recvLen; + pending = new ByteArrayInputStream( buf, 0, recvLen ); + } + else + if (!retrieveUnconfirmedRead( true /*blocking*/ )) { + + // this may happen if we change prefetch strategy + // currently, we should never get here... + + throw new IllegalStateException( "Could not retrieve data for pending read request" ); + } } - if (unconfirmedReads.isEmpty()) { - assert eof; - return -1; - } - // Retrieve first in - final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); - final int recvLen = checkReadResponse(res, into, off); - if (recvLen == -1) { - eof = true; - } - return recvLen; + + return pending.read( into, off, len ); } + @Override + public int available() throws IOException { + while (!eof && (pending.available() <= 0) && retrieveUnconfirmedRead( false /*blocking*/ )) + /*loop*/; + return pending.available(); + } } +} -} \ No newline at end of file