get rid of over-zealous sending of channel EOF & close messages which was implemented with questionable synchronization

fixes #105

also relevant to #126 since AbstractChannel does not synchronize on
'this' anymore
This commit is contained in:
shikhar
2014-06-24 17:56:14 +05:30
parent 2a7278d239
commit 5ee2f0a417
3 changed files with 24 additions and 50 deletions

View File

@@ -86,10 +86,7 @@ public abstract class AbstractChannel
protected final Event<ConnectionException> openEvent;
/** Channel close event */
protected final Event<ConnectionException> closeEvent;
/* Access to these fields should be synchronized using this object */
private boolean eofSent;
private boolean eofGot;
/** Whether we have already sent a CHANNEL_CLOSE request to the server */
private boolean closeRequested;
/** Local window */
@@ -282,8 +279,9 @@ public abstract class AbstractChannel
closeEvent.await(timeout, unit);
}
protected synchronized void sendClose()
protected void sendClose()
throws TransportException {
openCloseLock.lock();
try {
if (!closeRequested) {
log.debug("Sending close");
@@ -291,11 +289,12 @@ public abstract class AbstractChannel
}
} finally {
closeRequested = true;
openCloseLock.unlock();
}
}
@Override
public synchronized boolean isOpen() {
public boolean isOpen() {
openCloseLock.lock();
try {
return openEvent.isSet() && !closeEvent.isSet() && !closeRequested;
@@ -405,13 +404,10 @@ public abstract class AbstractChannel
}
}
private synchronized void gotEOF()
private void gotEOF()
throws TransportException {
log.debug("Got EOF");
eofGot = true;
eofInputStreams();
if (eofSent)
sendClose();
}
/** Called when EOF has been received. Subclasses can override but must call super. */
@@ -419,22 +415,6 @@ public abstract class AbstractChannel
in.eof();
}
@Override
public synchronized void sendEOF()
throws TransportException {
try {
if (!closeRequested && !eofSent) {
log.debug("Sending EOF");
trans.write(newBuffer(Message.CHANNEL_EOF));
if (eofGot)
sendClose();
}
} finally {
eofSent = true;
out.setClosed();
}
}
@Override
public String toString() {
return "< " + type + " channel: id=" + id + ", recipient=" + recipient + ", localWin=" + lwin + ", remoteWin="

View File

@@ -119,15 +119,6 @@ public interface Channel
/** @return whether the channel is open. */
boolean isOpen();
/**
* Sends an EOF message to the server for this channel; indicating that no more data will be sent by us. The {@code
* OutputStream} for this channel will be closed and no longer usable.
*
* @throws TransportException if there is an error sending the EOF message
*/
void sendEOF()
throws TransportException;
/**
* Set whether local window should automatically expand when data is received, irrespective of whether data has been
* read from that stream. This is useful e.g. when a remote command produces a lot of output that would fill the

View File

@@ -84,7 +84,7 @@ public final class ChannelOutputStream
throws TransportException, ConnectionException {
final int bufferSize = packet.wpos() - dataOffset;
if (bufferSize >= win.getMaxPacketSize()) {
flush(bufferSize);
flush(bufferSize, true);
return 0;
} else {
final int n = Math.min(len, win.getMaxPacketSize() - bufferSize);
@@ -93,18 +93,23 @@ public final class ChannelOutputStream
}
}
void flush()
boolean flush(boolean canAwaitExpansion)
throws TransportException, ConnectionException {
flush(packet.wpos() - dataOffset);
return flush(packet.wpos() - dataOffset, canAwaitExpansion);
}
void flush(int bufferSize)
boolean flush(int bufferSize, boolean canAwaitExpansion)
throws TransportException, ConnectionException {
while (bufferSize > 0) {
long remoteWindowSize = win.getSize();
if (remoteWindowSize == 0)
remoteWindowSize = win.awaitExpansion(remoteWindowSize);
if (remoteWindowSize == 0) {
if (canAwaitExpansion) {
remoteWindowSize = win.awaitExpansion(remoteWindowSize);
} else {
return false;
}
}
// We can only write the min. of
// a) how much data we have
@@ -136,6 +141,8 @@ public final class ChannelOutputStream
bufferSize = leftOverBytes;
}
return true;
}
}
@@ -184,18 +191,14 @@ public final class ChannelOutputStream
throws IOException {
if (!closed) {
try {
buffer.flush();
chan.sendEOF();
buffer.flush(false);
trans.write(new SSHPacket(Message.CHANNEL_EOF).putUInt32(chan.getRecipient()));
} finally {
setClosed();
closed = true;
}
}
}
public synchronized void setClosed() {
closed = true;
}
/**
* Send all data currently buffered. If window space is exhausted in the process, this will block
* until it is expanded by the server.
@@ -206,7 +209,7 @@ public final class ChannelOutputStream
public synchronized void flush()
throws IOException {
checkClose();
buffer.flush();
buffer.flush(true);
}
@Override