summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-25 23:21:13 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-25 23:21:13 +0000
commit94a2e74021b5796280b7e4464a02eb80824f9664 (patch)
tree7793ddef93c0440adbcba9826e5f72b7c861891b /java/common
parentc620d4547f7dd7fb5f2a08e61ea113e58ce379e8 (diff)
downloadqpid-python-94a2e74021b5796280b7e4464a02eb80824f9664.tar.gz
Made the various receive variants check that the server queue is empty before returning null. Also modified AMQQueueBrowser to use receiveNoWait() when browsing queues using 0-10. These changes uncovered numerous second order bugs, mostly in failover. These are also fixed. This fixes QPID-1642 and QPID-1643.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyClient.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Echo.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java42
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionListener.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sink.java2
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java3
6 files changed, 43 insertions, 10 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java
index 3491af8cd2..5b2db10613 100644
--- a/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -37,6 +37,8 @@ class ToyClient implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void exception(Session ssn, SessionException exc)
{
exc.printStackTrace();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index c1031c9a1c..0e969464ab 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -37,6 +37,8 @@ public class Echo implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
int id = xfr.getId();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 4079097f96..9d2686a6f7 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -53,13 +53,15 @@ public class Session extends SessionInvoker
private static final Logger log = Logger.get(Session.class);
- enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+ enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
class DefaultSessionListener implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
log.info("message: %s", xfr);
@@ -107,6 +109,8 @@ public class Session extends SessionInvoker
private volatile boolean flowControl = false;
private Semaphore credit = new Semaphore(0);
+ private Thread resumer = null;
+
Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
@@ -234,15 +238,21 @@ public class Session extends SessionInvoker
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
- if (m != null)
+ if (m == null)
{
- sessionCommandPoint(m.getId(), 0);
- send(m);
+ m = new ExecutionSync();
+ m.setId(i);
}
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
}
sessionCommandPoint(commandsOut, 0);
sessionFlush(COMPLETED);
+ resumer = Thread.currentThread();
+ state = RESUMING;
+ listener.resumed(this);
+ resumer = null;
}
}
@@ -387,7 +397,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED)
+ if (state == DETACHED || state == CLOSING)
{
return;
}
@@ -499,10 +509,14 @@ public class Session extends SessionInvoker
if (state != OPEN && state != CLOSED)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
@@ -510,6 +524,14 @@ public class Session extends SessionInvoker
{
case OPEN:
break;
+ case RESUMING:
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ throw new SessionException
+ ("timed out waiting for resume to finish");
+ }
+ break;
case CLOSED:
throw new SessionClosedException();
default:
@@ -527,7 +549,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && isFull(next))
{
- if (state == OPEN)
+ if (state == OPEN || state == RESUMING)
{
try
{
@@ -560,7 +582,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0)
+ if (expiry > 0 && !m.isUnreliable())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
index 63690177f9..eb650eb9ed 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
@@ -31,6 +31,8 @@ public interface SessionListener
void opened(Session session);
+ void resumed(Session session);
+
void message(Session ssn, MessageTransfer xfr);
void exception(Session session, SessionException exception);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
index 622993effb..88870284f6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -87,6 +87,8 @@ public class Sink implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
count++;
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index dca6264367..3d634bfb70 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -74,6 +74,8 @@ public class ConnectionTest extends TestCase implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(final Session ssn, MessageTransfer xfr)
{
if (queue)
@@ -277,6 +279,7 @@ public class ConnectionTest extends TestCase implements SessionListener
class TestSessionListener implements SessionListener
{
public void opened(Session s) {}
+ public void resumed(Session s) {}
public void exception(Session s, SessionException e) {}
public void message(Session s, MessageTransfer xfr)
{