diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
| commit | aa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch) | |
| tree | 5643430f399ab05a85d1dfb6859841b3b5fa4543 /java/client | |
| parent | 69237a07946e9cf442a9bd86e97956cc7f17a33e (diff) | |
| download | qpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz | |
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
3 files changed, 79 insertions, 22 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java new file mode 100644 index 0000000000..20a0542bf6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java @@ -0,0 +1,34 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.jms.ConnectionFactoryImpl; +import org.apache.qpidity.jms.TopicImpl; + +public class JMSTestCase +{ + public static void main(String[] args) + { + try + { + javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection(); + con.start(); + + javax.jms.Session ssn = con.createSession(false, 1); + + javax.jms.Destination dest = new TopicImpl("myTopic"); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + javax.jms.MessageConsumer cons = ssn.createConsumer(dest); + + javax.jms.BytesMessage msg = ssn.createBytesMessage(); + msg.writeInt(123); + prod.send(msg); + + javax.jms.Message m = cons.receive(); + System.out.println(m); + + } + catch(Exception e) + { + e.printStackTrace(); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index ed3209c1d0..218c6cd018 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -87,22 +87,12 @@ public class ByteBufferMessage implements Message public void readData(byte[] target) throws IOException { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); + getReadBuffer().get(target); } public ByteBuffer readData() throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - return _readBuffer; + { + return getReadBuffer(); } private void buildReadBuffer() @@ -122,16 +112,39 @@ public class ByteBufferMessage implements Message } } + private ByteBuffer getReadBuffer() throws IOException + { + if (_readBuffer != null ) + { + return _readBuffer.slice(); + } + else + { + if (_data.size() >0) + { + buildReadBuffer(); + return _readBuffer.slice(); + } + else + { + throw new IOException("No Data to read"); + } + } + } + //hack for testing @Override public String toString() { - if (_data.size() >0 && _readBuffer == null) + try + { + ByteBuffer temp = getReadBuffer(); + byte[] b = new byte[temp.remaining()]; + temp.get(b); + return new String(b); + } + catch(IOException e) { - buildReadBuffer(); + return "No data"; } - ByteBuffer temp = _readBuffer.duplicate(); - byte[] b = new byte[temp.remaining()]; - temp.get(b); - return new String(b); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index a3c7f6c94b..b8718d687c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -368,7 +368,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // if this consumer is stopped then this will be call when starting requestOneMessage(); //When sync() returns we know whether we have received a message or not. + System.out.println("Internal receive -- Called sync()"); getSession().getQpidSession().sync(); + System.out.println("Internal receive -- Returned from sync()"); } if (_messageReceived.get() && timeout < 0) { @@ -492,26 +494,32 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param message The message delivered to this consumer. */ protected synchronized void onMessage(QpidMessage message) - { + { try { // if there is a message selector then we need to evaluate it. boolean messageOk = true; if (_messageSelector != null) { - messageOk = _filter.matches((Message) message); + messageOk = _filter.matches((Message) message); } + + System.out.println("Received a message- onMessage in message consumer Impl"); if (!messageOk && _preAcquire) { // this is the case for topics // We need to ack this message + System.out.println("onMessage - trying to ack message"); acknowledgeMessage(message); + System.out.println("onMessage - acked message"); } // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { + System.out.println("onMessage - trying to acquire message"); messageOk = acquireMessage(message); + System.out.println("onMessage - acquired message"); } // if this consumer is synchronous then set the current message and @@ -520,15 +528,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("Received a message- onMessage in message consumer Impl"); + _logger.debug("Received a message- onMessage in message consumer Impl"); } synchronized (_incomingMessageLock) { + System.out.println("got incomming message lock"); if (messageOk) { // we have received a proper message that we can deliver if (_isReceiving) { + System.out.println("Is receiving true, setting message and notifying"); _incomingMessage = message; _incomingMessageLock.notify(); } |
