summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-10 01:49:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-10 01:49:45 +0000
commit55fa9bf941d78a34f90f2ed278afe4d5247abfad (patch)
treeb93f8810835f11945848ec5a395ec46d1686dac9 /java/common/src
parent21a371f3b6717b454d04d676ada015fcf1934dbb (diff)
downloadqpid-python-55fa9bf941d78a34f90f2ed278afe4d5247abfad.tar.gz
Added a Toy Exchange that does same basic routing for direct and topic.
Should be good enough for Arnaud to test atleast the basic JMS functionality. Added a FileMessage to demo Martins requirment. Haven't tested yet. The Toy Broker can now accept subscriptions and transfer messages to clients git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java91
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyExchange.java132
-rw-r--r--java/common/src/main/java/org/apache/qpidity/api/Message.java10
5 files changed, 215 insertions, 46 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
index c387a38b17..b70c8fae18 100644
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/Connection.java
@@ -111,6 +111,8 @@ public class Connection implements ProtocolActions
}
// not sure if this is the right place
+ System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
+
getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
index 537a7ef586..ff89567cee 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
@@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer;
*/
public abstract class ConnectionDelegate extends Delegate<Channel>
{
- private String _username;
- private String _password;
+ private String _username = "guest";
+ private String _password = "guest";;
private String _mechanism;
private String _virtualHost;
private SaslClient saslClient;
@@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
//-----------------------------------------------
@Override public void connectionStart(Channel context, ConnectionStart struct)
{
+ System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n");
System.out.println("The broker has sent connection-start");
String mechanism = null;
@@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
String knownHosts = struct.getKnownHosts();
System.out.println("The broker has opened the connection for use");
System.out.println("The broker supplied the following hosts for failover " + knownHosts);
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
+ if(_negotiationCompleteLock != null)
{
- _negotiationCompleteLock.unlock();
+ _negotiationCompleteLock.lock();
+ try
+ {
+ _negotiationComplete.signalAll();
+ }
+ finally
+ {
+ _negotiationCompleteLock.unlock();
+ }
}
+ System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n");
}
public void connectionRedirect(Channel context, ConnectionRedirect struct)
@@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
@Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
String hosts = "amqp:1223243232325";
- System.out.println("The client has sent connection-open-ok");
+ System.out.println("The client has sent connection-open");
context.connectionOpenOk(hosts);
+ System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n");
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 683008fe8a..4949568bbf 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -20,19 +20,16 @@
*/
package org.apache.qpidity;
-import java.io.IOException;
+import static org.apache.qpidity.Functions.str;
+import java.io.IOException;
import java.nio.ByteBuffer;
-
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import static org.apache.qpidity.Functions.*;
-
/**
* ToyBroker
@@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*;
class ToyBroker extends SessionDelegate
{
- private Map<String,Queue<Message>> queues;
+ private ToyExchange exchange;
private MessageTransfer xfr = null;
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
-
- public ToyBroker(Map<String,Queue<Message>> queues)
+ private Map<String,String> consumers = new HashMap<String,String>();
+
+ public ToyBroker(ToyExchange exchange)
{
- this.queues = queues;
+ this.exchange = exchange;
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
{
- queues.put(qd.getQueue(), new LinkedList());
- System.out.println("declared queue: " + qd.getQueue());
+ exchange.createQueue(qd.getQueue());
+ System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
+ }
+
+ @Override public void queueBind(Session ssn, QueueBind qb)
+ {
+ exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
@@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult(qq.getId(), result);
}
+
+ @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
+ {
+ consumers.put(ms.getDestination(),ms.getQueue());
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n");
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
@@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate
props = (DeliveryProperties) hdr;
}
}
-
- if (props != null && !props.getDiscardUnroutable())
- {
- String dest = xfr.getDestination();
- if (!queues.containsKey(dest))
- {
- reject(ssn);
- }
- }
-
+
this.headers = headers;
}
@@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate
if (frame.isLastSegment() && frame.isLastFrame())
{
String dest = xfr.getDestination();
- Queue queue = queues.get(dest);
- if (queue == null)
+ Message m = new Message(headers, frames);
+
+ if (exchange.route(dest,props.getRoutingKey(),m))
{
- reject(ssn);
+ System.out.println("queued " + m);
+ dispatchMessages(ssn);
}
else
{
- Message m = new Message(headers, frames);
- queue.offer(m);
- System.out.println("queued " + m);
+
+ reject(ssn);
}
ssn.processed(xfr);
xfr = null;
@@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate
ssn.messageReject(ranges, 0, "no such destination");
}
}
+
+ private void transferMessage(Session ssn,String dest, Message m)
+ {
+ System.out.println("\n==================> Transfering message to: " +dest + "\n");
+ ssn.messageTransfer(dest, (short)0, (short)0);
+ ssn.headers(m.headers);
+ for (Frame f : m.frames)
+ {
+ for (ByteBuffer b : f)
+ {
+ ssn.data(b);
+ }
+ }
+ ssn.endData();
+ }
+
+ public void dispatchMessages(Session ssn)
+ {
+ for (String dest: consumers.keySet())
+ {
+ Message m = exchange.getQueue(consumers.get(dest)).poll();
+ if(m != null)
+ {
+ transferMessage(ssn,dest,m);
+ }
+ }
+ }
- private class Message
+ class Message
{
private final Struct[] headers;
private final List<Frame> frames;
@@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- final Map<String,Queue<Message>> queues =
- new HashMap<String,Queue<Message>>();
-
+ final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
{
- return new ToyBroker(queues);
+ return new ToyBroker(exchange);
}
};
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
new file mode 100644
index 0000000000..6fabd22462
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
@@ -0,0 +1,132 @@
+package org.apache.qpidity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.qpidity.ToyBroker.Message;
+
+public class ToyExchange
+{
+ final static String DIRECT = "amq.direct";
+ final static String TOPIC = "amq.topic";
+
+ private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>();
+ private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>();
+ private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>();
+
+ public void createQueue(String name)
+ {
+ queues.put(name, new LinkedList<Message>());
+ }
+
+ public Queue<Message> getQueue(String name)
+ {
+ return queues.get(name);
+ }
+
+ public void bindQueue(String type,String binding,String queueName)
+ {
+ Queue<Message> queue = queues.get(queueName);
+ binding = normalizeKey(binding);
+ if(DIRECT.equals(type))
+ {
+
+ if (directEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = directEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ directEx.put(binding,list);
+ }
+ }
+ else
+ {
+ if (topicEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = topicEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ topicEx.put(binding,list);
+ }
+ }
+ }
+
+ public boolean route(String dest,String routingKey,Message msg)
+ {
+ List<Queue<Message>> queues;
+ if(DIRECT.equals(dest))
+ {
+ queues = directEx.get(routingKey);
+ }
+ else
+ {
+ queues = matchWildCard(routingKey);
+ }
+ if(queues != null && queues.size()>0)
+ {
+ System.out.println("Message stored in " + queues.size() + " queues");
+ storeMessage(msg,queues);
+ return true;
+ }
+ else
+ {
+ System.out.println("Message unroutable " + msg);
+ return false;
+ }
+ }
+
+ private String normalizeKey(String routingKey)
+ {
+ if(routingKey.indexOf(".*")>1)
+ {
+ return routingKey.substring(0,routingKey.indexOf(".*"));
+ }
+ else
+ {
+ return routingKey;
+ }
+ }
+
+ private List<Queue<Message>> matchWildCard(String routingKey)
+ {
+ List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+
+ for(String key: topicEx.keySet())
+ {
+ Pattern p = Pattern.compile(key);
+ Matcher m = p.matcher(routingKey);
+ if (m.find())
+ {
+ for(Queue<Message> queue : topicEx.get(key))
+ {
+ selected.add(queue);
+ }
+ }
+ }
+
+ return selected;
+ }
+
+ private void storeMessage(Message msg,List<Queue<Message>> selected)
+ {
+ for(Queue<Message> queue : selected)
+ {
+ queue.offer(msg);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java
index ccad3577f0..4e4a070fb4 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java
@@ -1,5 +1,6 @@
package org.apache.qpidity.api;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.qpidity.MessageProperties;
@@ -43,16 +44,16 @@ public interface Message
* </ul>
* @param src
*/
- public void appendData(byte[] src);
+ public void appendData(byte[] src) throws IOException;
- public void appendData(ByteBuffer src);
+ public void appendData(ByteBuffer src) throws IOException;
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
*
- * The read function might copy data from a
+ * The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
@@ -60,7 +61,8 @@ public interface Message
* </ul>
* @param target
*/
- public void readData(byte[] target);
+ public void readData(byte[] target) throws IOException;
+ public ByteBuffer readData() throws IOException;
}