summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
commitaa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch)
tree5643430f399ab05a85d1dfb6859841b3b5fa4543 /java/client
parent69237a07946e9cf442a9bd86e97956cc7f17a33e (diff)
downloadqpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java34
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java51
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java16
3 files changed, 79 insertions, 22 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
new file mode 100644
index 0000000000..20a0542bf6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.jms.ConnectionFactoryImpl;
+import org.apache.qpidity.jms.TopicImpl;
+
+public class JMSTestCase
+{
+ public static void main(String[] args)
+ {
+ try
+ {
+ javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection();
+ con.start();
+
+ javax.jms.Session ssn = con.createSession(false, 1);
+
+ javax.jms.Destination dest = new TopicImpl("myTopic");
+ javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
+
+ javax.jms.BytesMessage msg = ssn.createBytesMessage();
+ msg.writeInt(123);
+ prod.send(msg);
+
+ javax.jms.Message m = cons.receive();
+ System.out.println(m);
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index ed3209c1d0..218c6cd018 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -87,22 +87,12 @@ public class ByteBufferMessage implements Message
public void readData(byte[] target) throws IOException
{
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- _readBuffer.get(target);
+ getReadBuffer().get(target);
}
public ByteBuffer readData() throws IOException
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- return _readBuffer;
+ {
+ return getReadBuffer();
}
private void buildReadBuffer()
@@ -122,16 +112,39 @@ public class ByteBufferMessage implements Message
}
}
+ private ByteBuffer getReadBuffer() throws IOException
+ {
+ if (_readBuffer != null )
+ {
+ return _readBuffer.slice();
+ }
+ else
+ {
+ if (_data.size() >0)
+ {
+ buildReadBuffer();
+ return _readBuffer.slice();
+ }
+ else
+ {
+ throw new IOException("No Data to read");
+ }
+ }
+ }
+
//hack for testing
@Override public String toString()
{
- if (_data.size() >0 && _readBuffer == null)
+ try
+ {
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
+ }
+ catch(IOException e)
{
- buildReadBuffer();
+ return "No data";
}
- ByteBuffer temp = _readBuffer.duplicate();
- byte[] b = new byte[temp.remaining()];
- temp.get(b);
- return new String(b);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index a3c7f6c94b..b8718d687c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -368,7 +368,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// if this consumer is stopped then this will be call when starting
requestOneMessage();
//When sync() returns we know whether we have received a message or not.
+ System.out.println("Internal receive -- Called sync()");
getSession().getQpidSession().sync();
+ System.out.println("Internal receive -- Returned from sync()");
}
if (_messageReceived.get() && timeout < 0)
{
@@ -492,26 +494,32 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param message The message delivered to this consumer.
*/
protected synchronized void onMessage(QpidMessage message)
- {
+ {
try
{
// if there is a message selector then we need to evaluate it.
boolean messageOk = true;
if (_messageSelector != null)
{
- messageOk = _filter.matches((Message) message);
+ messageOk = _filter.matches((Message) message);
}
+
+ System.out.println("Received a message- onMessage in message consumer Impl");
if (!messageOk && _preAcquire)
{
// this is the case for topics
// We need to ack this message
+ System.out.println("onMessage - trying to ack message");
acknowledgeMessage(message);
+ System.out.println("onMessage - acked message");
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk)
{
+ System.out.println("onMessage - trying to acquire message");
messageOk = acquireMessage(message);
+ System.out.println("onMessage - acquired message");
}
// if this consumer is synchronous then set the current message and
@@ -520,15 +528,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Received a message- onMessage in message consumer Impl");
+ _logger.debug("Received a message- onMessage in message consumer Impl");
}
synchronized (_incomingMessageLock)
{
+ System.out.println("got incomming message lock");
if (messageOk)
{
// we have received a proper message that we can deliver
if (_isReceiving)
{
+ System.out.println("Is receiving true, setting message and notifying");
_incomingMessage = message;
_incomingMessageLock.notify();
}