diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
| commit | aa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch) | |
| tree | 5643430f399ab05a85d1dfb6859841b3b5fa4543 /java/common/src | |
| parent | 69237a07946e9cf442a9bd86e97956cc7f17a33e (diff) | |
| download | qpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz | |
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
8 files changed, 91 insertions, 37 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 9df40106fc..3c734ba8f4 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler<Frame> method = m; } - System.out.println("sent " + m); + if (m.getEncodedTrack() != Frame.L4) + { + System.out.println("sent control " + m.getClass().getName()); + } } public void headers(Struct ... headers) diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java index 05b252c26e..c6190dc3d7 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java @@ -42,7 +42,7 @@ class CommandDispatcher implements Handler<Event<Session,Method>> Session ssn = event.context; Method method = event.target; method.setId(ssn.nextCommandId()); - System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate); + System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); method.delegate(ssn, delegate); if (!method.hasPayload()) { diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java index 96c6e4d52c..b01f067381 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java @@ -50,7 +50,7 @@ class MethodDecoder<C> implements Handler<Event<C,Segment>> public void handle(Event<C,Segment> event) { - System.out.println("got method segment:\n " + event.target); + //System.out.println("got method segment:\n " + event.target); Iterator<ByteBuffer> fragments = event.target.getFragments(); Decoder dec = new FragmentDecoder(major, minor, fragments); int type = (int) dec.readLong(); diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index 911eaa0b15..f5a040166e 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -40,7 +40,7 @@ class MethodDispatcher<C> implements Handler<Event<C,Method>> public void handle(Event<C,Method> event) { Method method = event.target; - System.out.println("delegating " + method + " to " + delegate); + System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); method.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index d3a24ffd8a..6f0bd5c757 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -104,12 +104,19 @@ public class Session extends Invoker } void flushProcessed() - { + { + for (Range r: processed) + { + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + } + System.out.println("Notifying peer with execution complete"); executionComplete(0, processed); } void syncPoint() { + System.out.println("===========Request received to sync=========================="); + Range range = new Range(0, getCommandsIn() - 1); boolean flush; synchronized (processed) @@ -147,9 +154,11 @@ public class Session extends Invoker for (long id = lower; id <= upper; id++) { commands.remove(id); - } + } + if (commands.isEmpty()) { + System.out.println("\n All outstanding commands are completed !!!! \n"); commands.notifyAll(); } } @@ -167,7 +176,8 @@ public class Session extends Invoker { synchronized (commands) { - commands.put(commandsOut++, m); + System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); + commands.put(commandsOut++, m); } } channel.method(m); @@ -200,6 +210,7 @@ public class Session extends Invoker public void sync() { + System.out.println("calling sync()"); synchronized (commands) { if (!commands.isEmpty()) @@ -210,7 +221,9 @@ public class Session extends Invoker while (!commands.isEmpty()) { try { + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); commands.wait(); + System.out.println("\n============sync() got notified=========================================\n"); } catch (InterruptedException e) { diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 1287e3dc1a..e6c107ced2 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate<Session> { for (Range range : ranges) { - ssn.complete(range.getLower(), range.getUpper()); + System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); } } ssn.complete(excmp.getCumulativeExecutionMark()); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index c57bc7e0e9..0a48a0a990 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; /** @@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate private DeliveryProperties props = null; private Struct[] headers = null; private List<Frame> frames = null; - private Map<String,String> consumers = new HashMap<String,String>(); + private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); public ToyBroker(ToyExchange exchange) { @@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { - consumers.put(ms.getDestination(),ms.getQueue()); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); - } + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList(); + frames = new ArrayList<Frame>(); + System.out.println("received transfer " + xfr.getDestination()); } public void headers(Session ssn, Struct ... headers) @@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate if (hdr instanceof DeliveryProperties) { props = (DeliveryProperties) hdr; + System.out.println("received headers routing_key " + props.getRoutingKey()); } } @@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate } } - private void transferMessage(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); @@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate ssn.endData(); } - public void dispatchMessages(Session ssn) + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) { - Message m = exchange.getQueue(consumers.get(dest)).poll(); - if(m != null) - { - transferMessage(ssn,dest,m); - } + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName); + Message m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); } } @@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate } } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private class Consumer + { + long _credit; + String _queueName; + } public static final void main(String[] args) throws IOException { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java index 6fabd22462..eab5f6c078 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -16,35 +16,35 @@ public class ToyExchange final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>(); - private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>(); - private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>(); + private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); + private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); + private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>(); public void createQueue(String name) { - queues.put(name, new LinkedList<Message>()); + queues.put(name, new LinkedBlockingQueue<Message>()); } - public Queue<Message> getQueue(String name) + public LinkedBlockingQueue<Message> getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - Queue<Message> queue = queues.get(queueName); + LinkedBlockingQueue<Message> queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List<Queue<Message>> list = directEx.get(binding); + List<LinkedBlockingQueue<Message>> list = directEx.get(binding); list.add(queue); } else { - List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); list.add(queue); directEx.put(binding,list); } @@ -53,12 +53,12 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List<Queue<Message>> list = topicEx.get(binding); + List<LinkedBlockingQueue<Message>> list = topicEx.get(binding); list.add(queue); } else { - List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); list.add(queue); topicEx.put(binding,list); } @@ -67,7 +67,7 @@ public class ToyExchange public boolean route(String dest,String routingKey,Message msg) { - List<Queue<Message>> queues; + List<LinkedBlockingQueue<Message>> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +101,9 @@ public class ToyExchange } } - private List<Queue<Message>> matchWildCard(String routingKey) + private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey) { - List<Queue<Message>> selected = new ArrayList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>(); for(String key: topicEx.keySet()) { @@ -111,7 +111,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(Queue<Message> queue : topicEx.get(key)) + for(LinkedBlockingQueue<Message> queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +121,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List<Queue<Message>> selected) + private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected) { - for(Queue<Message> queue : selected) + for(LinkedBlockingQueue<Message> queue : selected) { queue.offer(msg); } |
