Restore mutual exclusion of sendChannelRequest() and gotResponse() in AbstractChannel (but rather than make methods synchronized do it on the queue, which itself doesn't need to be thread-safe). Regression due to 1a2351c5ee. Fixes #35.

This commit is contained in:
Shikhar Bhushan
2011-10-02 09:47:49 +01:00
parent f83bf2cd3f
commit f1b3dbb102

View File

@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +76,7 @@ public abstract class AbstractChannel
/** Remote recipient ID */ /** Remote recipient ID */
private int recipient; private int recipient;
private final Queue<Event<ConnectionException>> chanReqResponseEvents = new ConcurrentLinkedQueue<Event<ConnectionException>>(); private final Queue<Event<ConnectionException>> chanReqResponseEvents = new LinkedList<Event<ConnectionException>>();
/* The lock used by to create the open & close events */ /* The lock used by to create the open & close events */
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
@@ -368,33 +368,37 @@ public abstract class AbstractChannel
Buffer.PlainBuffer reqSpecific) Buffer.PlainBuffer reqSpecific)
throws TransportException { throws TransportException {
log.info("Sending channel request for `{}`", reqType); log.info("Sending channel request for `{}`", reqType);
trans.write( synchronized (chanReqResponseEvents) {
newBuffer(Message.CHANNEL_REQUEST) trans.write(
.putString(reqType) newBuffer(Message.CHANNEL_REQUEST)
.putBoolean(wantReply) .putString(reqType)
.putBuffer(reqSpecific) .putBoolean(wantReply)
); .putBuffer(reqSpecific)
);
Event<ConnectionException> responseEvent = null; Event<ConnectionException> responseEvent = null;
if (wantReply) { if (wantReply) {
responseEvent = new Event<ConnectionException>("chan#" + id + " / " + "chanreq for " + reqType, responseEvent = new Event<ConnectionException>("chan#" + id + " / " + "chanreq for " + reqType,
ConnectionException.chainer); ConnectionException.chainer);
chanReqResponseEvents.add(responseEvent); chanReqResponseEvents.add(responseEvent);
}
return responseEvent;
} }
return responseEvent;
} }
private void gotResponse(boolean success) private void gotResponse(boolean success)
throws ConnectionException { throws ConnectionException {
final Event<ConnectionException> responseEvent = chanReqResponseEvents.poll(); synchronized (chanReqResponseEvents) {
if (responseEvent != null) { final Event<ConnectionException> responseEvent = chanReqResponseEvents.poll();
if (success) if (responseEvent != null) {
responseEvent.set(); if (success)
else responseEvent.set();
responseEvent.deliverError(new ConnectionException("Request failed")); else
} else responseEvent.deliverError(new ConnectionException("Request failed"));
throw new ConnectionException(DisconnectReason.PROTOCOL_ERROR, } else
"Received response to channel request when none was requested"); throw new ConnectionException(DisconnectReason.PROTOCOL_ERROR,
"Received response to channel request when none was requested");
}
} }
private synchronized void gotEOF() private synchronized void gotEOF()