diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-24 22:26:42 +0000 |
| commit | aa0c3083fcb4ddb93d3ef39452005715673f1c8d (patch) | |
| tree | 5643430f399ab05a85d1dfb6859841b3b5fa4543 /java | |
| parent | 69237a07946e9cf442a9bd86e97956cc7f17a33e (diff) | |
| download | qpid-python-aa0c3083fcb4ddb93d3ef39452005715673f1c8d.tar.gz | |
Added basic test case to test JMS
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
11 files changed, 170 insertions, 59 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java new file mode 100644 index 0000000000..20a0542bf6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java @@ -0,0 +1,34 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.jms.ConnectionFactoryImpl; +import org.apache.qpidity.jms.TopicImpl; + +public class JMSTestCase +{ + public static void main(String[] args) + { + try + { + javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection(); + con.start(); + + javax.jms.Session ssn = con.createSession(false, 1); + + javax.jms.Destination dest = new TopicImpl("myTopic"); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + javax.jms.MessageConsumer cons = ssn.createConsumer(dest); + + javax.jms.BytesMessage msg = ssn.createBytesMessage(); + msg.writeInt(123); + prod.send(msg); + + javax.jms.Message m = cons.receive(); + System.out.println(m); + + } + catch(Exception e) + { + e.printStackTrace(); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index ed3209c1d0..218c6cd018 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -87,22 +87,12 @@ public class ByteBufferMessage implements Message public void readData(byte[] target) throws IOException { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); + getReadBuffer().get(target); } public ByteBuffer readData() throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - return _readBuffer; + { + return getReadBuffer(); } private void buildReadBuffer() @@ -122,16 +112,39 @@ public class ByteBufferMessage implements Message } } + private ByteBuffer getReadBuffer() throws IOException + { + if (_readBuffer != null ) + { + return _readBuffer.slice(); + } + else + { + if (_data.size() >0) + { + buildReadBuffer(); + return _readBuffer.slice(); + } + else + { + throw new IOException("No Data to read"); + } + } + } + //hack for testing @Override public String toString() { - if (_data.size() >0 && _readBuffer == null) + try + { + ByteBuffer temp = getReadBuffer(); + byte[] b = new byte[temp.remaining()]; + temp.get(b); + return new String(b); + } + catch(IOException e) { - buildReadBuffer(); + return "No data"; } - ByteBuffer temp = _readBuffer.duplicate(); - byte[] b = new byte[temp.remaining()]; - temp.get(b); - return new String(b); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index a3c7f6c94b..b8718d687c 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -368,7 +368,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // if this consumer is stopped then this will be call when starting requestOneMessage(); //When sync() returns we know whether we have received a message or not. + System.out.println("Internal receive -- Called sync()"); getSession().getQpidSession().sync(); + System.out.println("Internal receive -- Returned from sync()"); } if (_messageReceived.get() && timeout < 0) { @@ -492,26 +494,32 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * @param message The message delivered to this consumer. */ protected synchronized void onMessage(QpidMessage message) - { + { try { // if there is a message selector then we need to evaluate it. boolean messageOk = true; if (_messageSelector != null) { - messageOk = _filter.matches((Message) message); + messageOk = _filter.matches((Message) message); } + + System.out.println("Received a message- onMessage in message consumer Impl"); if (!messageOk && _preAcquire) { // this is the case for topics // We need to ack this message + System.out.println("onMessage - trying to ack message"); acknowledgeMessage(message); + System.out.println("onMessage - acked message"); } // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { + System.out.println("onMessage - trying to acquire message"); messageOk = acquireMessage(message); + System.out.println("onMessage - acquired message"); } // if this consumer is synchronous then set the current message and @@ -520,15 +528,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("Received a message- onMessage in message consumer Impl"); + _logger.debug("Received a message- onMessage in message consumer Impl"); } synchronized (_incomingMessageLock) { + System.out.println("got incomming message lock"); if (messageOk) { // we have received a proper message that we can deliver if (_isReceiving) { + System.out.println("Is receiving true, setting message and notifying"); _incomingMessage = message; _incomingMessageLock.notify(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 9df40106fc..3c734ba8f4 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -140,7 +140,10 @@ public class Channel extends Invoker implements Handler<Frame> method = m; } - System.out.println("sent " + m); + if (m.getEncodedTrack() != Frame.L4) + { + System.out.println("sent control " + m.getClass().getName()); + } } public void headers(Struct ... headers) diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java index 05b252c26e..c6190dc3d7 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java @@ -42,7 +42,7 @@ class CommandDispatcher implements Handler<Event<Session,Method>> Session ssn = event.context; Method method = event.target; method.setId(ssn.nextCommandId()); - System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate); + System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); method.delegate(ssn, delegate); if (!method.hasPayload()) { diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java index 96c6e4d52c..b01f067381 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java @@ -50,7 +50,7 @@ class MethodDecoder<C> implements Handler<Event<C,Segment>> public void handle(Event<C,Segment> event) { - System.out.println("got method segment:\n " + event.target); + //System.out.println("got method segment:\n " + event.target); Iterator<ByteBuffer> fragments = event.target.getFragments(); Decoder dec = new FragmentDecoder(major, minor, fragments); int type = (int) dec.readLong(); diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index 911eaa0b15..f5a040166e 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -40,7 +40,7 @@ class MethodDispatcher<C> implements Handler<Event<C,Method>> public void handle(Event<C,Method> event) { Method method = event.target; - System.out.println("delegating " + method + " to " + delegate); + System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); method.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index d3a24ffd8a..6f0bd5c757 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -104,12 +104,19 @@ public class Session extends Invoker } void flushProcessed() - { + { + for (Range r: processed) + { + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + } + System.out.println("Notifying peer with execution complete"); executionComplete(0, processed); } void syncPoint() { + System.out.println("===========Request received to sync=========================="); + Range range = new Range(0, getCommandsIn() - 1); boolean flush; synchronized (processed) @@ -147,9 +154,11 @@ public class Session extends Invoker for (long id = lower; id <= upper; id++) { commands.remove(id); - } + } + if (commands.isEmpty()) { + System.out.println("\n All outstanding commands are completed !!!! \n"); commands.notifyAll(); } } @@ -167,7 +176,8 @@ public class Session extends Invoker { synchronized (commands) { - commands.put(commandsOut++, m); + System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); + commands.put(commandsOut++, m); } } channel.method(m); @@ -200,6 +210,7 @@ public class Session extends Invoker public void sync() { + System.out.println("calling sync()"); synchronized (commands) { if (!commands.isEmpty()) @@ -210,7 +221,9 @@ public class Session extends Invoker while (!commands.isEmpty()) { try { + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); commands.wait(); + System.out.println("\n============sync() got notified=========================================\n"); } catch (InterruptedException e) { diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 1287e3dc1a..e6c107ced2 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -46,7 +46,8 @@ public abstract class SessionDelegate extends Delegate<Session> { for (Range range : ranges) { - ssn.complete(range.getLower(), range.getUpper()); + System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); } } ssn.complete(excmp.getCumulativeExecutionMark()); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index c57bc7e0e9..0a48a0a990 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; /** @@ -44,7 +46,7 @@ class ToyBroker extends SessionDelegate private DeliveryProperties props = null; private Struct[] headers = null; private List<Frame> frames = null; - private Map<String,String> consumers = new HashMap<String,String>(); + private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); public ToyBroker(ToyExchange exchange) { @@ -71,14 +73,30 @@ class ToyBroker extends SessionDelegate @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { - consumers.put(ms.getDestination(),ms.getQueue()); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); - } + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList(); + frames = new ArrayList<Frame>(); + System.out.println("received transfer " + xfr.getDestination()); } public void headers(Session ssn, Struct ... headers) @@ -95,6 +113,7 @@ class ToyBroker extends SessionDelegate if (hdr instanceof DeliveryProperties) { props = (DeliveryProperties) hdr; + System.out.println("received headers routing_key " + props.getRoutingKey()); } } @@ -147,7 +166,7 @@ class ToyBroker extends SessionDelegate } } - private void transferMessage(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); @@ -162,15 +181,24 @@ class ToyBroker extends SessionDelegate ssn.endData(); } - public void dispatchMessages(Session ssn) + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) { - Message m = exchange.getQueue(consumers.get(dest)).poll(); - if(m != null) - { - transferMessage(ssn,dest,m); - } + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName); + Message m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); } } @@ -213,6 +241,15 @@ class ToyBroker extends SessionDelegate } } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private class Consumer + { + long _credit; + String _queueName; + } public static final void main(String[] args) throws IOException { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java index 6fabd22462..eab5f6c078 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -16,35 +16,35 @@ public class ToyExchange final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>(); - private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>(); - private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>(); + private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); + private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); + private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>(); public void createQueue(String name) { - queues.put(name, new LinkedList<Message>()); + queues.put(name, new LinkedBlockingQueue<Message>()); } - public Queue<Message> getQueue(String name) + public LinkedBlockingQueue<Message> getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - Queue<Message> queue = queues.get(queueName); + LinkedBlockingQueue<Message> queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List<Queue<Message>> list = directEx.get(binding); + List<LinkedBlockingQueue<Message>> list = directEx.get(binding); list.add(queue); } else { - List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); list.add(queue); directEx.put(binding,list); } @@ -53,12 +53,12 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List<Queue<Message>> list = topicEx.get(binding); + List<LinkedBlockingQueue<Message>> list = topicEx.get(binding); list.add(queue); } else { - List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); list.add(queue); topicEx.put(binding,list); } @@ -67,7 +67,7 @@ public class ToyExchange public boolean route(String dest,String routingKey,Message msg) { - List<Queue<Message>> queues; + List<LinkedBlockingQueue<Message>> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +101,9 @@ public class ToyExchange } } - private List<Queue<Message>> matchWildCard(String routingKey) + private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey) { - List<Queue<Message>> selected = new ArrayList<Queue<Message>>(); + List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>(); for(String key: topicEx.keySet()) { @@ -111,7 +111,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(Queue<Message> queue : topicEx.get(key)) + for(LinkedBlockingQueue<Message> queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +121,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List<Queue<Message>> selected) + private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected) { - for(Queue<Message> queue : selected) + for(LinkedBlockingQueue<Message> queue : selected) { queue.offer(msg); } |
