summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Connection.java13
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java60
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/Client.java112
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java229
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java98
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java98
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java2
12 files changed, 434 insertions, 251 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
index 3da3a12aa5..2ea6db8943 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
@@ -28,6 +28,17 @@ import org.apache.qpidity.QpidException;
*/
public interface Connection
{
+ /**
+ * Establish the connection using the given parameters
+ *
+ * @param host
+ * @param port
+ * @param username
+ * @param password
+ * @throws QpidException
+ */
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
+
/**
* Establish the connection with the broker identified by the provided URL.
*
@@ -52,7 +63,7 @@ public interface Connection
* @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
* @return A Newly created (suspended) session.
*/
- public Session createSession(int expiryInSeconds);
+ public Session createSession(long expiryInSeconds);
/**
* Create a DtxSession for this connection.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index 633c77ecac..def7b5ca41 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -17,7 +17,9 @@
*/
package org.apache.qpidity.client;
-import org.apache.qpidity.Header;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.Struct;
/**
* Assembles message parts.
@@ -31,21 +33,21 @@ import org.apache.qpidity.Header;
* are transferred.
*/
public interface MessagePartListener
-{
+{
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void messageHeaders(Header... headers);
+ public void messageHeaders(Struct... headers);
/**
* Add the following byte array to the content of the message being received
*
* @param data Data to be added or streamed.
*/
- public void addData(byte[] data);
+ public void addData(ByteBuffer src);
/**
* Indicates that the message has been fully received.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 0178ef6d3b..84de268232 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -18,12 +18,14 @@
*/
package org.apache.qpidity.client;
+import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.UUID;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
/**
* <p>A session is associated with a connection.
@@ -32,10 +34,12 @@ import org.apache.qpidity.RangeSet;
*/
public interface Session
{
- public static final short ACQUIRE_MODE_NO_ACQUIRE = 0;
- public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1;
- public static final short CONFIRM_MODE_REQUIRED = 1;
- public static final short CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+ public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+ public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
@@ -54,21 +58,21 @@ public interface Session
/**
* Close this session and any associated resources.
*/
- public void close();
+ public void sessionClose();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
*/
- public void suspend();
+ public void sessionSuspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
*/
- public void resume();
+ public void sessionResume(UUID sessionId);
//------------------------------------------------------
// Messaging methods
@@ -92,7 +96,7 @@ public interface Session
* @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -117,27 +121,39 @@ public interface Session
* </ul>
* @param exchange The exchange the message is being sent.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
- * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ * @param headers are Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers);
+ public void headers(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
*
* @param data Data to be added.
- * @param off Offset from which to start reading data
- * @param len Number of bytes to be read
*/
- public void addData(byte[] data, int off, int len);
+ public void data(byte[] data);
+
+ /**
+ * Add the following ByteBuffer to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(ByteBuffer buf);
/**
+ * Add the following String to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(String str);
+
+ /**
* Signals the end of data for the message.
*/
public void endData();
@@ -258,8 +274,6 @@ public interface Session
* @param destination The destination to call flush on.
*/
public void messageFlush(String destination);
-
- public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -286,8 +300,12 @@ public interface Session
* and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
+ * @param code TODO
+ * @param text TODO
*/
- public void messageReject(RangeSet ranges);
+ public void messageReject(RangeSet ranges, int code, String text);
+
+ public RangeSet getRejectedMessages();
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -296,10 +314,10 @@ public interface Session
* message acquisition can fail.
* The outcome of the acquisition is returned as an array of ranges of qcquired messages.
* <p> This method should only be called on non-acquired messages.
- *
+ * @param mode TODO
* @param range Ranges of messages to be acquired.
*/
- public void messageAcquire(RangeSet ranges);
+ public void messageAcquire(RangeSet ranges, short mode);
public RangeSet getAccquiredMessages();
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java
new file mode 100644
index 0000000000..13acff1ea6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/impl/Client.java
@@ -0,0 +1,112 @@
+package org.apache.qpidity.impl;
+
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.Channel;
+import org.apache.qpidity.Connection;
+import org.apache.qpidity.ConnectionClose;
+import org.apache.qpidity.ConnectionDelegate;
+import org.apache.qpidity.MinaHandler;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.client.DtxSession;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.Session;
+
+
+public class Client implements org.apache.qpidity.client.Connection
+{
+ private AtomicInteger _channelNo = new AtomicInteger();
+ private Connection _conn;
+ private ExceptionListener _exceptionListner;
+ private final Lock _lock = new ReentrantLock();
+
+ public static org.apache.qpidity.client.Connection createConnection()
+ {
+ return new Client();
+ }
+
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
+ {
+ Condition negotiationComplete = _lock.newCondition();
+ _lock.lock();
+
+ ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ClientSessionDelegate();
+ }
+
+ @Override public void connectionClose(Channel context, ConnectionClose struct)
+ {
+ _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null));
+ }
+ };
+
+ connectionDelegate.setCondition(_lock,negotiationComplete);
+ connectionDelegate.setUsername(username);
+ connectionDelegate.setPassword(password);
+ connectionDelegate.setVirtualHost(virtualHost);
+
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+
+ _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
+
+ try
+ {
+ negotiationComplete.await();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /*
+ * Until the dust settles with the URL disucssion
+ * I am not going to implement this.
+ */
+ public void connect(URL url) throws QpidException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws QpidException
+ {
+ Channel ch = _conn.getChannel(0);
+ ch.connectionClose(0, "client is closing", 0, 0);
+ //need to close the connection underneath as well
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ ClientSession ssn = new ClientSession();
+ ssn.attach(ch);
+ ssn.sessionOpen(expiryInSeconds);
+
+ return ssn;
+ }
+
+ public DtxSession createDTXSession(int expiryInSeconds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
index a0111a86f4..627829556c 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
@@ -1,216 +1,101 @@
package org.apache.qpidity.impl;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
-import org.apache.qpidity.*;
/**
* Implements a Qpid Sesion.
*/
-public class ClientSession implements org.apache.qpidity.client.Session
+public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session
{
-
- Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
-
- public void addData(byte[] data, int off, int len)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void addMessageHeaders(Header... headers)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void close()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void endData()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void exchangeDelete(String exchangeName, Option... options)
+ private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
+ private ExceptionListener _exceptionListner;
+ private RangeSet _acquiredMessages;
+ private RangeSet _rejectedMessages;
+ private Map<String,List<RangeSet>> _unackedMessages = new HashMap<String,List<RangeSet>>();
+
+ @Override public void sessionClose()
{
- // TODO Auto-generated method stub
-
+ // release all unacked messages and then issues a close
+ super.sessionClose();
}
-
+
public void messageAcknowledge(RangeSet ranges)
{
- // TODO Auto-generated method stub
-
- }
-
- public void messageAcquire(RangeSet ranges)
- {
- // TODO Auto-generated method stub
- }
-
- public void messageCancel(String destination)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlow(String destination, short unit, long value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlowMode(String destination, short mode)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlush(String destination)
- {
- // TODO Auto-generated method stub
- }
-
- public void messageReject(RangeSet ranges)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageRelease(RangeSet ranges)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageStop(String destination)
- {
- // TODO Auto-generated method stub
-
+ for (Range range : ranges)
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("Acknowleding message for : " + super.getCommand((int) l));
+ super.processed(l);
+ }
+ }
}
public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
{
- // TODO Auto-generated method stub
-
+ setMessageListener(destination,listener);
+ super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
}
public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode)
{
- // TODO Auto-generated method stub
-
+ // need to break it down into small pieces
+ super.messageTransfer(exchange, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ // super.data(bytes); *
+ // super.endData()
}
-
- public void messageTransfer(String exchange, short confirmMode, short acquireMode)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueDelete(String queueName, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queuePurge(String queueName)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void resume()
+
+
+ public RangeSet getAccquiredMessages()
{
- // TODO Auto-generated method stub
-
+ return _acquiredMessages;
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public RangeSet getRejectedMessages()
{
- // TODO Auto-generated method stub
-
+ return _rejectedMessages;
}
-
+
public void setMessageListener(String destination, MessagePartListener listener)
{
- // TODO Auto-generated method stub
-
+ _messageListeners.put(destination, listener);
}
-
- public void suspend()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void sync()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void txCommit() throws IllegalStateException
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
{
- // TODO Auto-generated method stub
-
- }
-
- public void txRollback() throws IllegalStateException
+ _exceptionListner = exceptionListner;
+ }
+
+ // ugly but nessacery
+
+ void setAccquiredMessages(RangeSet acquiredMessages)
{
- // TODO Auto-generated method stub
-
+ _acquiredMessages = acquiredMessages;
}
-
- public void txSelect()
+
+ void setRejectedMessages(RangeSet rejectedMessages)
{
- // TODO Auto-generated method stub
-
+ _rejectedMessages = rejectedMessages;
}
-
- public RangeSet getAccquiredMessages()
+
+ void notifyException(QpidException ex)
{
- // TODO Auto-generated method stub
- return null;
+ _exceptionListner.onException(ex);
}
-
- public int getNoOfUnAckedMessages()
+
+ Map<String,MessagePartListener> getMessageListerners()
{
- // TODO Auto-generated method stub
- return 0;
+ return _messageListeners;
}
-
-
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
index ea9e4c067b..70d1019a06 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
@@ -1,45 +1,79 @@
package org.apache.qpidity.impl;
-import org.apache.qpidity.CommonSessionDelegate;
-import org.apache.qpidity.client.Session;
+import java.nio.ByteBuffer;
+import org.apache.qpidity.Frame;
+import org.apache.qpidity.MessageAcquired;
+import org.apache.qpidity.MessageReject;
+import org.apache.qpidity.MessageTransfer;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Session;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.client.MessagePartListener;
-public class ClientSessionDelegate extends CommonSessionDelegate
-{
-
- /*@Override public void messageTransfer(Session context, MessageTransfer struct)
- {
- MessagePartListener l = context.messagListeners.get(struct.getDestination());
- l.messageTransfer(struct.getDestination(),new Option[0]);
- }*/
-
- // ---------------------------------------------------------------
- // Non generated methods - but would like if they are also generated.
- // These methods should be called from Body and Header Handlers.
- // If these methods are generated as part of the delegate then
- // I can call these methods from the BodyHandler and HeaderHandler
- // in a generic way
- // I have used destination to indicate my intent of receiving
- // some form of correlation to know which consumer this data belongs to.
- // It can be anything as long as I can make the right correlation
- // ----------------------------------------------------------------
- /* public void data(Session context,String destination,byte[] src) throws QpidException
+public class ClientSessionDelegate extends SessionDelegate
+{
+ private MessageTransfer _currentTransfer;
+ private MessagePartListener _currentMessageListener;
+
+ @Override public void data(Session ssn, Frame frame)
{
- MessagePartListener l = context.messagListeners.get(destination);
- l.data(src);
+ for (ByteBuffer b : frame)
+ {
+ _currentMessageListener.addData(b);
+ }
+ if (frame.isLastSegment() && frame.isLastFrame())
+ {
+ _currentMessageListener.messageReceived();
+ }
+
}
- public void endData(Session context,String destination) throws QpidException
+ @Override public void headers(Session ssn, Struct... headers)
{
- MessagePartListener l = context.messagListeners.get(destination);
- l.endData();
+ _currentMessageListener.messageHeaders(headers);
}
- public void messageHeaders(Session context,String destination,Header... headers) throws QpidException
- {
- MessagePartListener l = context.messagListeners.get(destination);
- l.endData();
- }*/
+ @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
+ {
+ _currentTransfer = currentTransfer;
+ _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+
+ //a better way is to tell the broker to stop the transfer
+ if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
+ {
+ RangeSet transfers = new RangeSet();
+ transfers.add(_currentTransfer.getId());
+ session.messageRelease(transfers);
+ }
+ }
+
+ // --------------------------------------------
+ // Message methods
+ // --------------------------------------------
+
+
+ @Override public void messageReject(Session session, MessageReject struct)
+ {
+ for (Range range : struct.getTransfers())
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("message rejected: " +
+ session.getCommand((int) l));
+ }
+ }
+ ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+ ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null));
+ }
+
+ @Override public void messageAcquired(Session session, MessageAcquired struct)
+ {
+ ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
new file mode 100644
index 0000000000..d325054bee
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
@@ -0,0 +1,45 @@
+package org.apache.qpidity.impl;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.Connection;
+
+public class DemoClient
+{
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("Queue1", null, null);
+ ssn.sync();
+
+ ssn.messageTransfer("Queue1", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties(),
+ new MessageProperties());
+ ssn.data("this is the data");
+ ssn.endData();
+
+ ssn.messageTransfer("Queue2", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
index f05f2c0e76..5d3f6a6e3e 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
@@ -1,33 +1,109 @@
package org.apache.qpidity.impl;
-import org.apache.qpidity.Header;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.Struct;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
+/**
+ *
+ * Will call onMessage method as soon as data is avialable
+ * The client can then start to process the data while
+ * the rest of the data is read.
+ *
+ */
public class MessagePartListenerAdapter implements MessagePartListener
{
MessageListener _adaptee;
Message _currentMsg;
+ DeliveryProperties _currentDeliveryProps;
+ MessageProperties _currentMessageProps;
public MessagePartListenerAdapter(MessageListener listener)
{
_adaptee = listener;
- _currentMsg = null;
- }
+
+ // temp solution.
+ _currentMsg = new Message()
+ {
+ Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ ByteBuffer _readBuffer;
+ private int dataSize;
+
+ public void appendData(byte[] src)
+ {
+ appendData(ByteBuffer.wrap(src));
+ }
- public void addData(byte[] src)
- {
- _currentMsg.appendData(src);
- }
+ public void appendData(ByteBuffer src)
+ {
+ _data.offer(src);
+ dataSize += src.remaining();
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _currentDeliveryProps;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _currentMessageProps;
+ }
- public void messageHeaders(Header... headers)
+ // since we provide the message only after completion
+ // we can assume that when this method is called we have
+ // received all data.
+ public void readData(byte[] target)
+ {
+ if (_readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ _readBuffer.get(target);
+ }
+
+ private void buildReadBuffer()
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
+ }
+ };
+ }
+
+ public void addData(ByteBuffer src)
+ {
+ _currentMsg.appendData(src);
+ }
+
+ public void messageHeaders(Struct... headers)
{
- //_currentMsg add the headers
+ for(Struct struct: headers)
+ {
+ if(struct instanceof DeliveryProperties)
+ {
+ _currentDeliveryProps = (DeliveryProperties)struct;
+ }
+ else if (struct instanceof MessageProperties)
+ {
+ _currentMessageProps = (MessageProperties)struct;
+ }
+ }
}
-
+
public void messageReceived()
{
- _adaptee.onMessage(_currentMsg);
+ _adaptee.onMessage(_currentMsg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
index 1dc35b5609..98767106ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
@@ -34,7 +34,7 @@ public class ExceptionHelper
{
if (exception instanceof QpidException)
{
- jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+ jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode()));
}
else
{
@@ -51,7 +51,7 @@ public class ExceptionHelper
static public XAException convertQpidExceptionToXAException(QpidException exception)
{
- String qpidErrorCode = exception.getErrorCode();
+ String qpidErrorCode = String.valueOf(exception.getErrorCode());
// todo map this error to an XA code
int xaCode = XAException.XAER_PROTO;
return new XAException(xaCode);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index b9922de296..3869c2a793 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// this is a queue we expect that this queue exists
getSession().getQpidSession()
.messageSubscribe(destination.getName(), getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// When the message selctor is set we do not acquire the messages
- _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE,
+ _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
if (_messageSelector != null)
{
@@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We always acquire the messages
- org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
_noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
// Request exclusive subscription access, meaning only this subscription
// can access the queue.
@@ -591,7 +591,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- getSession().getQpidSession().messageAcquire(ranges);
+ getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE);
RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index ad85f33aea..8d707e4ccc 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
// this is a queue we expect that this queue exists
getSession().getQpidSession()
.messageSubscribe(queue.getQueueName(), getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We do not acquire those messages
- org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index d5342fb464..5f2e7cbda7 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -384,7 +384,7 @@ public class SessionImpl implements Session
_incomingAsynchronousMessages.notifyAll();
}
// close the underlaying QpidSession
- _qpidSession.close();
+ _qpidSession.sessionClose();
}
}