From c1e0112bc308edb406ac8aba3c8e47fbf710752c Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 9 Aug 2007 07:24:02 +0000 Subject: implemented the Connection and Session API git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564124 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Connection.java | 13 +- .../apache/qpidity/client/MessagePartListener.java | 10 +- .../java/org/apache/qpidity/client/Session.java | 60 ++++-- .../main/java/org/apache/qpidity/impl/Client.java | 112 ++++++++++ .../org/apache/qpidity/impl/ClientSession.java | 229 +++++---------------- .../apache/qpidity/impl/ClientSessionDelegate.java | 98 ++++++--- .../java/org/apache/qpidity/impl/DemoClient.java | 45 ++++ .../qpidity/impl/MessagePartListenerAdapter.java | 98 ++++++++- .../org/apache/qpidity/jms/ExceptionHelper.java | 4 +- .../apache/qpidity/jms/MessageConsumerImpl.java | 10 +- .../org/apache/qpidity/jms/QueueBrowserImpl.java | 4 +- .../java/org/apache/qpidity/jms/SessionImpl.java | 2 +- 12 files changed, 434 insertions(+), 251 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/impl/Client.java create mode 100644 java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 3da3a12aa5..2ea6db8943 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -28,6 +28,17 @@ import org.apache.qpidity.QpidException; */ public interface Connection { + /** + * Establish the connection using the given parameters + * + * @param host + * @param port + * @param username + * @param password + * @throws QpidException + */ + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; + /** * Establish the connection with the broker identified by the provided URL. * @@ -52,7 +63,7 @@ public interface Connection * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) session. */ - public Session createSession(int expiryInSeconds); + public Session createSession(long expiryInSeconds); /** * Create a DtxSession for this connection. diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 633c77ecac..def7b5ca41 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -17,7 +17,9 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; + +import org.apache.qpidity.Struct; /** * Assembles message parts. @@ -31,21 +33,21 @@ import org.apache.qpidity.Header; * are transferred. */ public interface MessagePartListener -{ +{ /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. * * @param headers Either DeliveryProperties or ApplicationProperties */ - public void messageHeaders(Header... headers); + public void messageHeaders(Struct... headers); /** * Add the following byte array to the content of the message being received * * @param data Data to be added or streamed. */ - public void addData(byte[] data); + public void addData(ByteBuffer src); /** * Indicates that the message has been fully received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 0178ef6d3b..84de268232 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -18,12 +18,14 @@ */ package org.apache.qpidity.client; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.UUID; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.Header; import org.apache.qpidity.Option; import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Struct; +import org.apache.qpidity.api.Message; /** *

A session is associated with a connection. @@ -32,10 +34,12 @@ import org.apache.qpidity.RangeSet; */ public interface Session { - public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; - public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; - public static final short CONFIRM_MODE_REQUIRED = 1; - public static final short CONFIRM_MODE_NOT_REQUIRED = 0; + public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; + public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0; + public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0; + public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1; + public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1; + public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0; public static final short MESSAGE_FLOW_MODE_CREDIT = 0; public static final short MESSAGE_FLOW_MODE_WINDOW = 1; public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; @@ -54,21 +58,21 @@ public interface Session /** * Close this session and any associated resources. */ - public void close(); + public void sessionClose(); /** * Suspend this session resulting in interrupting the traffic with the broker. *

The session timer will start to tick in suspend. *

When a session is suspend any operation of this session and of the associated resources are unavailable. */ - public void suspend(); + public void sessionSuspend(); /** * This will resume an existing session *

Upon resume the session is attached with an underlying channel * hence making operation on this session available. */ - public void resume(); + public void sessionResume(UUID sessionId); //------------------------------------------------------ // Messaging methods @@ -92,7 +96,7 @@ public interface Session * @param exchange The exchange the message is being sent. * @param msg The Message to be sent */ - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode); + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode); /** * Declare the beginning of a message transfer operation. This operation must @@ -117,26 +121,38 @@ public interface Session * * @param exchange The exchange the message is being sent. */ - public void messageTransfer(String exchange, short confirmMode, short acquireMode); + public void messageTransfer(String destination, short confirmMode, short acquireMode); /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or to the message being sent. * - * @param headers Either DeliveryProperties or ApplicationProperties + * @param headers are Either DeliveryProperties or ApplicationProperties * @see org.apache.qpidity.DeliveryProperties */ - public void addMessageHeaders(Header... headers); + public void headers(Struct... headers); /** * Add the following byte array to the content of the message being sent. * * @param data Data to be added. - * @param off Offset from which to start reading data - * @param len Number of bytes to be read */ - public void addData(byte[] data, int off, int len); + public void data(byte[] data); + + /** + * Add the following ByteBuffer to the content of the message being sent. + * + * @param data Data to be added. + */ + public void data(ByteBuffer buf); + /** + * Add the following String to the content of the message being sent. + * + * @param data Data to be added. + */ + public void data(String str); + /** * Signals the end of data for the message. */ @@ -258,8 +274,6 @@ public interface Session * @param destination The destination to call flush on. */ public void messageFlush(String destination); - - public int getNoOfUnAckedMessages(); /** * On receipt of this method, the brokers MUST set his credit to zero for the given @@ -286,8 +300,12 @@ public interface Session * and may be either discarded or moved to the broker dead letter queue. * * @param ranges Range of rejected messages. + * @param code TODO + * @param text TODO */ - public void messageReject(RangeSet ranges); + public void messageReject(RangeSet ranges, int code, String text); + + public RangeSet getRejectedMessages(); /** * Try to acquire ranges of messages hence releasing them form the queue. @@ -296,10 +314,10 @@ public interface Session * message acquisition can fail. * The outcome of the acquisition is returned as an array of ranges of qcquired messages. *

This method should only be called on non-acquired messages. - * + * @param mode TODO * @param range Ranges of messages to be acquired. */ - public void messageAcquire(RangeSet ranges); + public void messageAcquire(RangeSet ranges, short mode); public RangeSet getAccquiredMessages(); diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java new file mode 100644 index 0000000000..13acff1ea6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/Client.java @@ -0,0 +1,112 @@ +package org.apache.qpidity.impl; + +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.DtxSession; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index a0111a86f4..627829556c 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -1,216 +1,101 @@ package org.apache.qpidity.impl; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; -import org.apache.qpidity.*; /** * Implements a Qpid Sesion. */ -public class ClientSession implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session { - - Map messagListeners = new HashMap(); - - public void addData(byte[] data, int off, int len) - { - // TODO Auto-generated method stub - - } - - public void addMessageHeaders(Header... headers) - { - // TODO Auto-generated method stub - - } - - public void close() - { - // TODO Auto-generated method stub - - } - - public void endData() - { - // TODO Auto-generated method stub - - } - - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void exchangeDelete(String exchangeName, Option... options) + private Map _messageListeners = new HashMap(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map> _unackedMessages = new HashMap>(); + + @Override public void sessionClose() { - // TODO Auto-generated method stub - + // release all unacked messages and then issues a close + super.sessionClose(); } - + public void messageAcknowledge(RangeSet ranges) { - // TODO Auto-generated method stub - - } - - public void messageAcquire(RangeSet ranges) - { - // TODO Auto-generated method stub - } - - public void messageCancel(String destination) - { - // TODO Auto-generated method stub - - } - - public void messageFlow(String destination, short unit, long value) - { - // TODO Auto-generated method stub - - } - - public void messageFlowMode(String destination, short mode) - { - // TODO Auto-generated method stub - - } - - public void messageFlush(String destination) - { - // TODO Auto-generated method stub - } - - public void messageReject(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageRelease(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageStop(String destination) - { - // TODO Auto-generated method stub - + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } } public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) { - // TODO Auto-generated method stub - + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) { - // TODO Auto-generated method stub - + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() } - - public void messageTransfer(String exchange, short confirmMode, short acquireMode) - { - // TODO Auto-generated method stub - - } - - public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) - { - // TODO Auto-generated method stub - - } - - public void queueDeclare(String queueName, String alternateExchange, Map arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queueDelete(String queueName, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queuePurge(String queueName) - { - // TODO Auto-generated method stub - - } - - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) - { - // TODO Auto-generated method stub - - } - - public void resume() + + + public RangeSet getAccquiredMessages() { - // TODO Auto-generated method stub - + return _acquiredMessages; } - public void setExceptionListener(ExceptionListener exceptionListner) + public RangeSet getRejectedMessages() { - // TODO Auto-generated method stub - + return _rejectedMessages; } - + public void setMessageListener(String destination, MessagePartListener listener) { - // TODO Auto-generated method stub - + _messageListeners.put(destination, listener); } - - public void suspend() - { - // TODO Auto-generated method stub - - } - - public void sync() - { - // TODO Auto-generated method stub - - } - - public void txCommit() throws IllegalStateException + + public void setExceptionListener(ExceptionListener exceptionListner) { - // TODO Auto-generated method stub - - } - - public void txRollback() throws IllegalStateException + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) { - // TODO Auto-generated method stub - + _acquiredMessages = acquiredMessages; } - - public void txSelect() + + void setRejectedMessages(RangeSet rejectedMessages) { - // TODO Auto-generated method stub - + _rejectedMessages = rejectedMessages; } - - public RangeSet getAccquiredMessages() + + void notifyException(QpidException ex) { - // TODO Auto-generated method stub - return null; + _exceptionListner.onException(ex); } - - public int getNoOfUnAckedMessages() + + Map getMessageListerners() { - // TODO Auto-generated method stub - return 0; + return _messageListeners; } - - } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java index ea9e4c067b..70d1019a06 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -1,45 +1,79 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.client.Session; +import java.nio.ByteBuffer; +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; -public class ClientSessionDelegate extends CommonSessionDelegate -{ - - /*@Override public void messageTransfer(Session context, MessageTransfer struct) - { - MessagePartListener l = context.messagListeners.get(struct.getDestination()); - l.messageTransfer(struct.getDestination(),new Option[0]); - }*/ - - // --------------------------------------------------------------- - // Non generated methods - but would like if they are also generated. - // These methods should be called from Body and Header Handlers. - // If these methods are generated as part of the delegate then - // I can call these methods from the BodyHandler and HeaderHandler - // in a generic way - // I have used destination to indicate my intent of receiving - // some form of correlation to know which consumer this data belongs to. - // It can be anything as long as I can make the right correlation - // ---------------------------------------------------------------- - /* public void data(Session context,String destination,byte[] src) throws QpidException +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) { - MessagePartListener l = context.messagListeners.get(destination); - l.data(src); + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + } - public void endData(Session context,String destination) throws QpidException + @Override public void headers(Session ssn, Struct... headers) { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); + _currentMessageListener.messageHeaders(headers); } - public void messageHeaders(Session context,String destination,Header... headers) throws QpidException - { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); - }*/ + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java new file mode 100644 index 0000000000..d325054bee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java @@ -0,0 +1,45 @@ +package org.apache.qpidity.impl; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.Connection; + +public class DemoClient +{ + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("Queue1", null, null); + ssn.sync(); + + ssn.messageTransfer("Queue1", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties(), + new MessageProperties()); + ssn.data("this is the data"); + ssn.endData(); + + ssn.messageTransfer("Queue2", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java index f05f2c0e76..5d3f6a6e3e 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java @@ -1,33 +1,109 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.Struct; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; +/** + * + * Will call onMessage method as soon as data is avialable + * The client can then start to process the data while + * the rest of the data is read. + * + */ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - _currentMsg = null; - } + + // temp solution. + _currentMsg = new Message() + { + Queue _data = new LinkedList(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) + { + appendData(ByteBuffer.wrap(src)); + } - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } + public void appendData(ByteBuffer src) + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } - public void messageHeaders(Header... headers) + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) + { + if (_readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + private void buildReadBuffer() + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + }; + } + + public void addData(ByteBuffer src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Struct... headers) { - //_currentMsg add the headers + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } } - + public void messageReceived() { - _adaptee.onMessage(_currentMsg); + _adaptee.onMessage(_currentMsg); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java index 1dc35b5609..98767106ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -34,7 +34,7 @@ public class ExceptionHelper { if (exception instanceof QpidException) { - jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); } else { @@ -51,7 +51,7 @@ public class ExceptionHelper static public XAException convertQpidExceptionToXAException(QpidException exception) { - String qpidErrorCode = exception.getErrorCode(); + String qpidErrorCode = String.valueOf(exception.getErrorCode()); // todo map this error to an XA code int xaCode = XAException.XAER_PROTO; return new XAException(xaCode); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index b9922de296..3869c2a793 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages - _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, + _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); if (_messageSelector != null) { @@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. @@ -591,7 +591,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - getSession().getQpidSession().messageAcquire(ranges); + getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index ad85f33aea..8d707e4ccc 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(queue.getQueueName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We do not acquire those messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index d5342fb464..5f2e7cbda7 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -384,7 +384,7 @@ public class SessionImpl implements Session _incomingAsynchronousMessages.notifyAll(); } // close the underlaying QpidSession - _qpidSession.close(); + _qpidSession.sessionClose(); } } -- cgit v1.2.1