From aa0c3083fcb4ddb93d3ef39452005715673f1c8d Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 24 Aug 2007 22:26:42 +0000 Subject: Added basic test case to test JMS git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpidity/Channel.java | 5 +- .../java/org/apache/qpidity/CommandDispatcher.java | 2 +- .../java/org/apache/qpidity/MethodDecoder.java | 2 +- .../java/org/apache/qpidity/MethodDispatcher.java | 2 +- .../src/main/java/org/apache/qpidity/Session.java | 19 +++++-- .../java/org/apache/qpidity/SessionDelegate.java | 3 +- .../main/java/org/apache/qpidity/ToyBroker.java | 61 +++++++++++++++++----- .../main/java/org/apache/qpidity/ToyExchange.java | 34 ++++++------ 8 files changed, 91 insertions(+), 37 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 9df40106fc..3c734ba8f4 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler method = m; } - System.out.println("sent " + m); + if (m.getEncodedTrack() != Frame.L4) + { + System.out.println("sent control " + m.getClass().getName()); + } } public void headers(Struct ... headers) diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java index 05b252c26e..c6190dc3d7 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java @@ -42,7 +42,7 @@ class CommandDispatcher implements Handler> Session ssn = event.context; Method method = event.target; method.setId(ssn.nextCommandId()); - System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate); + System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); method.delegate(ssn, delegate); if (!method.hasPayload()) { diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java index 96c6e4d52c..b01f067381 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java @@ -50,7 +50,7 @@ class MethodDecoder implements Handler> public void handle(Event event) { - System.out.println("got method segment:\n " + event.target); + //System.out.println("got method segment:\n " + event.target); Iterator fragments = event.target.getFragments(); Decoder dec = new FragmentDecoder(major, minor, fragments); int type = (int) dec.readLong(); diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index 911eaa0b15..f5a040166e 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -40,7 +40,7 @@ class MethodDispatcher implements Handler> public void handle(Event event) { Method method = event.target; - System.out.println("delegating " + method + " to " + delegate); + System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); method.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index d3a24ffd8a..6f0bd5c757 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -104,12 +104,19 @@ public class Session extends Invoker } void flushProcessed() - { + { + for (Range r: processed) + { + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + } + System.out.println("Notifying peer with execution complete"); executionComplete(0, processed); } void syncPoint() { + System.out.println("===========Request received to sync=========================="); + Range range = new Range(0, getCommandsIn() - 1); boolean flush; synchronized (processed) @@ -147,9 +154,11 @@ public class Session extends Invoker for (long id = lower; id <= upper; id++) { commands.remove(id); - } + } + if (commands.isEmpty()) { + System.out.println("\n All outstanding commands are completed !!!! \n"); commands.notifyAll(); } } @@ -167,7 +176,8 @@ public class Session extends Invoker { synchronized (commands) { - commands.put(commandsOut++, m); + System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); + commands.put(commandsOut++, m); } } channel.method(m); @@ -200,6 +210,7 @@ public class Session extends Invoker public void sync() { + System.out.println("calling sync()"); synchronized (commands) { if (!commands.isEmpty()) @@ -210,7 +221,9 @@ public class Session extends Invoker while (!commands.isEmpty()) { try { + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); commands.wait(); + System.out.println("\n============sync() got notified=========================================\n"); } catch (InterruptedException e) { diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 1287e3dc1a..e6c107ced2 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate { for (Range range : ranges) { - ssn.complete(range.getLower(), range.getUpper()); + System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); } } ssn.complete(excmp.getCumulativeExecutionMark()); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index c57bc7e0e9..0a48a0a990 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; /** @@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate private DeliveryProperties props = null; private Struct[] headers = null; private List frames = null; - private Map consumers = new HashMap(); + private Map consumers = new ConcurrentHashMap(); public ToyBroker(ToyExchange exchange) { @@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { - consumers.put(ms.getDestination(),ms.getQueue()); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); - } + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList(); + frames = new ArrayList(); + System.out.println("received transfer " + xfr.getDestination()); } public void headers(Session ssn, Struct ... headers) @@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate if (hdr instanceof DeliveryProperties) { props = (DeliveryProperties) hdr; + System.out.println("received headers routing_key " + props.getRoutingKey()); } } @@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate } } - private void transferMessage(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); @@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate ssn.endData(); } - public void dispatchMessages(Session ssn) + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) { - Message m = exchange.getQueue(consumers.get(dest)).poll(); - if(m != null) - { - transferMessage(ssn,dest,m); - } + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue queue = exchange.getQueue(c._queueName); + Message m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); } } @@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate } } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private class Consumer + { + long _credit; + String _queueName; + } public static final void main(String[] args) throws IOException { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java index 6fabd22462..eab5f6c078 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -16,35 +16,35 @@ public class ToyExchange final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map>> directEx = new HashMap>>(); - private Map>> topicEx = new HashMap>>(); - private Map> queues = new HashMap>(); + private Map>> directEx = new HashMap>>(); + private Map>> topicEx = new HashMap>>(); + private Map> queues = new HashMap>(); public void createQueue(String name) { - queues.put(name, new LinkedList()); + queues.put(name, new LinkedBlockingQueue()); } - public Queue getQueue(String name) + public LinkedBlockingQueue getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - Queue queue = queues.get(queueName); + LinkedBlockingQueue queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List> list = directEx.get(binding); + List> list = directEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); directEx.put(binding,list); } @@ -53,12 +53,12 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List> list = topicEx.get(binding); + List> list = topicEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); topicEx.put(binding,list); } @@ -67,7 +67,7 @@ public class ToyExchange public boolean route(String dest,String routingKey,Message msg) { - List> queues; + List> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +101,9 @@ public class ToyExchange } } - private List> matchWildCard(String routingKey) + private List> matchWildCard(String routingKey) { - List> selected = new ArrayList>(); + List> selected = new ArrayList>(); for(String key: topicEx.keySet()) { @@ -111,7 +111,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(Queue queue : topicEx.get(key)) + for(LinkedBlockingQueue queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +121,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List> selected) + private void storeMessage(Message msg,List> selected) { - for(Queue queue : selected) + for(LinkedBlockingQueue queue : selected) { queue.offer(msg); } -- cgit v1.2.1