Add parameter to limit read ahead to maximum length. Allows to use mu… (#724)

* Add parameter to limit read ahead to maximum length. Allows to use multiple concurrent threads reading from the same file with an offset without reading too much ahead for a single segment.

* Review and add tests.

Signed-off-by: David Kocher <dkocher@iterate.ch>

Co-authored-by: Yves Langisch <yves@langisch.ch>
This commit is contained in:
David Kocher
2021-12-23 22:24:52 +01:00
committed by GitHub
parent cab7731928
commit 8cf63a96a9
2 changed files with 109 additions and 4 deletions

View File

@@ -224,6 +224,7 @@ public class RemoteFile
private final byte[] b = new byte[1]; private final byte[] b = new byte[1];
private final int maxUnconfirmedReads; private final int maxUnconfirmedReads;
private final long readAheadLimit;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>(); private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>(); private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>();
@@ -232,17 +233,22 @@ public class RemoteFile
private boolean eof; private boolean eof;
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) { public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
assert 0 <= maxUnconfirmedReads; this(maxUnconfirmedReads, 0L, -1L);
this.maxUnconfirmedReads = maxUnconfirmedReads;
} }
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) { /**
*
* @param maxUnconfirmedReads Maximum number of unconfirmed requests to send
* @param fileOffset Initial offset in file to read from
* @param readAheadLimit Read ahead is disabled after this limit has been reached
*/
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset, long readAheadLimit) {
assert 0 <= maxUnconfirmedReads; assert 0 <= maxUnconfirmedReads;
assert 0 <= fileOffset; assert 0 <= fileOffset;
this.maxUnconfirmedReads = maxUnconfirmedReads; this.maxUnconfirmedReads = maxUnconfirmedReads;
this.requestOffset = this.responseOffset = fileOffset; this.requestOffset = this.responseOffset = fileOffset;
this.readAheadLimit = readAheadLimit > 0 ? fileOffset + readAheadLimit : Long.MAX_VALUE;
} }
private ByteArrayInputStream pending = new ByteArrayInputStream(new byte[0]); private ByteArrayInputStream pending = new ByteArrayInputStream(new byte[0]);
@@ -293,9 +299,18 @@ public class RemoteFile
while (unconfirmedReads.size() <= maxUnconfirmedReads) { while (unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism // Send read requests as long as there is no EOF and we have not reached the maximum parallelism
int reqLen = Math.max(1024, len); // don't be shy! int reqLen = Math.max(1024, len); // don't be shy!
if (readAheadLimit > requestOffset) {
long remaining = readAheadLimit - requestOffset;
if (reqLen > remaining) {
reqLen = (int) remaining;
}
}
unconfirmedReads.add(RemoteFile.this.asyncRead(requestOffset, reqLen)); unconfirmedReads.add(RemoteFile.this.asyncRead(requestOffset, reqLen));
unconfirmedReadOffsets.add(requestOffset); unconfirmedReadOffsets.add(requestOffset);
requestOffset += reqLen; requestOffset += reqLen;
if (requestOffset >= readAheadLimit) {
break;
}
} }
long nextOffset = unconfirmedReadOffsets.peek(); long nextOffset = unconfirmedReadOffsets.peek();

View File

@@ -20,6 +20,7 @@ import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.OpenMode; import net.schmizz.sshj.sftp.OpenMode;
import net.schmizz.sshj.sftp.RemoteFile; import net.schmizz.sshj.sftp.RemoteFile;
import net.schmizz.sshj.sftp.SFTPEngine; import net.schmizz.sshj.sftp.SFTPEngine;
import net.schmizz.sshj.sftp.SFTPException;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
@@ -32,6 +33,7 @@ import java.util.Random;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
public class RemoteFileTest { public class RemoteFileTest {
@Rule @Rule
@@ -84,4 +86,92 @@ public class RemoteFileTest {
assertThat("The written and received data should match", data, equalTo(test2)); assertThat("The written and received data should match", data, equalTo(test2));
} }
@Test
public void shouldNotReadAheadAfterLimitInputStream() throws IOException {
SSHClient ssh = fixture.setupConnectedDefaultClient();
ssh.authPassword("test", "test");
SFTPEngine sftp = new SFTPEngine(ssh).init();
RemoteFile rf;
File file = temp.newFile("SftpReadAheadLimitTest.bin");
rf = sftp.open(file.getPath(), EnumSet.of(OpenMode.WRITE, OpenMode.CREAT));
byte[] data = new byte[8192];
new Random(53).nextBytes(data);
data[3072] = 1;
rf.write(0, data, 0, data.length);
rf.close();
assertThat("The file should exist", file.exists());
rf = sftp.open(file.getPath());
InputStream rs = rf.new ReadAheadRemoteFileInputStream(16 /*maxUnconfirmedReads*/,0, 3072);
byte[] test = new byte[4097];
int n = 0;
while (n < 2048) {
n += rs.read(test, n, 2048 - n);
}
rf.close();
while (n < 3072) {
n += rs.read(test, n, 3072 - n);
}
assertThat("buffer overrun", test[3072] == 0);
try {
rs.read(test, n, test.length - n);
fail("Content must not be buffered");
} catch (SFTPException e){
// expected
}
}
@Test
public void limitedReadAheadInputStream() throws IOException {
SSHClient ssh = fixture.setupConnectedDefaultClient();
ssh.authPassword("test", "test");
SFTPEngine sftp = new SFTPEngine(ssh).init();
RemoteFile rf;
File file = temp.newFile("SftpReadAheadLimitedTest.bin");
rf = sftp.open(file.getPath(), EnumSet.of(OpenMode.WRITE, OpenMode.CREAT));
byte[] data = new byte[8192];
new Random(53).nextBytes(data);
data[3072] = 1;
rf.write(0, data, 0, data.length);
rf.close();
assertThat("The file should exist", file.exists());
rf = sftp.open(file.getPath());
InputStream rs = rf.new ReadAheadRemoteFileInputStream(16 /*maxUnconfirmedReads*/,0, 3072);
byte[] test = new byte[4097];
int n = 0;
while (n < 2048) {
n += rs.read(test, n, 2048 - n);
}
while (n < 3072) {
n += rs.read(test, n, 3072 - n);
}
assertThat("buffer overrun", test[3072] == 0);
n += rs.read(test, n, test.length - n); // --> ArrayIndexOutOfBoundsException
byte[] test2 = new byte[data.length];
System.arraycopy(test, 0, test2, 0, test.length);
while (n < data.length) {
n += rs.read(test2, n, data.length - n);
}
assertThat("The written and received data should match", data, equalTo(test2));
}
} }