summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
commitaa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch)
tree5643430f399ab05a85d1dfb6859841b3b5fa4543 /java/common/src
parent69237a07946e9cf442a9bd86e97956cc7f17a33e (diff)
downloadqpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java19
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java3
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java61
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyExchange.java34
8 files changed, 91 insertions, 37 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index 9df40106fc..3c734ba8f4 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler<Frame>
method = m;
}
- System.out.println("sent " + m);
+ if (m.getEncodedTrack() != Frame.L4)
+ {
+ System.out.println("sent control " + m.getClass().getName());
+ }
}
public void headers(Struct ... headers)
diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
index 05b252c26e..c6190dc3d7 100644
--- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
@@ -42,7 +42,7 @@ class CommandDispatcher implements Handler<Event<Session,Method>>
Session ssn = event.context;
Method method = event.target;
method.setId(ssn.nextCommandId());
- System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate);
+ System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n");
method.delegate(ssn, delegate);
if (!method.hasPayload())
{
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
index 96c6e4d52c..b01f067381 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
@@ -50,7 +50,7 @@ class MethodDecoder<C> implements Handler<Event<C,Segment>>
public void handle(Event<C,Segment> event)
{
- System.out.println("got method segment:\n " + event.target);
+ //System.out.println("got method segment:\n " + event.target);
Iterator<ByteBuffer> fragments = event.target.getFragments();
Decoder dec = new FragmentDecoder(major, minor, fragments);
int type = (int) dec.readLong();
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
index 911eaa0b15..f5a040166e 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
@@ -40,7 +40,7 @@ class MethodDispatcher<C> implements Handler<Event<C,Method>>
public void handle(Event<C,Method> event)
{
Method method = event.target;
- System.out.println("delegating " + method + " to " + delegate);
+ System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n");
method.delegate(event.context, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index d3a24ffd8a..6f0bd5c757 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -104,12 +104,19 @@ public class Session extends Invoker
}
void flushProcessed()
- {
+ {
+ for (Range r: processed)
+ {
+ System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
+ }
+ System.out.println("Notifying peer with execution complete");
executionComplete(0, processed);
}
void syncPoint()
{
+ System.out.println("===========Request received to sync==========================");
+
Range range = new Range(0, getCommandsIn() - 1);
boolean flush;
synchronized (processed)
@@ -147,9 +154,11 @@ public class Session extends Invoker
for (long id = lower; id <= upper; id++)
{
commands.remove(id);
- }
+ }
+
if (commands.isEmpty())
{
+ System.out.println("\n All outstanding commands are completed !!!! \n");
commands.notifyAll();
}
}
@@ -167,7 +176,8 @@ public class Session extends Invoker
{
synchronized (commands)
{
- commands.put(commandsOut++, m);
+ System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut);
+ commands.put(commandsOut++, m);
}
}
channel.method(m);
@@ -200,6 +210,7 @@ public class Session extends Invoker
public void sync()
{
+ System.out.println("calling sync()");
synchronized (commands)
{
if (!commands.isEmpty())
@@ -210,7 +221,9 @@ public class Session extends Invoker
while (!commands.isEmpty())
{
try {
+ System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
commands.wait();
+ System.out.println("\n============sync() got notified=========================================\n");
}
catch (InterruptedException e)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index 1287e3dc1a..e6c107ced2 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate<Session>
{
for (Range range : ranges)
{
- ssn.complete(range.getLower(), range.getUpper());
+ System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper());
+ ssn.complete(range.getLower(), range.getUpper());
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index c57bc7e0e9..0a48a0a990 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
- private Map<String,String> consumers = new HashMap<String,String>();
+ private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
{
@@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
- consumers.put(ms.getDestination(),ms.getQueue());
- System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n");
- }
+ Consumer c = new Consumer();
+ c._queueName = ms.getQueue();
+ consumers.put(ms.getDestination(),c);
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
+ @Override public void messageFlow(Session ssn,MessageFlow struct)
+ {
+ Consumer c = consumers.get(struct.getDestination());
+ c._credit = struct.getValue();
+ System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
+ }
+
+ @Override public void messageFlush(Session ssn,MessageFlush struct)
+ {
+ System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
+ checkAndSendMessagesToConsumer(ssn,struct.getDestination());
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
this.xfr = xfr;
- frames = new ArrayList();
+ frames = new ArrayList<Frame>();
+ System.out.println("received transfer " + xfr.getDestination());
}
public void headers(Session ssn, Struct ... headers)
@@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate
if (hdr instanceof DeliveryProperties)
{
props = (DeliveryProperties) hdr;
+ System.out.println("received headers routing_key " + props.getRoutingKey());
}
}
@@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate
}
}
- private void transferMessage(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
ssn.messageTransfer(dest, (short)0, (short)0);
@@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate
ssn.endData();
}
- public void dispatchMessages(Session ssn)
+ private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
{
- Message m = exchange.getQueue(consumers.get(dest)).poll();
- if(m != null)
- {
- transferMessage(ssn,dest,m);
- }
+ checkAndSendMessagesToConsumer(ssn,dest);
+ }
+ }
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+ {
+ Consumer c = consumers.get(dest);
+ LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
+ Message m = queue.poll();
+ while (m != null && c._credit>0)
+ {
+ transferMessageToPeer(ssn,dest,m);
+ c._credit--;
+ m = queue.poll();
}
}
@@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate
}
}
+
+ // ugly, but who cares :)
+ // assumes unit is always no of messages, not bytes
+ // assumes it's credit mode and not window
+ private class Consumer
+ {
+ long _credit;
+ String _queueName;
+ }
public static final void main(String[] args) throws IOException
{
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
index 6fabd22462..eab5f6c078 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
@@ -5,7 +5,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -16,35 +16,35 @@ public class ToyExchange
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>();
- private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>();
- private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedList<Message>());
+ queues.put(name, new LinkedBlockingQueue<Message>());
}
- public Queue<Message> getQueue(String name)
+ public LinkedBlockingQueue<Message> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- Queue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<Message> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<Queue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,12 +53,12 @@ public class ToyExchange
{
if (topicEx.containsKey(binding))
{
- List<Queue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
topicEx.put(binding,list);
}
@@ -67,7 +67,7 @@ public class ToyExchange
public boolean route(String dest,String routingKey,Message msg)
{
- List<Queue<Message>> queues;
+ List<LinkedBlockingQueue<Message>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +101,9 @@ public class ToyExchange
}
}
- private List<Queue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
{
- List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +111,7 @@ public class ToyExchange
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(Queue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +121,9 @@ public class ToyExchange
return selected;
}
- private void storeMessage(Message msg,List<Queue<Message>> selected)
+ private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
{
- for(Queue<Message> queue : selected)
+ for(LinkedBlockingQueue<Message> queue : selected)
{
queue.offer(msg);
}