diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-10 01:49:45 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-10 01:49:45 +0000 |
| commit | 55fa9bf941d78a34f90f2ed278afe4d5247abfad (patch) | |
| tree | b93f8810835f11945848ec5a395ec46d1686dac9 /java/common | |
| parent | 21a371f3b6717b454d04d676ada015fcf1934dbb (diff) | |
| download | qpid-python-55fa9bf941d78a34f90f2ed278afe4d5247abfad.tar.gz | |
Added a Toy Exchange that does same basic routing for direct and topic.
Should be good enough for Arnaud to test atleast the basic JMS functionality.
Added a FileMessage to demo Martins requirment. Haven't tested yet.
The Toy Broker can now accept subscriptions and transfer messages to clients
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
5 files changed, 215 insertions, 46 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index c387a38b17..b70c8fae18 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -111,6 +111,8 @@ public class Connection implements ProtocolActions } // not sure if this is the right place + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 537a7ef586..ff89567cee 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer; */ public abstract class ConnectionDelegate extends Delegate<Channel> { - private String _username; - private String _password; + private String _username = "guest"; + private String _password = "guest";; private String _mechanism; private String _virtualHost; private SaslClient saslClient; @@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate<Channel> //----------------------------------------------- @Override public void connectionStart(Channel context, ConnectionStart struct) { + System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); System.out.println("The broker has sent connection-start"); String mechanism = null; @@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate<Channel> String knownHosts = struct.getKnownHosts(); System.out.println("The broker has opened the connection for use"); System.out.println("The broker supplied the following hosts for failover " + knownHosts); - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally + if(_negotiationCompleteLock != null) { - _negotiationCompleteLock.unlock(); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } } + System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); } public void connectionRedirect(Channel context, ConnectionRedirect struct) @@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate<Channel> @Override public void connectionOpen(Channel context, ConnectionOpen struct) { String hosts = "amqp:1223243232325"; - System.out.println("The client has sent connection-open-ok"); + System.out.println("The client has sent connection-open"); context.connectionOpenOk(hosts); + System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 683008fe8a..4949568bbf 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,19 +20,16 @@ */ package org.apache.qpidity; -import java.io.IOException; +import static org.apache.qpidity.Functions.str; +import java.io.IOException; import java.nio.ByteBuffer; - import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import static org.apache.qpidity.Functions.*; - /** * ToyBroker @@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*; class ToyBroker extends SessionDelegate { - private Map<String,Queue<Message>> queues; + private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; private Struct[] headers = null; private List<Frame> frames = null; - - public ToyBroker(Map<String,Queue<Message>> queues) + private Map<String,String> consumers = new HashMap<String,String>(); + + public ToyBroker(ToyExchange exchange) { - this.queues = queues; + this.exchange = exchange; } @Override public void queueDeclare(Session ssn, QueueDeclare qd) { - queues.put(qd.getQueue(), new LinkedList()); - System.out.println("declared queue: " + qd.getQueue()); + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void queueBind(Session ssn, QueueBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) @@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + consumers.put(ms.getDestination(),ms.getQueue()); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { @@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate props = (DeliveryProperties) hdr; } } - - if (props != null && !props.getDiscardUnroutable()) - { - String dest = xfr.getDestination(); - if (!queues.containsKey(dest)) - { - reject(ssn); - } - } - + this.headers = headers; } @@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate if (frame.isLastSegment() && frame.isLastFrame()) { String dest = xfr.getDestination(); - Queue queue = queues.get(dest); - if (queue == null) + Message m = new Message(headers, frames); + + if (exchange.route(dest,props.getRoutingKey(),m)) { - reject(ssn); + System.out.println("queued " + m); + dispatchMessages(ssn); } else { - Message m = new Message(headers, frames); - queue.offer(m); - System.out.println("queued " + m); + + reject(ssn); } ssn.processed(xfr); xfr = null; @@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } + + private void transferMessage(Session ssn,String dest, Message m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(dest, (short)0, (short)0); + ssn.headers(m.headers); + for (Frame f : m.frames) + { + for (ByteBuffer b : f) + { + ssn.data(b); + } + } + ssn.endData(); + } + + public void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + Message m = exchange.getQueue(consumers.get(dest)).poll(); + if(m != null) + { + transferMessage(ssn,dest,m); + } + } + } - private class Message + class Message { private final Struct[] headers; private final List<Frame> frames; @@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final Map<String,Queue<Message>> queues = - new HashMap<String,Queue<Message>>(); - + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { - return new ToyBroker(queues); + return new ToyBroker(exchange); } }; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java new file mode 100644 index 0000000000..6fabd22462 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -0,0 +1,132 @@ +package org.apache.qpidity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.qpidity.ToyBroker.Message; + +public class ToyExchange +{ + final static String DIRECT = "amq.direct"; + final static String TOPIC = "amq.topic"; + + private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>(); + private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>(); + private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>(); + + public void createQueue(String name) + { + queues.put(name, new LinkedList<Message>()); + } + + public Queue<Message> getQueue(String name) + { + return queues.get(name); + } + + public void bindQueue(String type,String binding,String queueName) + { + Queue<Message> queue = queues.get(queueName); + binding = normalizeKey(binding); + if(DIRECT.equals(type)) + { + + if (directEx.containsKey(binding)) + { + List<Queue<Message>> list = directEx.get(binding); + list.add(queue); + } + else + { + List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + list.add(queue); + directEx.put(binding,list); + } + } + else + { + if (topicEx.containsKey(binding)) + { + List<Queue<Message>> list = topicEx.get(binding); + list.add(queue); + } + else + { + List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + list.add(queue); + topicEx.put(binding,list); + } + } + } + + public boolean route(String dest,String routingKey,Message msg) + { + List<Queue<Message>> queues; + if(DIRECT.equals(dest)) + { + queues = directEx.get(routingKey); + } + else + { + queues = matchWildCard(routingKey); + } + if(queues != null && queues.size()>0) + { + System.out.println("Message stored in " + queues.size() + " queues"); + storeMessage(msg,queues); + return true; + } + else + { + System.out.println("Message unroutable " + msg); + return false; + } + } + + private String normalizeKey(String routingKey) + { + if(routingKey.indexOf(".*")>1) + { + return routingKey.substring(0,routingKey.indexOf(".*")); + } + else + { + return routingKey; + } + } + + private List<Queue<Message>> matchWildCard(String routingKey) + { + List<Queue<Message>> selected = new ArrayList<Queue<Message>>(); + + for(String key: topicEx.keySet()) + { + Pattern p = Pattern.compile(key); + Matcher m = p.matcher(routingKey); + if (m.find()) + { + for(Queue<Message> queue : topicEx.get(key)) + { + selected.add(queue); + } + } + } + + return selected; + } + + private void storeMessage(Message msg,List<Queue<Message>> selected) + { + for(Queue<Message> queue : selected) + { + queue.offer(msg); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index ccad3577f0..4e4a070fb4 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -1,5 +1,6 @@ package org.apache.qpidity.api; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpidity.MessageProperties; @@ -43,16 +44,16 @@ public interface Message * </ul> * @param src */ - public void appendData(byte[] src); + public void appendData(byte[] src) throws IOException; - public void appendData(ByteBuffer src); + public void appendData(ByteBuffer src) throws IOException; /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) * - * The read function might copy data from a + * The read function might copy data from * <ul> * <li> From memory (Ex: ByteBuffer) * <li> From Disk @@ -60,7 +61,8 @@ public interface Message * </ul> * @param target */ - public void readData(byte[] target); + public void readData(byte[] target) throws IOException; + public ByteBuffer readData() throws IOException; } |
