summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-24 22:26:42 +0000
commitaa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch)
tree5643430f399ab05a85d1dfb6859841b3b5fa4543 /java
parent69237a07946e9cf442a9bd86e97956cc7f17a33e (diff)
downloadqpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java34
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java51
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java16
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java19
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java3
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java61
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyExchange.java34
11 files changed, 170 insertions, 59 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
new file mode 100644
index 0000000000..20a0542bf6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.jms.ConnectionFactoryImpl;
+import org.apache.qpidity.jms.TopicImpl;
+
+public class JMSTestCase
+{
+ public static void main(String[] args)
+ {
+ try
+ {
+ javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection();
+ con.start();
+
+ javax.jms.Session ssn = con.createSession(false, 1);
+
+ javax.jms.Destination dest = new TopicImpl("myTopic");
+ javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
+
+ javax.jms.BytesMessage msg = ssn.createBytesMessage();
+ msg.writeInt(123);
+ prod.send(msg);
+
+ javax.jms.Message m = cons.receive();
+ System.out.println(m);
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index ed3209c1d0..218c6cd018 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -87,22 +87,12 @@ public class ByteBufferMessage implements Message
public void readData(byte[] target) throws IOException
{
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- _readBuffer.get(target);
+ getReadBuffer().get(target);
}
public ByteBuffer readData() throws IOException
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- return _readBuffer;
+ {
+ return getReadBuffer();
}
private void buildReadBuffer()
@@ -122,16 +112,39 @@ public class ByteBufferMessage implements Message
}
}
+ private ByteBuffer getReadBuffer() throws IOException
+ {
+ if (_readBuffer != null )
+ {
+ return _readBuffer.slice();
+ }
+ else
+ {
+ if (_data.size() >0)
+ {
+ buildReadBuffer();
+ return _readBuffer.slice();
+ }
+ else
+ {
+ throw new IOException("No Data to read");
+ }
+ }
+ }
+
//hack for testing
@Override public String toString()
{
- if (_data.size() >0 && _readBuffer == null)
+ try
+ {
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
+ }
+ catch(IOException e)
{
- buildReadBuffer();
+ return "No data";
}
- ByteBuffer temp = _readBuffer.duplicate();
- byte[] b = new byte[temp.remaining()];
- temp.get(b);
- return new String(b);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index a3c7f6c94b..b8718d687c 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -368,7 +368,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// if this consumer is stopped then this will be call when starting
requestOneMessage();
//When sync() returns we know whether we have received a message or not.
+ System.out.println("Internal receive -- Called sync()");
getSession().getQpidSession().sync();
+ System.out.println("Internal receive -- Returned from sync()");
}
if (_messageReceived.get() && timeout < 0)
{
@@ -492,26 +494,32 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* @param message The message delivered to this consumer.
*/
protected synchronized void onMessage(QpidMessage message)
- {
+ {
try
{
// if there is a message selector then we need to evaluate it.
boolean messageOk = true;
if (_messageSelector != null)
{
- messageOk = _filter.matches((Message) message);
+ messageOk = _filter.matches((Message) message);
}
+
+ System.out.println("Received a message- onMessage in message consumer Impl");
if (!messageOk && _preAcquire)
{
// this is the case for topics
// We need to ack this message
+ System.out.println("onMessage - trying to ack message");
acknowledgeMessage(message);
+ System.out.println("onMessage - acked message");
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk)
{
+ System.out.println("onMessage - trying to acquire message");
messageOk = acquireMessage(message);
+ System.out.println("onMessage - acquired message");
}
// if this consumer is synchronous then set the current message and
@@ -520,15 +528,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Received a message- onMessage in message consumer Impl");
+ _logger.debug("Received a message- onMessage in message consumer Impl");
}
synchronized (_incomingMessageLock)
{
+ System.out.println("got incomming message lock");
if (messageOk)
{
// we have received a proper message that we can deliver
if (_isReceiving)
{
+ System.out.println("Is receiving true, setting message and notifying");
_incomingMessage = message;
_incomingMessageLock.notify();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index 9df40106fc..3c734ba8f4 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler<Frame>
method = m;
}
- System.out.println("sent " + m);
+ if (m.getEncodedTrack() != Frame.L4)
+ {
+ System.out.println("sent control " + m.getClass().getName());
+ }
}
public void headers(Struct ... headers)
diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
index 05b252c26e..c6190dc3d7 100644
--- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
@@ -42,7 +42,7 @@ class CommandDispatcher implements Handler<Event<Session,Method>>
Session ssn = event.context;
Method method = event.target;
method.setId(ssn.nextCommandId());
- System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate);
+ System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n");
method.delegate(ssn, delegate);
if (!method.hasPayload())
{
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
index 96c6e4d52c..b01f067381 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
@@ -50,7 +50,7 @@ class MethodDecoder<C> implements Handler<Event<C,Segment>>
public void handle(Event<C,Segment> event)
{
- System.out.println("got method segment:\n " + event.target);
+ //System.out.println("got method segment:\n " + event.target);
Iterator<ByteBuffer> fragments = event.target.getFragments();
Decoder dec = new FragmentDecoder(major, minor, fragments);
int type = (int) dec.readLong();
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
index 911eaa0b15..f5a040166e 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
@@ -40,7 +40,7 @@ class MethodDispatcher<C> implements Handler<Event<C,Method>>
public void handle(Event<C,Method> event)
{
Method method = event.target;
- System.out.println("delegating " + method + " to " + delegate);
+ System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n");
method.delegate(event.context, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index d3a24ffd8a..6f0bd5c757 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -104,12 +104,19 @@ public class Session extends Invoker
}
void flushProcessed()
- {
+ {
+ for (Range r: processed)
+ {
+ System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
+ }
+ System.out.println("Notifying peer with execution complete");
executionComplete(0, processed);
}
void syncPoint()
{
+ System.out.println("===========Request received to sync==========================");
+
Range range = new Range(0, getCommandsIn() - 1);
boolean flush;
synchronized (processed)
@@ -147,9 +154,11 @@ public class Session extends Invoker
for (long id = lower; id <= upper; id++)
{
commands.remove(id);
- }
+ }
+
if (commands.isEmpty())
{
+ System.out.println("\n All outstanding commands are completed !!!! \n");
commands.notifyAll();
}
}
@@ -167,7 +176,8 @@ public class Session extends Invoker
{
synchronized (commands)
{
- commands.put(commandsOut++, m);
+ System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut);
+ commands.put(commandsOut++, m);
}
}
channel.method(m);
@@ -200,6 +210,7 @@ public class Session extends Invoker
public void sync()
{
+ System.out.println("calling sync()");
synchronized (commands)
{
if (!commands.isEmpty())
@@ -210,7 +221,9 @@ public class Session extends Invoker
while (!commands.isEmpty())
{
try {
+ System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
commands.wait();
+ System.out.println("\n============sync() got notified=========================================\n");
}
catch (InterruptedException e)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index 1287e3dc1a..e6c107ced2 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate<Session>
{
for (Range range : ranges)
{
- ssn.complete(range.getLower(), range.getUpper());
+ System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper());
+ ssn.complete(range.getLower(), range.getUpper());
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index c57bc7e0e9..0a48a0a990 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
- private Map<String,String> consumers = new HashMap<String,String>();
+ private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
{
@@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
- consumers.put(ms.getDestination(),ms.getQueue());
- System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n");
- }
+ Consumer c = new Consumer();
+ c._queueName = ms.getQueue();
+ consumers.put(ms.getDestination(),c);
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
+ @Override public void messageFlow(Session ssn,MessageFlow struct)
+ {
+ Consumer c = consumers.get(struct.getDestination());
+ c._credit = struct.getValue();
+ System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
+ }
+
+ @Override public void messageFlush(Session ssn,MessageFlush struct)
+ {
+ System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
+ checkAndSendMessagesToConsumer(ssn,struct.getDestination());
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
this.xfr = xfr;
- frames = new ArrayList();
+ frames = new ArrayList<Frame>();
+ System.out.println("received transfer " + xfr.getDestination());
}
public void headers(Session ssn, Struct ... headers)
@@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate
if (hdr instanceof DeliveryProperties)
{
props = (DeliveryProperties) hdr;
+ System.out.println("received headers routing_key " + props.getRoutingKey());
}
}
@@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate
}
}
- private void transferMessage(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
ssn.messageTransfer(dest, (short)0, (short)0);
@@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate
ssn.endData();
}
- public void dispatchMessages(Session ssn)
+ private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
{
- Message m = exchange.getQueue(consumers.get(dest)).poll();
- if(m != null)
- {
- transferMessage(ssn,dest,m);
- }
+ checkAndSendMessagesToConsumer(ssn,dest);
+ }
+ }
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+ {
+ Consumer c = consumers.get(dest);
+ LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
+ Message m = queue.poll();
+ while (m != null && c._credit>0)
+ {
+ transferMessageToPeer(ssn,dest,m);
+ c._credit--;
+ m = queue.poll();
}
}
@@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate
}
}
+
+ // ugly, but who cares :)
+ // assumes unit is always no of messages, not bytes
+ // assumes it's credit mode and not window
+ private class Consumer
+ {
+ long _credit;
+ String _queueName;
+ }
public static final void main(String[] args) throws IOException
{
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
index 6fabd22462..eab5f6c078 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
@@ -5,7 +5,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -16,35 +16,35 @@ public class ToyExchange
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>();
- private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>();
- private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedList<Message>());
+ queues.put(name, new LinkedBlockingQueue<Message>());
}
- public Queue<Message> getQueue(String name)
+ public LinkedBlockingQueue<Message> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- Queue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<Message> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<Queue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,12 +53,12 @@ public class ToyExchange
{
if (topicEx.containsKey(binding))
{
- List<Queue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
topicEx.put(binding,list);
}
@@ -67,7 +67,7 @@ public class ToyExchange
public boolean route(String dest,String routingKey,Message msg)
{
- List<Queue<Message>> queues;
+ List<LinkedBlockingQueue<Message>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +101,9 @@ public class ToyExchange
}
}
- private List<Queue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
{
- List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +111,7 @@ public class ToyExchange
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(Queue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +121,9 @@ public class ToyExchange
return selected;
}
- private void storeMessage(Message msg,List<Queue<Message>> selected)
+ private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
{
- for(Queue<Message> queue : selected)
+ for(LinkedBlockingQueue<Message> queue : selected)
{
queue.offer(msg);
}