From aa0c3083fcb4ddb93d3ef39452005715673f1c8d Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 24 Aug 2007 22:26:42 +0000 Subject: Added basic test case to test JMS git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpidity/client/JMSTestCase.java | 34 ++++++++++++ .../qpidity/client/util/ByteBufferMessage.java | 51 +++++++++++------- .../apache/qpidity/jms/MessageConsumerImpl.java | 16 ++++-- .../src/main/java/org/apache/qpidity/Channel.java | 5 +- .../java/org/apache/qpidity/CommandDispatcher.java | 2 +- .../java/org/apache/qpidity/MethodDecoder.java | 2 +- .../java/org/apache/qpidity/MethodDispatcher.java | 2 +- .../src/main/java/org/apache/qpidity/Session.java | 19 +++++-- .../java/org/apache/qpidity/SessionDelegate.java | 3 +- .../main/java/org/apache/qpidity/ToyBroker.java | 61 +++++++++++++++++----- .../main/java/org/apache/qpidity/ToyExchange.java | 34 ++++++------ 11 files changed, 170 insertions(+), 59 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java new file mode 100644 index 0000000000..20a0542bf6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java @@ -0,0 +1,34 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.jms.ConnectionFactoryImpl; +import org.apache.qpidity.jms.TopicImpl; + +public class JMSTestCase +{ + public static void main(String[] args) + { + try + { + javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection(); + con.start(); + + javax.jms.Session ssn = con.createSession(false, 1); + + javax.jms.Destination dest = new TopicImpl("myTopic"); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + javax.jms.MessageConsumer cons = ssn.createConsumer(dest); + + javax.jms.BytesMessage msg = ssn.createBytesMessage(); + msg.writeInt(123); + prod.send(msg); + + javax.jms.Message m = cons.receive(); + System.out.println(m); + + } + catch(Exception e) + { + e.printStackTrace(); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index ed3209c1d0..218c6cd018 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -87,22 +87,12 @@ public class ByteBufferMessage implements Message public void readData(byte[] target) throws IOException { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); + getReadBuffer().get(target); } public ByteBuffer readData() throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - return _readBuffer; + { + return getReadBuffer(); } private void buildReadBuffer() @@ -122,16 +112,39 @@ public class ByteBufferMessage implements Message } } + private ByteBuffer getReadBuffer() throws IOException + { + if (_readBuffer != null ) + { + return _readBuffer.slice(); + } + else + { + if (_data.size() >0) + { + buildReadBuffer(); + return _readBuffer.slice(); + } + else + { + throw new IOException("No Data to read"); + } + } + } + //hack for testing @Override public String toString() { - if (_data.size() >0 && _readBuffer == null) + try + { + ByteBuffer temp = getReadBuffer(); + byte[] b = new byte[temp.remaining()]; + temp.get(b); + return new String(b); + } + catch(IOException e) { - buildReadBuffer(); + return "No data"; } - ByteBuffer temp = _readBuffer.duplicate(); - byte[] b = new byte[temp.remaining()]; - temp.get(b); - return new String(b); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index a3c7f6c94b..b8718d687c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -368,7 +368,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // if this consumer is stopped then this will be call when starting requestOneMessage(); //When sync() returns we know whether we have received a message or not. + System.out.println("Internal receive -- Called sync()"); getSession().getQpidSession().sync(); + System.out.println("Internal receive -- Returned from sync()"); } if (_messageReceived.get() && timeout < 0) { @@ -492,26 +494,32 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param message The message delivered to this consumer. */ protected synchronized void onMessage(QpidMessage message) - { + { try { // if there is a message selector then we need to evaluate it. boolean messageOk = true; if (_messageSelector != null) { - messageOk = _filter.matches((Message) message); + messageOk = _filter.matches((Message) message); } + + System.out.println("Received a message- onMessage in message consumer Impl"); if (!messageOk && _preAcquire) { // this is the case for topics // We need to ack this message + System.out.println("onMessage - trying to ack message"); acknowledgeMessage(message); + System.out.println("onMessage - acked message"); } // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { + System.out.println("onMessage - trying to acquire message"); messageOk = acquireMessage(message); + System.out.println("onMessage - acquired message"); } // if this consumer is synchronous then set the current message and @@ -520,15 +528,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("Received a message- onMessage in message consumer Impl"); + _logger.debug("Received a message- onMessage in message consumer Impl"); } synchronized (_incomingMessageLock) { + System.out.println("got incomming message lock"); if (messageOk) { // we have received a proper message that we can deliver if (_isReceiving) { + System.out.println("Is receiving true, setting message and notifying"); _incomingMessage = message; _incomingMessageLock.notify(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 9df40106fc..3c734ba8f4 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler method = m; } - System.out.println("sent " + m); + if (m.getEncodedTrack() != Frame.L4) + { + System.out.println("sent control " + m.getClass().getName()); + } } public void headers(Struct ... headers) diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java index 05b252c26e..c6190dc3d7 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java @@ -42,7 +42,7 @@ class CommandDispatcher implements Handler> Session ssn = event.context; Method method = event.target; method.setId(ssn.nextCommandId()); - System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate); + System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); method.delegate(ssn, delegate); if (!method.hasPayload()) { diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java index 96c6e4d52c..b01f067381 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java @@ -50,7 +50,7 @@ class MethodDecoder implements Handler> public void handle(Event event) { - System.out.println("got method segment:\n " + event.target); + //System.out.println("got method segment:\n " + event.target); Iterator fragments = event.target.getFragments(); Decoder dec = new FragmentDecoder(major, minor, fragments); int type = (int) dec.readLong(); diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index 911eaa0b15..f5a040166e 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -40,7 +40,7 @@ class MethodDispatcher implements Handler> public void handle(Event event) { Method method = event.target; - System.out.println("delegating " + method + " to " + delegate); + System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); method.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index d3a24ffd8a..6f0bd5c757 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -104,12 +104,19 @@ public class Session extends Invoker } void flushProcessed() - { + { + for (Range r: processed) + { + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + } + System.out.println("Notifying peer with execution complete"); executionComplete(0, processed); } void syncPoint() { + System.out.println("===========Request received to sync=========================="); + Range range = new Range(0, getCommandsIn() - 1); boolean flush; synchronized (processed) @@ -147,9 +154,11 @@ public class Session extends Invoker for (long id = lower; id <= upper; id++) { commands.remove(id); - } + } + if (commands.isEmpty()) { + System.out.println("\n All outstanding commands are completed !!!! \n"); commands.notifyAll(); } } @@ -167,7 +176,8 @@ public class Session extends Invoker { synchronized (commands) { - commands.put(commandsOut++, m); + System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); + commands.put(commandsOut++, m); } } channel.method(m); @@ -200,6 +210,7 @@ public class Session extends Invoker public void sync() { + System.out.println("calling sync()"); synchronized (commands) { if (!commands.isEmpty()) @@ -210,7 +221,9 @@ public class Session extends Invoker while (!commands.isEmpty()) { try { + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); commands.wait(); + System.out.println("\n============sync() got notified=========================================\n"); } catch (InterruptedException e) { diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 1287e3dc1a..e6c107ced2 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate { for (Range range : ranges) { - ssn.complete(range.getLower(), range.getUpper()); + System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); } } ssn.complete(excmp.getCumulativeExecutionMark()); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index c57bc7e0e9..0a48a0a990 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; /** @@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate private DeliveryProperties props = null; private Struct[] headers = null; private List frames = null; - private Map consumers = new HashMap(); + private Map consumers = new ConcurrentHashMap(); public ToyBroker(ToyExchange exchange) { @@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { - consumers.put(ms.getDestination(),ms.getQueue()); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); - } + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList(); + frames = new ArrayList(); + System.out.println("received transfer " + xfr.getDestination()); } public void headers(Session ssn, Struct ... headers) @@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate if (hdr instanceof DeliveryProperties) { props = (DeliveryProperties) hdr; + System.out.println("received headers routing_key " + props.getRoutingKey()); } } @@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate } } - private void transferMessage(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); @@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate ssn.endData(); } - public void dispatchMessages(Session ssn) + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) { - Message m = exchange.getQueue(consumers.get(dest)).poll(); - if(m != null) - { - transferMessage(ssn,dest,m); - } + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue queue = exchange.getQueue(c._queueName); + Message m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); } } @@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate } } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private class Consumer + { + long _credit; + String _queueName; + } public static final void main(String[] args) throws IOException { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java index 6fabd22462..eab5f6c078 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -16,35 +16,35 @@ public class ToyExchange final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map>> directEx = new HashMap>>(); - private Map>> topicEx = new HashMap>>(); - private Map> queues = new HashMap>(); + private Map>> directEx = new HashMap>>(); + private Map>> topicEx = new HashMap>>(); + private Map> queues = new HashMap>(); public void createQueue(String name) { - queues.put(name, new LinkedList()); + queues.put(name, new LinkedBlockingQueue()); } - public Queue getQueue(String name) + public LinkedBlockingQueue getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - Queue queue = queues.get(queueName); + LinkedBlockingQueue queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List> list = directEx.get(binding); + List> list = directEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); directEx.put(binding,list); } @@ -53,12 +53,12 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List> list = topicEx.get(binding); + List> list = topicEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); topicEx.put(binding,list); } @@ -67,7 +67,7 @@ public class ToyExchange public boolean route(String dest,String routingKey,Message msg) { - List> queues; + List> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +101,9 @@ public class ToyExchange } } - private List> matchWildCard(String routingKey) + private List> matchWildCard(String routingKey) { - List> selected = new ArrayList>(); + List> selected = new ArrayList>(); for(String key: topicEx.keySet()) { @@ -111,7 +111,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(Queue queue : topicEx.get(key)) + for(LinkedBlockingQueue queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +121,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List> selected) + private void storeMessage(Message msg,List> selected) { - for(Queue queue : selected) + for(LinkedBlockingQueue queue : selected) { queue.offer(msg); } -- cgit v1.2.1