diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-02-25 23:21:13 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-02-25 23:21:13 +0000 |
| commit | 94a2e74021b5796280b7e4464a02eb80824f9664 (patch) | |
| tree | 7793ddef93c0440adbcba9826e5f72b7c861891b /java/common | |
| parent | c620d4547f7dd7fb5f2a08e61ea113e58ce379e8 (diff) | |
| download | qpid-python-94a2e74021b5796280b7e4464a02eb80824f9664.tar.gz | |
Made the various receive variants check that the server queue is empty before returning null. Also modified AMQQueueBrowser to use receiveNoWait() when browsing queues using 0-10. These changes uncovered numerous second order bugs, mostly in failover. These are also fixed. This fixes QPID-1642 and QPID-1643.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
6 files changed, 43 insertions, 10 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java index 3491af8cd2..5b2db10613 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -37,6 +37,8 @@ class ToyClient implements SessionListener { public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void exception(Session ssn, SessionException exc) { exc.printStackTrace(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index c1031c9a1c..0e969464ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -37,6 +37,8 @@ public class Echo implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { int id = xfr.getId(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 4079097f96..9d2686a6f7 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -53,13 +53,15 @@ public class Session extends SessionInvoker private static final Logger log = Logger.get(Session.class); - enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED } + enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } class DefaultSessionListener implements SessionListener { public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { log.info("message: %s", xfr); @@ -107,6 +109,8 @@ public class Session extends SessionInvoker private volatile boolean flowControl = false; private Semaphore credit = new Semaphore(0); + private Thread resumer = null; + Session(Connection connection, Binary name, long expiry) { this.connection = connection; @@ -234,15 +238,21 @@ public class Session extends SessionInvoker for (int i = maxComplete + 1; lt(i, commandsOut); i++) { Method m = commands[mod(i, commands.length)]; - if (m != null) + if (m == null) { - sessionCommandPoint(m.getId(), 0); - send(m); + m = new ExecutionSync(); + m.setId(i); } + sessionCommandPoint(m.getId(), 0); + send(m); } sessionCommandPoint(commandsOut, 0); sessionFlush(COMPLETED); + resumer = Thread.currentThread(); + state = RESUMING; + listener.resumed(this); + resumer = null; } } @@ -387,7 +397,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED) + if (state == DETACHED || state == CLOSING) { return; } @@ -499,10 +509,14 @@ public class Session extends SessionInvoker if (state != OPEN && state != CLOSED) { - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && (state != OPEN && state != CLOSED)) + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) { - w.await(); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) + { + w.await(); + } } } @@ -510,6 +524,14 @@ public class Session extends SessionInvoker { case OPEN: break; + case RESUMING: + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + throw new SessionException + ("timed out waiting for resume to finish"); + } + break; case CLOSED: throw new SessionClosedException(); default: @@ -527,7 +549,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && isFull(next)) { - if (state == OPEN) + if (state == OPEN || state == RESUMING) { try { @@ -560,7 +582,7 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - if (expiry > 0) + if (expiry > 0 && !m.isUnreliable()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java index 63690177f9..eb650eb9ed 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java @@ -31,6 +31,8 @@ public interface SessionListener void opened(Session session); + void resumed(Session session); + void message(Session ssn, MessageTransfer xfr); void exception(Session session, SessionException exception); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java index 622993effb..88870284f6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -87,6 +87,8 @@ public class Sink implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { count++; diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index dca6264367..3d634bfb70 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -74,6 +74,8 @@ public class ConnectionTest extends TestCase implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(final Session ssn, MessageTransfer xfr) { if (queue) @@ -277,6 +279,7 @@ public class ConnectionTest extends TestCase implements SessionListener class TestSessionListener implements SessionListener { public void opened(Session s) {} + public void resumed(Session s) {} public void exception(Session s, SessionException e) {} public void message(Session s, MessageTransfer xfr) { |
