diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
| commit | 3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2 (patch) | |
| tree | 7082cdbe3e0833f29de69423ec9f0b99eb44aa61 /qpid/java/common/src/test | |
| parent | 386fc2f8dc103ae078c98e3fe5bcdfb7842f27de (diff) | |
| download | qpid-python-3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2.tar.gz | |
QPID-1339: support for low level session resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/test')
| -rw-r--r-- | qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java | 154 |
1 files changed, 150 insertions, 4 deletions
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 03fae56250..19e1c2537f 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -28,21 +28,26 @@ import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoAcceptor; import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import junit.framework.TestCase; +import java.util.ArrayList; +import java.util.List; import java.util.Random; /** * ConnectionTest */ -public class ConnectionTest extends TestCase +public class ConnectionTest extends TestCase implements SessionListener { private static final Logger log = Logger.get(ConnectionTest.class); private int port; + private volatile boolean queue = false; + private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); protected void setUp() throws Exception { @@ -51,10 +56,11 @@ public class ConnectionTest extends TestCase port = AvailablePortFinder.getNextAvailable(12000); ConnectionDelegate server = new ServerDelegate() { - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override public Session getSession(Connection conn, SessionAttach atc) { - super.connectionOpen(conn, open); - conn.close(); + Session ssn = super.getSession(conn, atc); + ssn.setSessionListener(ConnectionTest.this); + return ssn; } }; @@ -63,6 +69,58 @@ public class ConnectionTest extends TestCase ioa.start(); } + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + if (queue) + { + messages.add(xfr); + ssn.processed(xfr); + return; + } + + String body = xfr.getBodyString(); + + if (body.startsWith("CLOSE")) + { + ssn.getConnection().close(); + } + else if (body.startsWith("ECHO")) + { + int id = xfr.getId(); + ssn.invoke(xfr); + ssn.processed(id); + } + else if (body.startsWith("SINK")) + { + ssn.processed(xfr); + } + else if (body.startsWith("DROP")) + { + // do nothing + } + else + { + throw new IllegalArgumentException + ("unrecognized message: " + body); + } + } + + public void exception(Session ssn, SessionException exc) + { + throw exc; + } + + public void closed(Session ssn) {} + + private void send(Session ssn, String msg) + { + ssn.messageTransfer + ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, msg); + } + private Connection connect(final Condition closed) { Connection conn = new Connection(); @@ -89,6 +147,10 @@ public class ConnectionTest extends TestCase { Condition closed = new Condition(); Connection conn = connect(closed); + + Session ssn = conn.createSession(); + send(ssn, "CLOSE"); + if (!closed.get(3000)) { fail("never got notified of connection close"); @@ -105,4 +167,88 @@ public class ConnectionTest extends TestCase } } + public void testResume() throws Exception + { + Connection conn = new Connection(); + conn.connect("localhost", port, null, "guest", "guest"); + + conn.setConnectionListener(new ConnectionListener() + { + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException e) + { + throw e; + } + public void closed(Connection conn) + { + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + }); + + Session ssn = conn.createSession(1); + final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); + ssn.setSessionListener(new SessionListener() + { + public void opened(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); + } + + s.processed(xfr); + } + public void closed(Session s) {} + }); + + send(ssn, "SINK 0"); + send(ssn, "ECHO 1"); + send(ssn, "ECHO 2"); + + ssn.sync(); + + String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" }; + for (String m : msgs) + { + send(ssn, m); + } + + ssn.sync(); + + assertEquals(msgs.length, messages.size()); + for (int i = 0; i < msgs.length; i++) + { + assertEquals(msgs[i], messages.get(i).getBodyString()); + } + + queue = false; + + send(ssn, "ECHO 8"); + send(ssn, "ECHO 9"); + + synchronized (incoming) + { + Waiter w = new Waiter(incoming, 30000); + while (w.hasTime() && incoming.size() < 4) + { + w.await(); + } + + assertEquals(4, incoming.size()); + assertEquals("ECHO 1", incoming.get(0).getBodyString()); + assertEquals(0, incoming.get(0).getId()); + assertEquals("ECHO 2", incoming.get(1).getBodyString()); + assertEquals(1, incoming.get(1).getId()); + assertEquals("ECHO 8", incoming.get(2).getBodyString()); + assertEquals(0, incoming.get(0).getId()); + assertEquals("ECHO 9", incoming.get(3).getBodyString()); + assertEquals(1, incoming.get(1).getId()); + } + } + } |
