summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/test
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-23 01:21:22 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-23 01:21:22 +0000
commit3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2 (patch)
tree7082cdbe3e0833f29de69423ec9f0b99eb44aa61 /qpid/java/common/src/test
parent386fc2f8dc103ae078c98e3fe5bcdfb7842f27de (diff)
downloadqpid-python-3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2.tar.gz
QPID-1339: support for low level session resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/test')
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java154
1 files changed, 150 insertions, 4 deletions
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 03fae56250..19e1c2537f 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -28,21 +28,26 @@ import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.io.IoAcceptor;
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import junit.framework.TestCase;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
/**
* ConnectionTest
*/
-public class ConnectionTest extends TestCase
+public class ConnectionTest extends TestCase implements SessionListener
{
private static final Logger log = Logger.get(ConnectionTest.class);
private int port;
+ private volatile boolean queue = false;
+ private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
protected void setUp() throws Exception
{
@@ -51,10 +56,11 @@ public class ConnectionTest extends TestCase
port = AvailablePortFinder.getNextAvailable(12000);
ConnectionDelegate server = new ServerDelegate() {
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- super.connectionOpen(conn, open);
- conn.close();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(ConnectionTest.this);
+ return ssn;
}
};
@@ -63,6 +69,58 @@ public class ConnectionTest extends TestCase
ioa.start();
}
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ if (queue)
+ {
+ messages.add(xfr);
+ ssn.processed(xfr);
+ return;
+ }
+
+ String body = xfr.getBodyString();
+
+ if (body.startsWith("CLOSE"))
+ {
+ ssn.getConnection().close();
+ }
+ else if (body.startsWith("ECHO"))
+ {
+ int id = xfr.getId();
+ ssn.invoke(xfr);
+ ssn.processed(id);
+ }
+ else if (body.startsWith("SINK"))
+ {
+ ssn.processed(xfr);
+ }
+ else if (body.startsWith("DROP"))
+ {
+ // do nothing
+ }
+ else
+ {
+ throw new IllegalArgumentException
+ ("unrecognized message: " + body);
+ }
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ throw exc;
+ }
+
+ public void closed(Session ssn) {}
+
+ private void send(Session ssn, String msg)
+ {
+ ssn.messageTransfer
+ ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, msg);
+ }
+
private Connection connect(final Condition closed)
{
Connection conn = new Connection();
@@ -89,6 +147,10 @@ public class ConnectionTest extends TestCase
{
Condition closed = new Condition();
Connection conn = connect(closed);
+
+ Session ssn = conn.createSession();
+ send(ssn, "CLOSE");
+
if (!closed.get(3000))
{
fail("never got notified of connection close");
@@ -105,4 +167,88 @@ public class ConnectionTest extends TestCase
}
}
+ public void testResume() throws Exception
+ {
+ Connection conn = new Connection();
+ conn.connect("localhost", port, null, "guest", "guest");
+
+ conn.setConnectionListener(new ConnectionListener()
+ {
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException e)
+ {
+ throw e;
+ }
+ public void closed(Connection conn)
+ {
+ queue = true;
+ conn.connect("localhost", port, null, "guest", "guest");
+ conn.resume();
+ }
+ });
+
+ Session ssn = conn.createSession(1);
+ final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
+ ssn.setSessionListener(new SessionListener()
+ {
+ public void opened(Session s) {}
+ public void exception(Session s, SessionException e) {}
+ public void message(Session s, MessageTransfer xfr)
+ {
+ synchronized (incoming)
+ {
+ incoming.add(xfr);
+ incoming.notifyAll();
+ }
+
+ s.processed(xfr);
+ }
+ public void closed(Session s) {}
+ });
+
+ send(ssn, "SINK 0");
+ send(ssn, "ECHO 1");
+ send(ssn, "ECHO 2");
+
+ ssn.sync();
+
+ String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" };
+ for (String m : msgs)
+ {
+ send(ssn, m);
+ }
+
+ ssn.sync();
+
+ assertEquals(msgs.length, messages.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertEquals(msgs[i], messages.get(i).getBodyString());
+ }
+
+ queue = false;
+
+ send(ssn, "ECHO 8");
+ send(ssn, "ECHO 9");
+
+ synchronized (incoming)
+ {
+ Waiter w = new Waiter(incoming, 30000);
+ while (w.hasTime() && incoming.size() < 4)
+ {
+ w.await();
+ }
+
+ assertEquals(4, incoming.size());
+ assertEquals("ECHO 1", incoming.get(0).getBodyString());
+ assertEquals(0, incoming.get(0).getId());
+ assertEquals("ECHO 2", incoming.get(1).getBodyString());
+ assertEquals(1, incoming.get(1).getId());
+ assertEquals("ECHO 8", incoming.get(2).getBodyString());
+ assertEquals(0, incoming.get(0).getId());
+ assertEquals("ECHO 9", incoming.get(3).getBodyString());
+ assertEquals(1, incoming.get(1).getId());
+ }
+ }
+
}