From 55fa9bf941d78a34f90f2ed278afe4d5247abfad Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 10 Aug 2007 01:49:45 +0000 Subject: Added a Toy Exchange that does same basic routing for direct and topic. Should be good enough for Arnaud to test atleast the basic JMS functionality. Added a FileMessage to demo Martins requirment. Haven't tested yet. The Toy Broker can now accept subscriptions and transfer messages to clients git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpidity/Connection.java | 2 + .../org/apache/qpidity/ConnectionDelegate.java | 26 ++-- .../main/java/org/apache/qpidity/ToyBroker.java | 91 +++++++++----- .../main/java/org/apache/qpidity/ToyExchange.java | 132 +++++++++++++++++++++ .../main/java/org/apache/qpidity/api/Message.java | 10 +- 5 files changed, 215 insertions(+), 46 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpidity/ToyExchange.java (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index c387a38b17..b70c8fae18 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -111,6 +111,8 @@ public class Connection implements ProtocolActions } // not sure if this is the right place + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 537a7ef586..ff89567cee 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer; */ public abstract class ConnectionDelegate extends Delegate { - private String _username; - private String _password; + private String _username = "guest"; + private String _password = "guest";; private String _mechanism; private String _virtualHost; private SaslClient saslClient; @@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate //----------------------------------------------- @Override public void connectionStart(Channel context, ConnectionStart struct) { + System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); System.out.println("The broker has sent connection-start"); String mechanism = null; @@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate String knownHosts = struct.getKnownHosts(); System.out.println("The broker has opened the connection for use"); System.out.println("The broker supplied the following hosts for failover " + knownHosts); - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally + if(_negotiationCompleteLock != null) { - _negotiationCompleteLock.unlock(); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } } + System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); } public void connectionRedirect(Channel context, ConnectionRedirect struct) @@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate @Override public void connectionOpen(Channel context, ConnectionOpen struct) { String hosts = "amqp:1223243232325"; - System.out.println("The client has sent connection-open-ok"); + System.out.println("The client has sent connection-open"); context.connectionOpenOk(hosts); + System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 683008fe8a..4949568bbf 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,19 +20,16 @@ */ package org.apache.qpidity; -import java.io.IOException; +import static org.apache.qpidity.Functions.str; +import java.io.IOException; import java.nio.ByteBuffer; - import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import static org.apache.qpidity.Functions.*; - /** * ToyBroker @@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*; class ToyBroker extends SessionDelegate { - private Map> queues; + private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; private Struct[] headers = null; private List frames = null; - - public ToyBroker(Map> queues) + private Map consumers = new HashMap(); + + public ToyBroker(ToyExchange exchange) { - this.queues = queues; + this.exchange = exchange; } @Override public void queueDeclare(Session ssn, QueueDeclare qd) { - queues.put(qd.getQueue(), new LinkedList()); - System.out.println("declared queue: " + qd.getQueue()); + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void queueBind(Session ssn, QueueBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) @@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + consumers.put(ms.getDestination(),ms.getQueue()); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { @@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate props = (DeliveryProperties) hdr; } } - - if (props != null && !props.getDiscardUnroutable()) - { - String dest = xfr.getDestination(); - if (!queues.containsKey(dest)) - { - reject(ssn); - } - } - + this.headers = headers; } @@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate if (frame.isLastSegment() && frame.isLastFrame()) { String dest = xfr.getDestination(); - Queue queue = queues.get(dest); - if (queue == null) + Message m = new Message(headers, frames); + + if (exchange.route(dest,props.getRoutingKey(),m)) { - reject(ssn); + System.out.println("queued " + m); + dispatchMessages(ssn); } else { - Message m = new Message(headers, frames); - queue.offer(m); - System.out.println("queued " + m); + + reject(ssn); } ssn.processed(xfr); xfr = null; @@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } + + private void transferMessage(Session ssn,String dest, Message m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(dest, (short)0, (short)0); + ssn.headers(m.headers); + for (Frame f : m.frames) + { + for (ByteBuffer b : f) + { + ssn.data(b); + } + } + ssn.endData(); + } + + public void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + Message m = exchange.getQueue(consumers.get(dest)).poll(); + if(m != null) + { + transferMessage(ssn,dest,m); + } + } + } - private class Message + class Message { private final Struct[] headers; private final List frames; @@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final Map> queues = - new HashMap>(); - + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { - return new ToyBroker(queues); + return new ToyBroker(exchange); } }; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java new file mode 100644 index 0000000000..6fabd22462 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -0,0 +1,132 @@ +package org.apache.qpidity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.qpidity.ToyBroker.Message; + +public class ToyExchange +{ + final static String DIRECT = "amq.direct"; + final static String TOPIC = "amq.topic"; + + private Map>> directEx = new HashMap>>(); + private Map>> topicEx = new HashMap>>(); + private Map> queues = new HashMap>(); + + public void createQueue(String name) + { + queues.put(name, new LinkedList()); + } + + public Queue getQueue(String name) + { + return queues.get(name); + } + + public void bindQueue(String type,String binding,String queueName) + { + Queue queue = queues.get(queueName); + binding = normalizeKey(binding); + if(DIRECT.equals(type)) + { + + if (directEx.containsKey(binding)) + { + List> list = directEx.get(binding); + list.add(queue); + } + else + { + List> list = new LinkedList>(); + list.add(queue); + directEx.put(binding,list); + } + } + else + { + if (topicEx.containsKey(binding)) + { + List> list = topicEx.get(binding); + list.add(queue); + } + else + { + List> list = new LinkedList>(); + list.add(queue); + topicEx.put(binding,list); + } + } + } + + public boolean route(String dest,String routingKey,Message msg) + { + List> queues; + if(DIRECT.equals(dest)) + { + queues = directEx.get(routingKey); + } + else + { + queues = matchWildCard(routingKey); + } + if(queues != null && queues.size()>0) + { + System.out.println("Message stored in " + queues.size() + " queues"); + storeMessage(msg,queues); + return true; + } + else + { + System.out.println("Message unroutable " + msg); + return false; + } + } + + private String normalizeKey(String routingKey) + { + if(routingKey.indexOf(".*")>1) + { + return routingKey.substring(0,routingKey.indexOf(".*")); + } + else + { + return routingKey; + } + } + + private List> matchWildCard(String routingKey) + { + List> selected = new ArrayList>(); + + for(String key: topicEx.keySet()) + { + Pattern p = Pattern.compile(key); + Matcher m = p.matcher(routingKey); + if (m.find()) + { + for(Queue queue : topicEx.get(key)) + { + selected.add(queue); + } + } + } + + return selected; + } + + private void storeMessage(Message msg,List> selected) + { + for(Queue queue : selected) + { + queue.offer(msg); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index ccad3577f0..4e4a070fb4 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -1,5 +1,6 @@ package org.apache.qpidity.api; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpidity.MessageProperties; @@ -43,16 +44,16 @@ public interface Message * * @param src */ - public void appendData(byte[] src); + public void appendData(byte[] src) throws IOException; - public void appendData(ByteBuffer src); + public void appendData(ByteBuffer src) throws IOException; /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) * - * The read function might copy data from a + * The read function might copy data from *
    *
  • From memory (Ex: ByteBuffer) *
  • From Disk @@ -60,7 +61,8 @@ public interface Message *
* @param target */ - public void readData(byte[] target); + public void readData(byte[] target) throws IOException; + public ByteBuffer readData() throws IOException; } -- cgit v1.2.1