Make window size a long, as it can be upto (2^32 - 1)

Fix for #57
This commit is contained in:
Shikhar Bhushan
2012-02-06 22:24:52 +00:00
parent d38bbbcdf7
commit ee07072846
9 changed files with 35 additions and 33 deletions

View File

@@ -116,9 +116,9 @@ public abstract class AbstractChannel
close = new Event<ConnectionException>("chan#" + id + " / " + "close", ConnectionException.chainer, lock); close = new Event<ConnectionException>("chan#" + id + " / " + "close", ConnectionException.chainer, lock);
} }
protected void init(int recipient, int remoteWinSize, int remoteMaxPacketSize) { protected void init(int recipient, long remoteWinSize, long remoteMaxPacketSize) {
this.recipient = recipient; this.recipient = recipient;
rwin = new Window.Remote(remoteWinSize, remoteMaxPacketSize); rwin = new Window.Remote(remoteWinSize, (int) Math.min(remoteMaxPacketSize, Integer.MAX_VALUE));
out = new ChannelOutputStream(this, trans, rwin); out = new ChannelOutputStream(this, trans, rwin);
log.info("Initialized - {}", this); log.info("Initialized - {}", this);
} }
@@ -144,7 +144,7 @@ public abstract class AbstractChannel
} }
@Override @Override
public int getLocalWinSize() { public long getLocalWinSize() {
return lwin.getSize(); return lwin.getSize();
} }
@@ -164,7 +164,7 @@ public abstract class AbstractChannel
} }
@Override @Override
public int getRemoteWinSize() { public long getRemoteWinSize() {
return rwin.getSize(); return rwin.getSize();
} }
@@ -315,9 +315,9 @@ public abstract class AbstractChannel
private void gotWindowAdjustment(SSHPacket buf) private void gotWindowAdjustment(SSHPacket buf)
throws ConnectionException { throws ConnectionException {
final int howMuch; final long howMuch;
try { try {
howMuch = buf.readUInt32AsInt(); howMuch = buf.readUInt32();
} catch (Buffer.BufferException be) { } catch (Buffer.BufferException be) {
throw new ConnectionException(be); throw new ConnectionException(be);
} }

View File

@@ -99,7 +99,7 @@ public interface Channel
int getLocalMaxPacketSize(); int getLocalMaxPacketSize();
/** @return the current local window size. */ /** @return the current local window size. */
int getLocalWinSize(); long getLocalWinSize();
/** @return an {@code OutputStream} for this channel. */ /** @return an {@code OutputStream} for this channel. */
OutputStream getOutputStream(); OutputStream getOutputStream();
@@ -111,7 +111,7 @@ public interface Channel
int getRemoteMaxPacketSize(); int getRemoteMaxPacketSize();
/** @return the current remote window size. */ /** @return the current remote window size. */
int getRemoteWinSize(); long getRemoteWinSize();
/** @return the channel type identifier. */ /** @return the channel type identifier. */
String getType(); String getType();

View File

@@ -159,7 +159,7 @@ public final class ChannelInputStream
private void checkWindow() private void checkWindow()
throws TransportException { throws TransportException {
synchronized (win) { synchronized (win) {
final int adjustment = win.neededAdjustment(); final long adjustment = win.neededAdjustment();
if (adjustment > 0) { if (adjustment > 0) {
log.info("Sending SSH_MSG_CHANNEL_WINDOW_ADJUST to #{} for {} bytes", chan.getRecipient(), adjustment); log.info("Sending SSH_MSG_CHANNEL_WINDOW_ADJUST to #{} for {} bytes", chan.getRecipient(), adjustment);
trans.write(new SSHPacket(Message.CHANNEL_WINDOW_ADJUST) trans.write(new SSHPacket(Message.CHANNEL_WINDOW_ADJUST)

View File

@@ -97,12 +97,12 @@ public final class ChannelOutputStream
throws TransportException, ConnectionException { throws TransportException, ConnectionException {
flush(packet.wpos() - dataOffset); flush(packet.wpos() - dataOffset);
} }
void flush(int bufferSize) void flush(int bufferSize)
throws TransportException, ConnectionException { throws TransportException, ConnectionException {
while (bufferSize > 0) { while (bufferSize > 0) {
int remoteWindowSize = win.getSize(); long remoteWindowSize = win.getSize();
if (remoteWindowSize == 0) if (remoteWindowSize == 0)
remoteWindowSize = win.awaitExpansion(remoteWindowSize); remoteWindowSize = win.awaitExpansion(remoteWindowSize);
@@ -110,7 +110,7 @@ public final class ChannelOutputStream
// a) how much data we have // a) how much data we have
// b) the max packet size // b) the max packet size
// c) what the current window size will allow // c) what the current window size will allow
final int writeNow = Math.min(bufferSize, Math.min(win.getMaxPacketSize(), remoteWindowSize)); final int writeNow = Math.min(bufferSize, (int) Math.min(win.getMaxPacketSize(), remoteWindowSize));
packet.wpos(headerOffset); packet.wpos(headerOffset);
packet.putMessageID(Message.CHANNEL_DATA); packet.putMessageID(Message.CHANNEL_DATA);

View File

@@ -28,14 +28,14 @@ public abstract class Window {
protected final int maxPacketSize; protected final int maxPacketSize;
protected int size; protected long size;
public Window(int initialWinSize, int maxPacketSize) { public Window(long initialWinSize, int maxPacketSize) {
size = initialWinSize; size = initialWinSize;
this.maxPacketSize = maxPacketSize; this.maxPacketSize = maxPacketSize;
} }
public void expand(int inc) { public void expand(long inc) {
synchronized (lock) { synchronized (lock) {
size += inc; size += inc;
log.debug("Increasing by {} up to {}", inc, size); log.debug("Increasing by {} up to {}", inc, size);
@@ -47,13 +47,13 @@ public abstract class Window {
return maxPacketSize; return maxPacketSize;
} }
public int getSize() { public long getSize() {
synchronized (lock) { synchronized (lock) {
return size; return size;
} }
} }
public void consume(int dec) public void consume(long dec)
throws ConnectionException { throws ConnectionException {
synchronized (lock) { synchronized (lock) {
size -= dec; size -= dec;
@@ -72,11 +72,11 @@ public abstract class Window {
public static final class Remote public static final class Remote
extends Window { extends Window {
public Remote(int initialWinSize, int maxPacketSize) { public Remote(long initialWinSize, int maxPacketSize) {
super(initialWinSize, maxPacketSize); super(initialWinSize, maxPacketSize);
} }
public int awaitExpansion(int was) public long awaitExpansion(long was)
throws ConnectionException { throws ConnectionException {
synchronized (lock) { synchronized (lock) {
while (size <= was) { while (size <= was) {
@@ -91,7 +91,7 @@ public abstract class Window {
} }
} }
public void consume(int howMuch) { public void consume(long howMuch) {
try { try {
super.consume(howMuch); super.consume(howMuch);
} catch (ConnectionException e) { // It's a bug if we consume more than remote allowed } catch (ConnectionException e) { // It's a bug if we consume more than remote allowed
@@ -105,16 +105,16 @@ public abstract class Window {
public static final class Local public static final class Local
extends Window { extends Window {
private final int initialSize; private final long initialSize;
private final int threshold; private final long threshold;
public Local(int initialWinSize, int maxPacketSize) { public Local(long initialWinSize, int maxPacketSize) {
super(initialWinSize, maxPacketSize); super(initialWinSize, maxPacketSize);
this.initialSize = initialWinSize; this.initialSize = initialWinSize;
threshold = Math.min(maxPacketSize * 20, initialSize / 4); threshold = Math.min(maxPacketSize * 20, initialSize / 4);
} }
public int neededAdjustment() { public long neededAdjustment() {
synchronized (lock) { synchronized (lock) {
return (size <= threshold) ? (initialSize - size) : 0; return (size <= threshold) ? (initialSize - size) : 0;
} }

View File

@@ -71,7 +71,7 @@ public abstract class AbstractDirectChannel
private void gotOpenConfirmation(SSHPacket buf) private void gotOpenConfirmation(SSHPacket buf)
throws ConnectionException { throws ConnectionException {
try { try {
init(buf.readUInt32AsInt(), buf.readUInt32AsInt(), buf.readUInt32AsInt()); init(buf.readUInt32AsInt(), buf.readUInt32(), buf.readUInt32());
} catch (Buffer.BufferException be) { } catch (Buffer.BufferException be) {
throw new ConnectionException(be); throw new ConnectionException(be);
} }

View File

@@ -54,8 +54,9 @@ public abstract class AbstractForwardedChannel
* First 2 args are standard; the others can be parsed from a CHANNEL_OPEN packet. * First 2 args are standard; the others can be parsed from a CHANNEL_OPEN packet.
*/ */
protected AbstractForwardedChannel(Connection conn, String type, int recipient, int remoteWinSize, protected AbstractForwardedChannel(Connection conn, String type,
int remoteMaxPacketSize, String origIP, int origPort) { int recipient, long remoteWinSize, long remoteMaxPacketSize,
String origIP, int origPort) {
super(conn, type); super(conn, type);
this.origIP = origIP; this.origIP = origIP;
this.origPort = origPort; this.origPort = origPort;

View File

@@ -127,7 +127,8 @@ public class RemotePortForwarder
private final Forward fwd; private final Forward fwd;
public ForwardedTCPIPChannel(Connection conn, int recipient, int remoteWinSize, int remoteMaxPacketSize, public ForwardedTCPIPChannel(Connection conn,
int recipient, long remoteWinSize, long remoteMaxPacketSize,
Forward fwd, String origIP, int origPort) { Forward fwd, String origIP, int origPort) {
super(conn, TYPE, recipient, remoteWinSize, remoteMaxPacketSize, origIP, origPort); super(conn, TYPE, recipient, remoteWinSize, remoteMaxPacketSize, origIP, origPort);
this.fwd = fwd; this.fwd = fwd;
@@ -217,7 +218,7 @@ public class RemotePortForwarder
throws ConnectionException, TransportException { throws ConnectionException, TransportException {
final ForwardedTCPIPChannel chan; final ForwardedTCPIPChannel chan;
try { try {
chan = new ForwardedTCPIPChannel(conn, buf.readUInt32AsInt(), buf.readUInt32AsInt(), buf.readUInt32AsInt(), chan = new ForwardedTCPIPChannel(conn, buf.readUInt32AsInt(), buf.readUInt32(), buf.readUInt32(),
new Forward(buf.readString(), buf.readUInt32AsInt()), new Forward(buf.readString(), buf.readUInt32AsInt()),
buf.readString(), buf.readUInt32AsInt()); buf.readString(), buf.readUInt32AsInt());
} catch (Buffer.BufferException be) { } catch (Buffer.BufferException be) {

View File

@@ -34,8 +34,9 @@ public class X11Forwarder
public static final String TYPE = "x11"; public static final String TYPE = "x11";
public X11Channel(Connection conn, int recipient, int remoteWinSize, int remoteMaxPacketSize, String origIP, public X11Channel(Connection conn,
int origPort) { int recipient, long remoteWinSize, long remoteMaxPacketSize,
String origIP, int origPort) {
super(conn, TYPE, recipient, remoteWinSize, remoteMaxPacketSize, origIP, origPort); super(conn, TYPE, recipient, remoteWinSize, remoteMaxPacketSize, origIP, origPort);
} }
@@ -58,8 +59,7 @@ public class X11Forwarder
throws ConnectionException, TransportException { throws ConnectionException, TransportException {
try { try {
callListener(listener, new X11Channel(conn, callListener(listener, new X11Channel(conn,
buf.readUInt32AsInt(), buf.readUInt32AsInt(), buf.readUInt32(), buf.readUInt32(),
buf.readUInt32AsInt(), buf.readUInt32AsInt(),
buf.readString(), buf.readUInt32AsInt())); buf.readString(), buf.readUInt32AsInt()));
} catch (Buffer.BufferException be) { } catch (Buffer.BufferException be) {
throw new ConnectionException(be); throw new ConnectionException(be);