From 55fa9bf941d78a34f90f2ed278afe4d5247abfad Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 10 Aug 2007 01:49:45 +0000 Subject: Added a Toy Exchange that does same basic routing for direct and topic. Should be good enough for Arnaud to test atleast the basic JMS functionality. Added a FileMessage to demo Martins requirment. Haven't tested yet. The Toy Broker can now accept subscriptions and transfer messages to clients git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Client.java | 108 +++++++++++++++ .../org/apache/qpidity/client/ClientSession.java | 99 ++++++++++++++ .../qpidity/client/ClientSessionDelegate.java | 78 +++++++++++ .../java/org/apache/qpidity/client/Connection.java | 4 +- .../java/org/apache/qpidity/client/DemoClient.java | 84 ++++++++++++ .../java/org/apache/qpidity/client/Session.java | 8 -- .../apache/qpidity/client/util/FileMessage.java | 99 ++++++++++++++ .../client/util/MessagePartListenerAdapter.java | 149 +++++++++++++++++++++ .../main/java/org/apache/qpidity/impl/Client.java | 112 ---------------- .../org/apache/qpidity/impl/ClientSession.java | 101 -------------- .../apache/qpidity/impl/ClientSessionDelegate.java | 79 ----------- .../java/org/apache/qpidity/impl/DemoClient.java | 45 ------- .../qpidity/impl/MessagePartListenerAdapter.java | 109 --------------- .../apache/qpidity/jms/MessageConsumerImpl.java | 2 +- .../org/apache/qpidity/jms/QueueBrowserImpl.java | 2 +- .../main/java/org/apache/qpidity/Connection.java | 2 + .../org/apache/qpidity/ConnectionDelegate.java | 26 ++-- .../main/java/org/apache/qpidity/ToyBroker.java | 91 ++++++++----- .../main/java/org/apache/qpidity/ToyExchange.java | 132 ++++++++++++++++++ .../main/java/org/apache/qpidity/api/Message.java | 10 +- 20 files changed, 837 insertions(+), 503 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/client/Client.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSession.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/DemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/Client.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java create mode 100644 java/common/src/main/java/org/apache/qpidity/ToyExchange.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java new file mode 100644 index 0000000000..8465475282 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -0,0 +1,108 @@ +package org.apache.qpidity.client; + +import java.net.URL; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java new file mode 100644 index 0000000000..fcde60fa04 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java @@ -0,0 +1,99 @@ +package org.apache.qpidity.client; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.api.Message; + +/** + * Implements a Qpid Sesion. + */ +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session +{ + private Map _messageListeners = new HashMap(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map> _unackedMessages = new HashMap>(); + + @Override public void sessionClose() + { + // release all unacked messages and then issues a close + super.sessionClose(); + } + + public void messageAcknowledge(RangeSet ranges) + { + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } + } + + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) + { + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); + } + + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) + { + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() + } + + + public RangeSet getAccquiredMessages() + { + return _acquiredMessages; + } + + public RangeSet getRejectedMessages() + { + return _rejectedMessages; + } + + public void setMessageListener(String destination, MessagePartListener listener) + { + _messageListeners.put(destination, listener); + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) + { + _acquiredMessages = acquiredMessages; + } + + void setRejectedMessages(RangeSet rejectedMessages) + { + _rejectedMessages = rejectedMessages; + } + + void notifyException(QpidException ex) + { + _exceptionListner.onException(ex); + } + + Map getMessageListerners() + { + return _messageListeners; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java new file mode 100644 index 0000000000..975dcb6d8b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java @@ -0,0 +1,78 @@ +package org.apache.qpidity.client; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; + + +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) + { + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + + } + + @Override public void headers(Session ssn, Struct... headers) + { + _currentMessageListener.messageHeaders(headers); + } + + + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 2ea6db8943..9bc17b14a6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -20,6 +20,7 @@ package org.apache.qpidity.client; import java.net.URL; +import java.util.UUID; import org.apache.qpidity.QpidException; @@ -46,7 +47,7 @@ public interface Connection * @throws QpidException If the communication layer fails to connect with the broker. */ public void connect(URL url) throws QpidException; - + /** * Close this connection. * @@ -83,5 +84,6 @@ public interface Connection * * @param exceptionListner The execptionListener */ + public void setExceptionListener(ExceptionListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java new file mode 100644 index 0000000000..e46065e0a0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java @@ -0,0 +1,84 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class DemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + // queue + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.data("this is the data"); + ssn.endData(); + + //reject + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.endData(); + ssn.sync(); + + // topic subs + ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.sync(); + + ssn.queueDeclare("topic1", null, null); + ssn.queueBind("topic1", "amq.topic", "stock.*",null); + ssn.queueDeclare("topic2", null, null); + ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); + ssn.queueDeclare("topic3", null, null); + ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); + ssn.sync(); + + // topic + ssn.messageTransfer("amq.topic", (short) 0, (short) 1); + ssn.data("Topic message"); + ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 84de268232..09595c8d0b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -20,7 +20,6 @@ package org.apache.qpidity.client; import java.nio.ByteBuffer; import java.util.Map; -import java.util.UUID; import org.apache.qpidity.Option; import org.apache.qpidity.RangeSet; @@ -67,13 +66,6 @@ public interface Session */ public void sessionSuspend(); - /** - * This will resume an existing session - *

Upon resume the session is attached with an underlying channel - * hence making operation on this session available. - */ - public void sessionResume(UUID sessionId); - //------------------------------------------------------ // Messaging methods // Producer diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java new file mode 100644 index 0000000000..84f18dcca4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -0,0 +1,99 @@ +package org.apache.qpidity.client.util; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +/** + * FileMessage provides pull style semantics for + * larges messages backed by a disk. + * Instead of loading all data into memeory it uses + * FileChannel to map regions of the file into memeory + * at a time. + * + * The write methods are not supported. + * + * From the standpoint of performance it is generally + * only worth mapping relatively large files into memory. + * + * FileMessage msg = new FileMessage(in,delProps,msgProps); + * session.messageTransfer(dest,msg,0,0); + * + * The messageTransfer method will read the file in chunks + * and stream it. + * + */ +public class FileMessage implements Message +{ + private MessageProperties _messageProperties; + private DeliveryProperties _deliveryProperties; + private FileChannel _fileChannel; + private int _chunkSize; + private long _fileSize; + private long _pos = 0; + + public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _fileChannel = in.getChannel(); + _chunkSize = chunkSize; + _fileSize = _fileChannel.size(); + + if (_fileSize <= _chunkSize) + { + _chunkSize = (int)_fileSize; + } + } + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + + public void readData(byte[] target) throws IOException + { + int readLen = target.length <= _chunkSize ? target.length : _chunkSize; + if (_pos + readLen > _fileSize) + { + readLen = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen); + _pos += readLen; + bb.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_pos + _chunkSize > _fileSize) + { + _chunkSize = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + _pos += _chunkSize; + return bb; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..4ff83db939 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -0,0 +1,149 @@ +package org.apache.qpidity.client.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.Struct; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.MessagePartListener; + +/** + * This is a simple message assembler. + * Will call onMessage method of the adaptee + * when all message data is read. + * + * This is a good convinience utility for handling + * small messages + */ +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + + // temp solution. + _currentMsg = new Message() + { + Queue _data = new LinkedList(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) throws IOException + { + appendData(ByteBuffer.wrap(src)); + } + + public void appendData(ByteBuffer src) throws IOException + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } + + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + return _readBuffer; + } + + private void buildReadBuffer() + { + //optimize for the simple cases + if(_data.size() == 1) + { + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + } + + //hack for testing + @Override public String toString() + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + byte[] b = new byte[_readBuffer.limit()]; + _readBuffer.get(b); + return new String(b); + } + }; + } + + public void addData(ByteBuffer src) + { + try + { + _currentMsg.appendData(src); + } + catch(IOException e) + { + // A chance for IO exception + // doesn't occur as we are using + // a ByteBuffer + } + } + + public void messageHeaders(Struct... headers) + { + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java deleted file mode 100644 index 13acff1ea6..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/Client.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.apache.qpidity.impl; - -import java.net.URL; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpidity.Channel; -import org.apache.qpidity.Connection; -import org.apache.qpidity.ConnectionClose; -import org.apache.qpidity.ConnectionDelegate; -import org.apache.qpidity.MinaHandler; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.client.DtxSession; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; - - -public class Client implements org.apache.qpidity.client.Connection -{ - private AtomicInteger _channelNo = new AtomicInteger(); - private Connection _conn; - private ExceptionListener _exceptionListner; - private final Lock _lock = new ReentrantLock(); - - public static org.apache.qpidity.client.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - Condition negotiationComplete = _lock.newCondition(); - _lock.lock(); - - ConnectionDelegate connectionDelegate = new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - @Override public void connectionClose(Channel context, ConnectionClose struct) - { - _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - _conn = MinaHandler.connect(host, port,connectionDelegate); - - _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); - - try - { - negotiationComplete.await(); - } - catch (Exception e) - { - // - } - finally - { - _lock.unlock(); - } - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(URL url) throws QpidException - { - throw new UnsupportedOperationException(); - } - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(0, "client is closing", 0, 0); - //need to close the connection underneath as well - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); - ClientSession ssn = new ClientSession(); - ssn.attach(ch); - ssn.sessionOpen(expiryInSeconds); - - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - // TODO Auto-generated method stub - return null; - } - - public void setExceptionListener(ExceptionListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java deleted file mode 100644 index 627829556c..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ /dev/null @@ -1,101 +0,0 @@ -package org.apache.qpidity.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpidity.Option; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.MessagePartListener; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session -{ - private Map _messageListeners = new HashMap(); - private ExceptionListener _exceptionListner; - private RangeSet _acquiredMessages; - private RangeSet _rejectedMessages; - private Map> _unackedMessages = new HashMap>(); - - @Override public void sessionClose() - { - // release all unacked messages and then issues a close - super.sessionClose(); - } - - public void messageAcknowledge(RangeSet ranges) - { - for (Range range : ranges) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("Acknowleding message for : " + super.getCommand((int) l)); - super.processed(l); - } - } - } - - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); - } - - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) - { - // need to break it down into small pieces - super.messageTransfer(exchange, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - // super.data(bytes); * - // super.endData() - } - - - public RangeSet getAccquiredMessages() - { - return _acquiredMessages; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - _messageListeners.put(destination, listener); - } - - public void setExceptionListener(ExceptionListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - // ugly but nessacery - - void setAccquiredMessages(RangeSet acquiredMessages) - { - _acquiredMessages = acquiredMessages; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onException(ex); - } - - Map getMessageListerners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java deleted file mode 100644 index 70d1019a06..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,79 +0,0 @@ -package org.apache.qpidity.impl; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.Frame; -import org.apache.qpidity.MessageAcquired; -import org.apache.qpidity.MessageReject; -import org.apache.qpidity.MessageTransfer; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Session; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.Struct; -import org.apache.qpidity.client.MessagePartListener; - - -public class ClientSessionDelegate extends SessionDelegate -{ - private MessageTransfer _currentTransfer; - private MessagePartListener _currentMessageListener; - - @Override public void data(Session ssn, Frame frame) - { - for (ByteBuffer b : frame) - { - _currentMessageListener.addData(b); - } - if (frame.isLastSegment() && frame.isLastFrame()) - { - _currentMessageListener.messageReceived(); - } - - } - - @Override public void headers(Session ssn, Struct... headers) - { - _currentMessageListener.messageHeaders(headers); - } - - - @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) - { - _currentTransfer = currentTransfer; - _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); - - //a better way is to tell the broker to stop the transfer - if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) - { - RangeSet transfers = new RangeSet(); - transfers.add(_currentTransfer.getId()); - session.messageRelease(transfers); - } - } - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); - } - - @Override public void messageAcquired(Session session, MessageAcquired struct) - { - ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java deleted file mode 100644 index d325054bee..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.qpidity.impl; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; -import org.apache.qpidity.client.Connection; - -public class DemoClient -{ - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() - { - public void onException(QpidException e) - { - System.out.println(e); - } - }); - ssn.queueDeclare("Queue1", null, null); - ssn.sync(); - - ssn.messageTransfer("Queue1", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties(), - new MessageProperties()); - ssn.data("this is the data"); - ssn.endData(); - - ssn.messageTransfer("Queue2", (short) 0, (short) 1); - ssn.data("this should be rejected"); - ssn.endData(); - ssn.sync(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java deleted file mode 100644 index 5d3f6a6e3e..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ /dev/null @@ -1,109 +0,0 @@ -package org.apache.qpidity.impl; - -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.Struct; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.MessageListener; -import org.apache.qpidity.client.MessagePartListener; - -/** - * - * Will call onMessage method as soon as data is avialable - * The client can then start to process the data while - * the rest of the data is read. - * - */ -public class MessagePartListenerAdapter implements MessagePartListener -{ - MessageListener _adaptee; - Message _currentMsg; - DeliveryProperties _currentDeliveryProps; - MessageProperties _currentMessageProps; - - public MessagePartListenerAdapter(MessageListener listener) - { - _adaptee = listener; - - // temp solution. - _currentMsg = new Message() - { - Queue _data = new LinkedList(); - ByteBuffer _readBuffer; - private int dataSize; - - public void appendData(byte[] src) - { - appendData(ByteBuffer.wrap(src)); - } - - public void appendData(ByteBuffer src) - { - _data.offer(src); - dataSize += src.remaining(); - } - - public DeliveryProperties getDeliveryProperties() - { - return _currentDeliveryProps; - } - - public MessageProperties getMessageProperties() - { - return _currentMessageProps; - } - - // since we provide the message only after completion - // we can assume that when this method is called we have - // received all data. - public void readData(byte[] target) - { - if (_readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); - } - - private void buildReadBuffer() - { - _readBuffer = ByteBuffer.allocate(dataSize); - for(ByteBuffer buf:_data) - { - _readBuffer.put(buf); - } - } - }; - } - - public void addData(ByteBuffer src) - { - _currentMsg.appendData(src); - } - - public void messageHeaders(Struct... headers) - { - for(Struct struct: headers) - { - if(struct instanceof DeliveryProperties) - { - _currentDeliveryProps = (DeliveryProperties)struct; - } - else if (struct instanceof MessageProperties) - { - _currentMessageProps = (MessageProperties)struct; - } - } - } - - public void messageReceived() - { - _adaptee.onMessage(_currentMsg); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 3869c2a793..c071280b37 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -18,13 +18,13 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.impl.MessagePartListenerAdapter; import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 8d707e4ccc..41c8bc02c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -26,9 +26,9 @@ import javax.jms.Queue; import javax.jms.QueueBrowser; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.impl.MessagePartListenerAdapter; /** * Implementation of the JMS QueueBrowser interface diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index c387a38b17..b70c8fae18 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -111,6 +111,8 @@ public class Connection implements ProtocolActions } // not sure if this is the right place + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 537a7ef586..ff89567cee 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer; */ public abstract class ConnectionDelegate extends Delegate { - private String _username; - private String _password; + private String _username = "guest"; + private String _password = "guest";; private String _mechanism; private String _virtualHost; private SaslClient saslClient; @@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate //----------------------------------------------- @Override public void connectionStart(Channel context, ConnectionStart struct) { + System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); System.out.println("The broker has sent connection-start"); String mechanism = null; @@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate String knownHosts = struct.getKnownHosts(); System.out.println("The broker has opened the connection for use"); System.out.println("The broker supplied the following hosts for failover " + knownHosts); - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally + if(_negotiationCompleteLock != null) { - _negotiationCompleteLock.unlock(); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } } + System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); } public void connectionRedirect(Channel context, ConnectionRedirect struct) @@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate @Override public void connectionOpen(Channel context, ConnectionOpen struct) { String hosts = "amqp:1223243232325"; - System.out.println("The client has sent connection-open-ok"); + System.out.println("The client has sent connection-open"); context.connectionOpenOk(hosts); + System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 683008fe8a..4949568bbf 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,19 +20,16 @@ */ package org.apache.qpidity; -import java.io.IOException; +import static org.apache.qpidity.Functions.str; +import java.io.IOException; import java.nio.ByteBuffer; - import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import static org.apache.qpidity.Functions.*; - /** * ToyBroker @@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*; class ToyBroker extends SessionDelegate { - private Map> queues; + private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; private Struct[] headers = null; private List frames = null; - - public ToyBroker(Map> queues) + private Map consumers = new HashMap(); + + public ToyBroker(ToyExchange exchange) { - this.queues = queues; + this.exchange = exchange; } @Override public void queueDeclare(Session ssn, QueueDeclare qd) { - queues.put(qd.getQueue(), new LinkedList()); - System.out.println("declared queue: " + qd.getQueue()); + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void queueBind(Session ssn, QueueBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) @@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + consumers.put(ms.getDestination(),ms.getQueue()); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { @@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate props = (DeliveryProperties) hdr; } } - - if (props != null && !props.getDiscardUnroutable()) - { - String dest = xfr.getDestination(); - if (!queues.containsKey(dest)) - { - reject(ssn); - } - } - + this.headers = headers; } @@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate if (frame.isLastSegment() && frame.isLastFrame()) { String dest = xfr.getDestination(); - Queue queue = queues.get(dest); - if (queue == null) + Message m = new Message(headers, frames); + + if (exchange.route(dest,props.getRoutingKey(),m)) { - reject(ssn); + System.out.println("queued " + m); + dispatchMessages(ssn); } else { - Message m = new Message(headers, frames); - queue.offer(m); - System.out.println("queued " + m); + + reject(ssn); } ssn.processed(xfr); xfr = null; @@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } + + private void transferMessage(Session ssn,String dest, Message m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(dest, (short)0, (short)0); + ssn.headers(m.headers); + for (Frame f : m.frames) + { + for (ByteBuffer b : f) + { + ssn.data(b); + } + } + ssn.endData(); + } + + public void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + Message m = exchange.getQueue(consumers.get(dest)).poll(); + if(m != null) + { + transferMessage(ssn,dest,m); + } + } + } - private class Message + class Message { private final Struct[] headers; private final List frames; @@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final Map> queues = - new HashMap>(); - + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { - return new ToyBroker(queues); + return new ToyBroker(exchange); } }; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java new file mode 100644 index 0000000000..6fabd22462 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -0,0 +1,132 @@ +package org.apache.qpidity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.qpidity.ToyBroker.Message; + +public class ToyExchange +{ + final static String DIRECT = "amq.direct"; + final static String TOPIC = "amq.topic"; + + private Map>> directEx = new HashMap>>(); + private Map>> topicEx = new HashMap>>(); + private Map> queues = new HashMap>(); + + public void createQueue(String name) + { + queues.put(name, new LinkedList()); + } + + public Queue getQueue(String name) + { + return queues.get(name); + } + + public void bindQueue(String type,String binding,String queueName) + { + Queue queue = queues.get(queueName); + binding = normalizeKey(binding); + if(DIRECT.equals(type)) + { + + if (directEx.containsKey(binding)) + { + List> list = directEx.get(binding); + list.add(queue); + } + else + { + List> list = new LinkedList>(); + list.add(queue); + directEx.put(binding,list); + } + } + else + { + if (topicEx.containsKey(binding)) + { + List> list = topicEx.get(binding); + list.add(queue); + } + else + { + List> list = new LinkedList>(); + list.add(queue); + topicEx.put(binding,list); + } + } + } + + public boolean route(String dest,String routingKey,Message msg) + { + List> queues; + if(DIRECT.equals(dest)) + { + queues = directEx.get(routingKey); + } + else + { + queues = matchWildCard(routingKey); + } + if(queues != null && queues.size()>0) + { + System.out.println("Message stored in " + queues.size() + " queues"); + storeMessage(msg,queues); + return true; + } + else + { + System.out.println("Message unroutable " + msg); + return false; + } + } + + private String normalizeKey(String routingKey) + { + if(routingKey.indexOf(".*")>1) + { + return routingKey.substring(0,routingKey.indexOf(".*")); + } + else + { + return routingKey; + } + } + + private List> matchWildCard(String routingKey) + { + List> selected = new ArrayList>(); + + for(String key: topicEx.keySet()) + { + Pattern p = Pattern.compile(key); + Matcher m = p.matcher(routingKey); + if (m.find()) + { + for(Queue queue : topicEx.get(key)) + { + selected.add(queue); + } + } + } + + return selected; + } + + private void storeMessage(Message msg,List> selected) + { + for(Queue queue : selected) + { + queue.offer(msg); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index ccad3577f0..4e4a070fb4 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -1,5 +1,6 @@ package org.apache.qpidity.api; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpidity.MessageProperties; @@ -43,16 +44,16 @@ public interface Message * * @param src */ - public void appendData(byte[] src); + public void appendData(byte[] src) throws IOException; - public void appendData(ByteBuffer src); + public void appendData(ByteBuffer src) throws IOException; /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) * - * The read function might copy data from a + * The read function might copy data from *

    *
  • From memory (Ex: ByteBuffer) *
  • From Disk @@ -60,7 +61,8 @@ public interface Message *
* @param target */ - public void readData(byte[] target); + public void readData(byte[] target) throws IOException; + public ByteBuffer readData() throws IOException; } -- cgit v1.2.1