From 55fa9bf941d78a34f90f2ed278afe4d5247abfad Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 10 Aug 2007 01:49:45 +0000 Subject: Added a Toy Exchange that does same basic routing for direct and topic. Should be good enough for Arnaud to test atleast the basic JMS functionality. Added a FileMessage to demo Martins requirment. Haven't tested yet. The Toy Broker can now accept subscriptions and transfer messages to clients git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Client.java | 108 +++++++++++++++ .../org/apache/qpidity/client/ClientSession.java | 99 ++++++++++++++ .../qpidity/client/ClientSessionDelegate.java | 78 +++++++++++ .../java/org/apache/qpidity/client/Connection.java | 4 +- .../java/org/apache/qpidity/client/DemoClient.java | 84 ++++++++++++ .../java/org/apache/qpidity/client/Session.java | 8 -- .../apache/qpidity/client/util/FileMessage.java | 99 ++++++++++++++ .../client/util/MessagePartListenerAdapter.java | 149 +++++++++++++++++++++ .../main/java/org/apache/qpidity/impl/Client.java | 112 ---------------- .../org/apache/qpidity/impl/ClientSession.java | 101 -------------- .../apache/qpidity/impl/ClientSessionDelegate.java | 79 ----------- .../java/org/apache/qpidity/impl/DemoClient.java | 45 ------- .../qpidity/impl/MessagePartListenerAdapter.java | 109 --------------- .../apache/qpidity/jms/MessageConsumerImpl.java | 2 +- .../org/apache/qpidity/jms/QueueBrowserImpl.java | 2 +- 15 files changed, 622 insertions(+), 457 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/client/Client.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSession.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/DemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/Client.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java new file mode 100644 index 0000000000..8465475282 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -0,0 +1,108 @@ +package org.apache.qpidity.client; + +import java.net.URL; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java new file mode 100644 index 0000000000..fcde60fa04 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java @@ -0,0 +1,99 @@ +package org.apache.qpidity.client; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.api.Message; + +/** + * Implements a Qpid Sesion. + */ +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session +{ + private Map _messageListeners = new HashMap(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map> _unackedMessages = new HashMap>(); + + @Override public void sessionClose() + { + // release all unacked messages and then issues a close + super.sessionClose(); + } + + public void messageAcknowledge(RangeSet ranges) + { + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } + } + + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) + { + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); + } + + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) + { + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() + } + + + public RangeSet getAccquiredMessages() + { + return _acquiredMessages; + } + + public RangeSet getRejectedMessages() + { + return _rejectedMessages; + } + + public void setMessageListener(String destination, MessagePartListener listener) + { + _messageListeners.put(destination, listener); + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) + { + _acquiredMessages = acquiredMessages; + } + + void setRejectedMessages(RangeSet rejectedMessages) + { + _rejectedMessages = rejectedMessages; + } + + void notifyException(QpidException ex) + { + _exceptionListner.onException(ex); + } + + Map getMessageListerners() + { + return _messageListeners; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java new file mode 100644 index 0000000000..975dcb6d8b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java @@ -0,0 +1,78 @@ +package org.apache.qpidity.client; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; + + +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) + { + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + + } + + @Override public void headers(Session ssn, Struct... headers) + { + _currentMessageListener.messageHeaders(headers); + } + + + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 2ea6db8943..9bc17b14a6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -20,6 +20,7 @@ package org.apache.qpidity.client; import java.net.URL; +import java.util.UUID; import org.apache.qpidity.QpidException; @@ -46,7 +47,7 @@ public interface Connection * @throws QpidException If the communication layer fails to connect with the broker. */ public void connect(URL url) throws QpidException; - + /** * Close this connection. * @@ -83,5 +84,6 @@ public interface Connection * * @param exceptionListner The execptionListener */ + public void setExceptionListener(ExceptionListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java new file mode 100644 index 0000000000..e46065e0a0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java @@ -0,0 +1,84 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class DemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + // queue + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.data("this is the data"); + ssn.endData(); + + //reject + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.endData(); + ssn.sync(); + + // topic subs + ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.sync(); + + ssn.queueDeclare("topic1", null, null); + ssn.queueBind("topic1", "amq.topic", "stock.*",null); + ssn.queueDeclare("topic2", null, null); + ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); + ssn.queueDeclare("topic3", null, null); + ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); + ssn.sync(); + + // topic + ssn.messageTransfer("amq.topic", (short) 0, (short) 1); + ssn.data("Topic message"); + ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 84de268232..09595c8d0b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -20,7 +20,6 @@ package org.apache.qpidity.client; import java.nio.ByteBuffer; import java.util.Map; -import java.util.UUID; import org.apache.qpidity.Option; import org.apache.qpidity.RangeSet; @@ -67,13 +66,6 @@ public interface Session */ public void sessionSuspend(); - /** - * This will resume an existing session - *

Upon resume the session is attached with an underlying channel - * hence making operation on this session available. - */ - public void sessionResume(UUID sessionId); - //------------------------------------------------------ // Messaging methods // Producer diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java new file mode 100644 index 0000000000..84f18dcca4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -0,0 +1,99 @@ +package org.apache.qpidity.client.util; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +/** + * FileMessage provides pull style semantics for + * larges messages backed by a disk. + * Instead of loading all data into memeory it uses + * FileChannel to map regions of the file into memeory + * at a time. + * + * The write methods are not supported. + * + * From the standpoint of performance it is generally + * only worth mapping relatively large files into memory. + * + * FileMessage msg = new FileMessage(in,delProps,msgProps); + * session.messageTransfer(dest,msg,0,0); + * + * The messageTransfer method will read the file in chunks + * and stream it. + * + */ +public class FileMessage implements Message +{ + private MessageProperties _messageProperties; + private DeliveryProperties _deliveryProperties; + private FileChannel _fileChannel; + private int _chunkSize; + private long _fileSize; + private long _pos = 0; + + public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _fileChannel = in.getChannel(); + _chunkSize = chunkSize; + _fileSize = _fileChannel.size(); + + if (_fileSize <= _chunkSize) + { + _chunkSize = (int)_fileSize; + } + } + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + + public void readData(byte[] target) throws IOException + { + int readLen = target.length <= _chunkSize ? target.length : _chunkSize; + if (_pos + readLen > _fileSize) + { + readLen = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen); + _pos += readLen; + bb.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_pos + _chunkSize > _fileSize) + { + _chunkSize = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + _pos += _chunkSize; + return bb; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..4ff83db939 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -0,0 +1,149 @@ +package org.apache.qpidity.client.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.Struct; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.MessagePartListener; + +/** + * This is a simple message assembler. + * Will call onMessage method of the adaptee + * when all message data is read. + * + * This is a good convinience utility for handling + * small messages + */ +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + + // temp solution. + _currentMsg = new Message() + { + Queue _data = new LinkedList(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) throws IOException + { + appendData(ByteBuffer.wrap(src)); + } + + public void appendData(ByteBuffer src) throws IOException + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } + + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + return _readBuffer; + } + + private void buildReadBuffer() + { + //optimize for the simple cases + if(_data.size() == 1) + { + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + } + + //hack for testing + @Override public String toString() + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + byte[] b = new byte[_readBuffer.limit()]; + _readBuffer.get(b); + return new String(b); + } + }; + } + + public void addData(ByteBuffer src) + { + try + { + _currentMsg.appendData(src); + } + catch(IOException e) + { + // A chance for IO exception + // doesn't occur as we are using + // a ByteBuffer + } + } + + public void messageHeaders(Struct... headers) + { + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java deleted file mode 100644 index 13acff1ea6..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/Client.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.apache.qpidity.impl; - -import java.net.URL; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpidity.Channel; -import org.apache.qpidity.Connection; -import org.apache.qpidity.ConnectionClose; -import org.apache.qpidity.ConnectionDelegate; -import org.apache.qpidity.MinaHandler; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.client.DtxSession; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; - - -public class Client implements org.apache.qpidity.client.Connection -{ - private AtomicInteger _channelNo = new AtomicInteger(); - private Connection _conn; - private ExceptionListener _exceptionListner; - private final Lock _lock = new ReentrantLock(); - - public static org.apache.qpidity.client.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - Condition negotiationComplete = _lock.newCondition(); - _lock.lock(); - - ConnectionDelegate connectionDelegate = new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - @Override public void connectionClose(Channel context, ConnectionClose struct) - { - _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - _conn = MinaHandler.connect(host, port,connectionDelegate); - - _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); - - try - { - negotiationComplete.await(); - } - catch (Exception e) - { - // - } - finally - { - _lock.unlock(); - } - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(URL url) throws QpidException - { - throw new UnsupportedOperationException(); - } - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(0, "client is closing", 0, 0); - //need to close the connection underneath as well - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); - ClientSession ssn = new ClientSession(); - ssn.attach(ch); - ssn.sessionOpen(expiryInSeconds); - - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - // TODO Auto-generated method stub - return null; - } - - public void setExceptionListener(ExceptionListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java deleted file mode 100644 index 627829556c..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ /dev/null @@ -1,101 +0,0 @@ -package org.apache.qpidity.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpidity.Option; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.MessagePartListener; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session -{ - private Map _messageListeners = new HashMap(); - private ExceptionListener _exceptionListner; - private RangeSet _acquiredMessages; - private RangeSet _rejectedMessages; - private Map> _unackedMessages = new HashMap>(); - - @Override public void sessionClose() - { - // release all unacked messages and then issues a close - super.sessionClose(); - } - - public void messageAcknowledge(RangeSet ranges) - { - for (Range range : ranges) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("Acknowleding message for : " + super.getCommand((int) l)); - super.processed(l); - } - } - } - - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); - } - - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) - { - // need to break it down into small pieces - super.messageTransfer(exchange, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - // super.data(bytes); * - // super.endData() - } - - - public RangeSet getAccquiredMessages() - { - return _acquiredMessages; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - _messageListeners.put(destination, listener); - } - - public void setExceptionListener(ExceptionListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - // ugly but nessacery - - void setAccquiredMessages(RangeSet acquiredMessages) - { - _acquiredMessages = acquiredMessages; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onException(ex); - } - - Map getMessageListerners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java deleted file mode 100644 index 70d1019a06..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,79 +0,0 @@ -package org.apache.qpidity.impl; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.Frame; -import org.apache.qpidity.MessageAcquired; -import org.apache.qpidity.MessageReject; -import org.apache.qpidity.MessageTransfer; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Session; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.Struct; -import org.apache.qpidity.client.MessagePartListener; - - -public class ClientSessionDelegate extends SessionDelegate -{ - private MessageTransfer _currentTransfer; - private MessagePartListener _currentMessageListener; - - @Override public void data(Session ssn, Frame frame) - { - for (ByteBuffer b : frame) - { - _currentMessageListener.addData(b); - } - if (frame.isLastSegment() && frame.isLastFrame()) - { - _currentMessageListener.messageReceived(); - } - - } - - @Override public void headers(Session ssn, Struct... headers) - { - _currentMessageListener.messageHeaders(headers); - } - - - @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) - { - _currentTransfer = currentTransfer; - _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); - - //a better way is to tell the broker to stop the transfer - if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) - { - RangeSet transfers = new RangeSet(); - transfers.add(_currentTransfer.getId()); - session.messageRelease(transfers); - } - } - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); - } - - @Override public void messageAcquired(Session session, MessageAcquired struct) - { - ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java deleted file mode 100644 index d325054bee..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.qpidity.impl; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; -import org.apache.qpidity.client.Connection; - -public class DemoClient -{ - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() - { - public void onException(QpidException e) - { - System.out.println(e); - } - }); - ssn.queueDeclare("Queue1", null, null); - ssn.sync(); - - ssn.messageTransfer("Queue1", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties(), - new MessageProperties()); - ssn.data("this is the data"); - ssn.endData(); - - ssn.messageTransfer("Queue2", (short) 0, (short) 1); - ssn.data("this should be rejected"); - ssn.endData(); - ssn.sync(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java deleted file mode 100644 index 5d3f6a6e3e..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ /dev/null @@ -1,109 +0,0 @@ -package org.apache.qpidity.impl; - -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.Struct; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.MessageListener; -import org.apache.qpidity.client.MessagePartListener; - -/** - * - * Will call onMessage method as soon as data is avialable - * The client can then start to process the data while - * the rest of the data is read. - * - */ -public class MessagePartListenerAdapter implements MessagePartListener -{ - MessageListener _adaptee; - Message _currentMsg; - DeliveryProperties _currentDeliveryProps; - MessageProperties _currentMessageProps; - - public MessagePartListenerAdapter(MessageListener listener) - { - _adaptee = listener; - - // temp solution. - _currentMsg = new Message() - { - Queue _data = new LinkedList(); - ByteBuffer _readBuffer; - private int dataSize; - - public void appendData(byte[] src) - { - appendData(ByteBuffer.wrap(src)); - } - - public void appendData(ByteBuffer src) - { - _data.offer(src); - dataSize += src.remaining(); - } - - public DeliveryProperties getDeliveryProperties() - { - return _currentDeliveryProps; - } - - public MessageProperties getMessageProperties() - { - return _currentMessageProps; - } - - // since we provide the message only after completion - // we can assume that when this method is called we have - // received all data. - public void readData(byte[] target) - { - if (_readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); - } - - private void buildReadBuffer() - { - _readBuffer = ByteBuffer.allocate(dataSize); - for(ByteBuffer buf:_data) - { - _readBuffer.put(buf); - } - } - }; - } - - public void addData(ByteBuffer src) - { - _currentMsg.appendData(src); - } - - public void messageHeaders(Struct... headers) - { - for(Struct struct: headers) - { - if(struct instanceof DeliveryProperties) - { - _currentDeliveryProps = (DeliveryProperties)struct; - } - else if (struct instanceof MessageProperties) - { - _currentMessageProps = (MessageProperties)struct; - } - } - } - - public void messageReceived() - { - _adaptee.onMessage(_currentMsg); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 3869c2a793..c071280b37 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -18,13 +18,13 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.impl.MessagePartListenerAdapter; import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 8d707e4ccc..41c8bc02c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -26,9 +26,9 @@ import javax.jms.Queue; import javax.jms.QueueBrowser; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.impl.MessagePartListenerAdapter; /** * Implementation of the JMS QueueBrowser interface -- cgit v1.2.1