Implement concurent write requests.

This commit is contained in:
Romain Reuillon
2012-12-12 08:29:26 +01:00
parent cbd118e0b1
commit 9acff6202c
3 changed files with 57 additions and 4 deletions

View File

@@ -15,15 +15,24 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.sftp.Response.StatusCode; import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
public class RemoteFile public class RemoteFile
extends RemoteResource { extends RemoteResource {
public static final int DEFAULT_CONCURRENT_REQUESTS = 10;
protected volatile int concurrentRequests = DEFAULT_CONCURRENT_REQUESTS;
private Queue<Promise<Response, SFTPException>> writeRequestsQueue = new LinkedBlockingQueue<Promise<Response, SFTPException>>();
public RemoteFile(Requester requester, String path, String handle) { public RemoteFile(Requester requester, String path, String handle) {
super(requester, path, handle); super(requester, path, handle);
} }
@@ -73,11 +82,14 @@ public class RemoteFile
public void write(long fileOffset, byte[] data, int off, int len) public void write(long fileOffset, byte[] data, int off, int len)
throws IOException { throws IOException {
requester.doRequest(newRequest(PacketType.WRITE) Request request = newRequest(PacketType.WRITE)
.putUInt64(fileOffset) .putUInt64(fileOffset)
.putUInt32(len - off) .putUInt32(len - off)
.putRawBytes(data, off, len) .putRawBytes(data, off, len);
).ensureStatusPacketIsOK(); writeRequestsQueue.add(requester.request(request));
while (writeRequestsQueue.size() >= getConcurrentRequests()) {
requester.retrieve(writeRequestsQueue.remove());
}
} }
public void setAttributes(FileAttributes attrs) public void setAttributes(FileAttributes attrs)
@@ -186,4 +198,23 @@ public class RemoteFile
} }
} }
@Override
public void close() throws IOException {
try {
while(!writeRequestsQueue.isEmpty()) {
requester.retrieve(writeRequestsQueue.remove());
}
} finally {
super.close();
}
}
public void setConcurrentRequests(int concurrentRequests) {
this.concurrentRequests = concurrentRequests;
}
public int getConcurrentRequests() {
return concurrentRequests;
}
} }

View File

@@ -15,6 +15,8 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import java.io.IOException; import java.io.IOException;
public interface Requester { public interface Requester {
@@ -26,4 +28,10 @@ public interface Requester {
Response doRequest(Request req) Response doRequest(Request req)
throws IOException; throws IOException;
Promise<Response, SFTPException> request(Request request)
throws IOException;
void retrieve(Promise<Response, SFTPException> response)
throws IOException;
} }

View File

@@ -15,6 +15,7 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.common.SSHException;
import net.schmizz.sshj.connection.channel.direct.Session.Subsystem; import net.schmizz.sshj.connection.channel.direct.Session.Subsystem;
import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.connection.channel.direct.SessionFactory;
@@ -257,7 +258,7 @@ public class SFTPEngine
throw new SFTPException("Unexpected data in " + res.getType() + " packet"); throw new SFTPException("Unexpected data in " + res.getType() + " packet");
} }
protected synchronized void transmit(SFTPPacket<Request> payload) private synchronized void transmit(SFTPPacket<Request> payload)
throws IOException { throws IOException {
final int len = payload.available(); final int len = payload.available();
out.write((len >>> 24) & 0xff); out.write((len >>> 24) & 0xff);
@@ -268,4 +269,17 @@ public class SFTPEngine
out.flush(); out.flush();
} }
@Override
public Promise<Response, SFTPException> request(Request req) throws IOException {
reader.expectResponseTo(req);
log.debug("Sending {}", req);
transmit(req);
return req.getResponsePromise();
}
@Override
public void retrieve(Promise<Response, SFTPException> request)
throws IOException {
request.retrieve(timeout, TimeUnit.SECONDS).ensureStatusPacketIsOK();
}
} }