diff options
Diffstat (limited to 'java/client')
12 files changed, 434 insertions, 251 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 3da3a12aa5..2ea6db8943 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -28,6 +28,17 @@ import org.apache.qpidity.QpidException; */ public interface Connection { + /** + * Establish the connection using the given parameters + * + * @param host + * @param port + * @param username + * @param password + * @throws QpidException + */ + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; + /** * Establish the connection with the broker identified by the provided URL. * @@ -52,7 +63,7 @@ public interface Connection * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) session. */ - public Session createSession(int expiryInSeconds); + public Session createSession(long expiryInSeconds); /** * Create a DtxSession for this connection. diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 633c77ecac..def7b5ca41 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -17,7 +17,9 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; + +import org.apache.qpidity.Struct; /** * Assembles message parts. @@ -31,21 +33,21 @@ import org.apache.qpidity.Header; * are transferred. */ public interface MessagePartListener -{ +{ /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> */ - public void messageHeaders(Header... headers); + public void messageHeaders(Struct... headers); /** * Add the following byte array to the content of the message being received * * @param data Data to be added or streamed. */ - public void addData(byte[] data); + public void addData(ByteBuffer src); /** * Indicates that the message has been fully received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 0178ef6d3b..84de268232 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -18,12 +18,14 @@ */
package org.apache.qpidity.client;
+import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.UUID;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
/**
* <p>A session is associated with a connection.
@@ -32,10 +34,12 @@ import org.apache.qpidity.RangeSet; */
public interface Session
{
- public static final short ACQUIRE_MODE_NO_ACQUIRE = 0;
- public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1;
- public static final short CONFIRM_MODE_REQUIRED = 1;
- public static final short CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+ public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+ public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
@@ -54,21 +58,21 @@ public interface Session /**
* Close this session and any associated resources.
*/
- public void close();
+ public void sessionClose();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
*/
- public void suspend();
+ public void sessionSuspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
*/
- public void resume();
+ public void sessionResume(UUID sessionId);
//------------------------------------------------------
// Messaging methods
@@ -92,7 +96,7 @@ public interface Session * @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -117,27 +121,39 @@ public interface Session * </ul>
* @param exchange The exchange the message is being sent.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
- * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ * @param headers are Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers);
+ public void headers(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
*
* @param data Data to be added.
- * @param off Offset from which to start reading data
- * @param len Number of bytes to be read
*/
- public void addData(byte[] data, int off, int len);
+ public void data(byte[] data);
+
+ /**
+ * Add the following ByteBuffer to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(ByteBuffer buf);
/**
+ * Add the following String to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(String str);
+
+ /**
* Signals the end of data for the message.
*/
public void endData();
@@ -258,8 +274,6 @@ public interface Session * @param destination The destination to call flush on.
*/
public void messageFlush(String destination);
-
- public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -286,8 +300,12 @@ public interface Session * and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
+ * @param code TODO
+ * @param text TODO
*/
- public void messageReject(RangeSet ranges);
+ public void messageReject(RangeSet ranges, int code, String text);
+
+ public RangeSet getRejectedMessages();
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -296,10 +314,10 @@ public interface Session * message acquisition can fail.
* The outcome of the acquisition is returned as an array of ranges of qcquired messages.
* <p> This method should only be called on non-acquired messages.
- *
+ * @param mode TODO
* @param range Ranges of messages to be acquired.
*/
- public void messageAcquire(RangeSet ranges);
+ public void messageAcquire(RangeSet ranges, short mode);
public RangeSet getAccquiredMessages();
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java new file mode 100644 index 0000000000..13acff1ea6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/Client.java @@ -0,0 +1,112 @@ +package org.apache.qpidity.impl; + +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.DtxSession; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index a0111a86f4..627829556c 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -1,216 +1,101 @@ package org.apache.qpidity.impl; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; -import org.apache.qpidity.*; /** * Implements a Qpid Sesion. */ -public class ClientSession implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session { - - Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); - - public void addData(byte[] data, int off, int len) - { - // TODO Auto-generated method stub - - } - - public void addMessageHeaders(Header... headers) - { - // TODO Auto-generated method stub - - } - - public void close() - { - // TODO Auto-generated method stub - - } - - public void endData() - { - // TODO Auto-generated method stub - - } - - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void exchangeDelete(String exchangeName, Option... options) + private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map<String,List<RangeSet>> _unackedMessages = new HashMap<String,List<RangeSet>>(); + + @Override public void sessionClose() { - // TODO Auto-generated method stub - + // release all unacked messages and then issues a close + super.sessionClose(); } - + public void messageAcknowledge(RangeSet ranges) { - // TODO Auto-generated method stub - - } - - public void messageAcquire(RangeSet ranges) - { - // TODO Auto-generated method stub - } - - public void messageCancel(String destination) - { - // TODO Auto-generated method stub - - } - - public void messageFlow(String destination, short unit, long value) - { - // TODO Auto-generated method stub - - } - - public void messageFlowMode(String destination, short mode) - { - // TODO Auto-generated method stub - - } - - public void messageFlush(String destination) - { - // TODO Auto-generated method stub - } - - public void messageReject(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageRelease(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageStop(String destination) - { - // TODO Auto-generated method stub - + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } } public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options) { - // TODO Auto-generated method stub - + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) { - // TODO Auto-generated method stub - + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() } - - public void messageTransfer(String exchange, short confirmMode, short acquireMode) - { - // TODO Auto-generated method stub - - } - - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - { - // TODO Auto-generated method stub - - } - - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queueDelete(String queueName, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queuePurge(String queueName) - { - // TODO Auto-generated method stub - - } - - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - { - // TODO Auto-generated method stub - - } - - public void resume() + + + public RangeSet getAccquiredMessages() { - // TODO Auto-generated method stub - + return _acquiredMessages; } - public void setExceptionListener(ExceptionListener exceptionListner) + public RangeSet getRejectedMessages() { - // TODO Auto-generated method stub - + return _rejectedMessages; } - + public void setMessageListener(String destination, MessagePartListener listener) { - // TODO Auto-generated method stub - + _messageListeners.put(destination, listener); } - - public void suspend() - { - // TODO Auto-generated method stub - - } - - public void sync() - { - // TODO Auto-generated method stub - - } - - public void txCommit() throws IllegalStateException + + public void setExceptionListener(ExceptionListener exceptionListner) { - // TODO Auto-generated method stub - - } - - public void txRollback() throws IllegalStateException + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) { - // TODO Auto-generated method stub - + _acquiredMessages = acquiredMessages; } - - public void txSelect() + + void setRejectedMessages(RangeSet rejectedMessages) { - // TODO Auto-generated method stub - + _rejectedMessages = rejectedMessages; } - - public RangeSet getAccquiredMessages() + + void notifyException(QpidException ex) { - // TODO Auto-generated method stub - return null; + _exceptionListner.onException(ex); } - - public int getNoOfUnAckedMessages() + + Map<String,MessagePartListener> getMessageListerners() { - // TODO Auto-generated method stub - return 0; + return _messageListeners; } - - } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java index ea9e4c067b..70d1019a06 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -1,45 +1,79 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.client.Session; +import java.nio.ByteBuffer; +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; -public class ClientSessionDelegate extends CommonSessionDelegate -{ - - /*@Override public void messageTransfer(Session context, MessageTransfer struct) - { - MessagePartListener l = context.messagListeners.get(struct.getDestination()); - l.messageTransfer(struct.getDestination(),new Option[0]); - }*/ - - // --------------------------------------------------------------- - // Non generated methods - but would like if they are also generated. - // These methods should be called from Body and Header Handlers. - // If these methods are generated as part of the delegate then - // I can call these methods from the BodyHandler and HeaderHandler - // in a generic way - // I have used destination to indicate my intent of receiving - // some form of correlation to know which consumer this data belongs to. - // It can be anything as long as I can make the right correlation - // ---------------------------------------------------------------- - /* public void data(Session context,String destination,byte[] src) throws QpidException +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) { - MessagePartListener l = context.messagListeners.get(destination); - l.data(src); + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + } - public void endData(Session context,String destination) throws QpidException + @Override public void headers(Session ssn, Struct... headers) { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); + _currentMessageListener.messageHeaders(headers); } - public void messageHeaders(Session context,String destination,Header... headers) throws QpidException - { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); - }*/ + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java new file mode 100644 index 0000000000..d325054bee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java @@ -0,0 +1,45 @@ +package org.apache.qpidity.impl; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.Connection; + +public class DemoClient +{ + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("Queue1", null, null); + ssn.sync(); + + ssn.messageTransfer("Queue1", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties(), + new MessageProperties()); + ssn.data("this is the data"); + ssn.endData(); + + ssn.messageTransfer("Queue2", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java index f05f2c0e76..5d3f6a6e3e 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java @@ -1,33 +1,109 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.Struct; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; +/** + * + * Will call onMessage method as soon as data is avialable + * The client can then start to process the data while + * the rest of the data is read. + * + */ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - _currentMsg = null; - } + + // temp solution. + _currentMsg = new Message() + { + Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) + { + appendData(ByteBuffer.wrap(src)); + } - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } + public void appendData(ByteBuffer src) + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } - public void messageHeaders(Header... headers) + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) + { + if (_readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + private void buildReadBuffer() + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + }; + } + + public void addData(ByteBuffer src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Struct... headers) { - //_currentMsg add the headers + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } } - + public void messageReceived() { - _adaptee.onMessage(_currentMsg); + _adaptee.onMessage(_currentMsg); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java index 1dc35b5609..98767106ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -34,7 +34,7 @@ public class ExceptionHelper { if (exception instanceof QpidException) { - jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); } else { @@ -51,7 +51,7 @@ public class ExceptionHelper static public XAException convertQpidExceptionToXAException(QpidException exception) { - String qpidErrorCode = exception.getErrorCode(); + String qpidErrorCode = String.valueOf(exception.getErrorCode()); // todo map this error to an XA code int xaCode = XAException.XAER_PROTO; return new XAException(xaCode); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index b9922de296..3869c2a793 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages - _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, + _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); if (_messageSelector != null) { @@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. @@ -591,7 +591,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - getSession().getQpidSession().messageAcquire(ranges); + getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index ad85f33aea..8d707e4ccc 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(queue.getQueueName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We do not acquire those messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index d5342fb464..5f2e7cbda7 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -384,7 +384,7 @@ public class SessionImpl implements Session _incomingAsynchronousMessages.notifyAll(); } // close the underlaying QpidSession - _qpidSession.close(); + _qpidSession.sessionClose(); } } |
