summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-03 11:34:02 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-03 11:34:02 +0000
commit6167ee934ff21684a93f43b5efcf47a85f1e4aa2 (patch)
tree72e5f58398b94c3c85aeec8d747aee796cc92264 /java
parent2b29003006c30daff0e01b0f7637f3365312d1b8 (diff)
downloadqpid-python-6167ee934ff21684a93f43b5efcf47a85f1e4aa2.tar.gz
Removed "api" from the package name
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Connection.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java)18
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java)6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java)4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java)6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Session.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/api/Session.java)294
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java145
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java29
13 files changed, 381 insertions, 232 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
index 6d5f317feb..08e067dede 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.nclient.api;
+package org.apache.qpid.nclient;
import java.net.URL;
@@ -34,18 +34,14 @@ public interface Connection
* @param url The URL of the broker.
* @throws QpidException If the communication layer fails to connect with the broker.
*/
- public void connect(URL url)
- throws
- QpidException;
+ public void connect(URL url) throws QpidException;
/**
* Close this connection.
*
* @throws QpidException if the communication layer fails to close the connection.
*/
- public void close()
- throws
- QpidException;
+ public void close() throws QpidException;
/**
@@ -57,9 +53,7 @@ public interface Connection
* @return A Newly created (suspended) session.
* @throws QpidException If the connection fails to create a session due to some internal error.
*/
- public Session createSession(int expiryInSeconds)
- throws
- QpidException;
+ public Session createSession(int expiryInSeconds) throws QpidException;
/**
* Create a DtxSession for this connection.
@@ -72,9 +66,7 @@ public interface Connection
* @return A Newly created (suspended) DtxSession.
* @throws QpidException If the connection fails to create a DtxSession due to some internal error.
*/
- public DtxSession createDTXSession(int expiryInSeconds)
- throws
- QpidException;
+ public DtxSession createDTXSession(int expiryInSeconds) throws QpidException;
/**
* If the communication layer detects a serious problem with a connection, it
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
index ac396b7c79..61f03a2d40 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.nclient.api;
+package org.apache.qpid.nclient;
import org.apache.qpidity.QpidException;
@@ -33,7 +33,5 @@ public interface DtxSession extends Session
* @throws QpidException If the session fails to retrieve its associated XA resource
* due to some error.
*/
- public javax.transaction.xa.XAResource getDTXResource()
- throws
- QpidException;
+ public javax.transaction.xa.XAResource getDTXResource() throws QpidException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java
index 1818dbfd23..d59d90fc44 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.nclient.api;
+package org.apache.qpid.nclient;
import org.apache.qpidity.QpidException;
@@ -31,7 +31,7 @@ public interface ExceptionListener
* informs the connection's ExceptionListener
*
* @param exception The exception comming from the communication layer.
- * @see org.apache.qpid.nclient.api.Connection
+ * @see Connection
*/
public void onException(QpidException exception);
} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
index 5fcf0d9e0d..93b770a285 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.nclient.api;
+package org.apache.qpid.nclient;
import org.apache.qpidity.api.Message;
@@ -26,7 +26,7 @@ import org.apache.qpidity.api.Message;
public interface MessageListener
{
/**
- * <p>Deliver a message to the listener.
+ * <p>Transfer a message to the listener.
* You will be notified when the whole message is received
* However, underneath the message might be streamed off disk
* or network buffers.
@@ -34,5 +34,5 @@ public interface MessageListener
*
* @param message The message delivered to the listner.
*/
- public void onMessage(Message message);
+ public void messageTransfer(Message message);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
index 07938a5985..9a2b5e63bf 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
@@ -16,16 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.nclient.api;
+package org.apache.qpid.nclient;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.Header;
+import org.apache.qpidity.Range;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.api.StreamingMessageListener;
/**
* <p>A session is associated with a connection.
@@ -36,210 +35,220 @@ public interface Session
{
//------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
-
+ // Session housekeeping methods
+ //------------------------------------------------------
/**
* Close this session and any associated resources.
*
* @throws QpidException If the communication layer fails to close this session or if an internal error happens
* when closing this session resources. .
*/
- public void sessionClose() throws QpidException;
+ public void close() throws QpidException;
/**
* Suspend this session resulting in interrupting the traffic with the broker.
- * An important distinction btw sessionFlow() and this method
- * is that the session timer will start to tick in suspend.
- * When a session is suspend any operation of this session and of the associated resources are unavailable.
+ * <p> The session timer will start to tick in suspend.
+ * <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
*
* @throws QpidException If the communication layer fails to suspend this session
*/
- public void sessionSuspend() throws QpidException;
-
-
- /**
- * This will stop the communication flow in the session
- * However the session is still considered active and the session timer will not tick.
- * This method is used for session level flow control.
- *
- * @throws QpidException If the communication layer fails to execute the flow method properly
- */
- public void sessionFlow(Option... options) throws QpidException;
+ public void suspend() throws QpidException;
/**
- * This is used for failover. This will resume an existing session
+ * This will resume an existing session
+ * <p> Upon resume the session is attached with an underlying channel
+ * hence making operation on this session available.
*
* @throws QpidException If the communication layer fails to execute this properly
*/
- public void sessionResume(UUID sessionId) throws QpidException;
+ public void resume() throws QpidException;
-
//------------------------------------------------------
- // Messaging methods
+ // Messaging methods
// Producer
- //------------------------------------------------------
-
- /**
- * Transfer the given message.
- * This is a convinience method
- *
- * @param destination The exchange the message being sent.
- * @return msg The Message to be sent
- * @throws QpidException If the session fails to send the message due to some error
- */
- public void messageTransfer(String destination,Message msg,Option... options)throws QpidException;
-
+ //------------------------------------------------------
/**
- * Transfer the given message.
+ * Transfer the given message to a specified exchange.
* <p> Following are the valid options for messageTransfer
* <ul>
* <li> CONFIRM
* <li> PRE_ACCQUIRE
* </ul>
- * </p>
- *
* <p> In the absence of a particular option, the defaul value is:
* <ul>
* <li> CONFIRM = false
* <li> NO-ACCQUIRE
* </ul>
- * </p>
*
- * @param destination The exchange the message being sent.
- * @return options set of options
+ * @param exchange The exchange the message is being sent.
+ * @param msg The Message to be sent
+ * @param options A list of valid options
* @throws QpidException If the session fails to send the message due to some error
*/
- public void messageTransfer(String destination,Option... options)throws QpidException;
-
+ public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException;
+
/**
- * Add the following headers to content bearing frame
+ * Declare the beginning of a message transfer operation. This operation must
+ * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}.
+ * The transfer is ended by endData.
+ * <p> This way of transferring messages is useful when streaming large messages
+ * <p> In the interval [messageTransfer endData] any attempt to call a method other than
+ * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close}
+ * will result in an exception being thrown.
+ * <p> Following are the valid options for messageTransfer
+ * <ul>
+ * <li> CONFIRM
+ * <li> PRE_ACCQUIRE
+ * </ul>
+ * <p> In the absence of a particular option, the defaul value is:
+ * <ul>
+ * <li> CONFIRM = false
+ * <li> NO-ACCQUIRE
+ * </ul>
*
- * @param Header Either DeliveryProperties or ApplicationProperties
- * @throws QpidException If the session fails to execute the method due to some error
+ * @param exchange The exchange the message is being sent.
+ * @param options Set of options.
+ * @throws QpidException If the session fails to send the message due to some error.
*/
- public void messageHeaders(Header ... headers)throws QpidException;
-
+ public void messageTransfer(String exchange, Option... options) throws QpidException;
+
/**
- * Add the following byte array to the content.
- * This method is useful when streaming large messages
+ * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
+ * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent.
*
- * @param src data to be added or streamed
+ * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @throws QpidException If the session fails to execute the method due to some error
+ * @see org.apache.qpidity.DeliveryProperties
+ * @see org.apache.qpidity.ApplicationProperties
*/
- public void data(byte[] src)throws QpidException;
-
- /**
- * Signals the end of data for the message. *
- * This method is useful when streaming large messages
- *
- * @throws QpidException If the session fails to execute the method due to some error
- */
- public void endData()throws QpidException;
+ public void addMessageHeaders(Header... headers) throws QpidException;
/**
- * Acknowledge the receipt of this message.
- * <p>The message must have been previously acquired either by receiving it in
- * pre-acquire mode or by explicitly acquiring it.
+ * Add the following byte array to the content of the message being sent.
*
- * @throws QpidException If the acknowledgement of the message fails due to some error.
- * @throws IllegalStateException If this messages is not acquired.
+ * @param data Data to be added.
+ * @param off Offset from which to start reading data
+ * @param len Number of bytes to be read
+ * @throws QpidException If the session fails to execute the method due to some error
*/
- public void messageAcknowledge() throws QpidException;
+ public void addData(byte[] data, int off, int len) throws QpidException;
/**
- * Reject a previously acquired message.
- * <p> A rejected message will not be delivered to any receiver
- * and may be either discarded or moved to the broker dead letter queue.
+ * Signals the end of data for the message.
*
- * @throws QpidException If this message cannot be rejected dus to some error
- * @throws IllegalStateException If this message is not acquired.
+ * @throws QpidException If the session fails to execute the method due to some error
*/
- public void messageReject() throws QpidException;
+ public void endData() throws QpidException;
- /**
- * Try to acquire this message hence releasing it form the queue. This means that once acknowledged,
- * this message will not be delivered to any other receiver.
- * <p> As this message may have been consumed by another receiver, message acquisition can fail.
- * The outcome of the acquisition is returned as a Boolean.
- *
- * @return True if the message is successfully acquired, False otherwise.
- * @throws QpidException If this message cannot be acquired dus to some error
- * @throws IllegalStateException If this message has already been acquired.
- */
- public boolean messageAcquire() throws QpidException;
+ //------------------------------------------------------
+ // Messaging methods
+ // Consumer
+ //------------------------------------------------------
/**
- * Give up responsibility for processing this message.
- *
- * @throws QpidException If this message cannot be released dus to some error.
- * @throws IllegalStateException If this message has already been acknowledged.
- */
- public void messageRelease() throws QpidException;
-
-
- //------------------------------------------------------
- // Messaging methods
- // Consumer
- //------------------------------------------------------
-
- /**
- * Create a message receiver for receiving messages from queue queueName.
- * <p> Following are the valid options for messageSubscribe
+ * Associate a message listener with a destination.
+ * <p> The destination is bound to a queue and messages are filtered based
+ * on the provider filter map (message filtering is specific to the provider and may not be handled).
+ * <p/>
+ * <p> Following are the valid options
* <ul>
* <li> NO_LOCAL
* <li> EXCLUSIVE
* <li> NO_ACQUIRE
* <li> CONFIRM
* </ul>
- * </p>
- *
- * <p> In the absence of a particular option, the defaul value is:
+ * <p> In the absence of a particular option, defaul values are:
* <ul>
* <li> NO_LOCAL = false
* <li> EXCLUSIVE = false
* <li> PRE-ACCQUIRE
* <li> CONFIRM = false
* </ul>
- * </p>
*
- * @param queue The queue this receiver is receiving messages from.
+ * @param queue The queue this receiver is receiving messages from.
* @param destination The destination for the subscriber ,a.k.a the delivery tag.
- * @param options Set of Options.
+ * @param listener The listener for this destination. When big message are transfered then
+ * it is recommended to use a {@link StreamingMessageListener}.
+ * @param options Set of Options.
+ * @param filter The filters to apply to consumed messages.
* @throws QpidException If the session fails to create the receiver due to some error.
- * @see Option
*/
- public void messageSubscribe(String queue, String destination, Map<String,?> filter, Option ... _options) throws QpidException;
-
- public void messageSubscribe(String queue, String destination, Map<String,?> filter,StreamingMessageListener listener,Option ... _options) throws QpidException;
+ public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ Option... options) throws QpidException;
/**
- * Cancels a subscription
- *
+ * Cancels a subscription with a ginven destination.
+ *
* @param destination The destination for the subscriber used at subscription
+ * @throws QpidException If cancelling the subscription fails due to some error.
*/
public void messageCancel(String destination) throws QpidException;
-
+
/**
- * We currently allow one listerner per destination
- *
- * @param destination
- * @param listener
+ * Associate a message listener with a destination.
+ * We currently allow one listerner per destination this means
+ * that the previous message listener is replaced. This is done gracefully i.e. the message
+ * listener is replaced once it return from the processing of a message.
+ *
+ * @param destination The destination the listener is associated with.
+ * @param listener The new listener for this destination. When big message are transfered then
+ * it is recommended to use a {@link StreamingMessageListener}.
*/
- public void setMessageListener(String destination,StreamingMessageListener listener);
-
+ public void setMessageListener(String destination, MessageListener listener);
+
+
/**
- * We currently allow one listerner per destination
- *
- * @param destination
- * @param listener
+ * Acknowledge the receipt of ranges of messages.
+ * <p>Message must have been previously acquired either by receiving them in
+ * pre-acquire mode or by explicitly acquiring them.
+ *
+ * @param range Range of acknowledged messages.
+ * @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void setMessageListener(String destination,MessageListener listener);
-
-
+ public void messageAcknowledge(Range... range) throws QpidException;
+
+ /**
+ * Reject ranges of acquired messages.
+ * <p> A rejected message will not be delivered to any receiver
+ * and may be either discarded or moved to the broker dead letter queue.
+ *
+ * @param range Range of rejected messages.
+ * @throws QpidException If those messages cannot be rejected dus to some error
+ */
+ public void messageReject(Range... range) throws QpidException;
+
+ /**
+ * Try to acquire ranges of messages hence releasing them form the queue.
+ * This means that once acknowledged, a message will not be delivered to any other receiver.
+ * <p> As those messages may have been consumed by another receivers hence,
+ * message acquisition can fail.
+ * The outcome of the acquisition is returned as an array of ranges of qcquired messages.
+ * <p> This method should only be called on non-acquired messages.
+ *
+ * @param range Ranges of messages to be acquired.
+ * @return Ranges of explicitly acquired messages.
+ * @throws QpidException If this message cannot be acquired dus to some error
+ */
+ public Range[] messageAcquire(Range... range) throws QpidException;
+
+ /**
+ * Give up responsibility for processing ranges of messages.
+ * <p> Released messages are re-enqueued.
+ *
+ * @param range Ranges of messages to be released.
+ * @throws QpidException If this message cannot be released dus to some error.
+ */
+ public void messageRelease(Range... range) throws QpidException;
+
// -----------------------------------------------
- // Transaction methods
+ // Local transaction methods
// ----------------------------------------------
+ /**
+ * Selects the session for local transaction support.
+ *
+ * @throws QpidException If selecting this session for local transaction support fails due to some error.
+ */
+ public void txSelect() throws QpidException;
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
@@ -256,19 +265,11 @@ public interface Session
* @throws IllegalStateException If this session is not transacted.
*/
public void txRollback() throws QpidException, IllegalStateException;
-
-
- /**
- * Selects the session for transactions
- *
- * @throws QpidException
- */
- public void txSelect() throws QpidException;
-
+
//---------------------------------------------
// Queue methods
//---------------------------------------------
-
+
/**
* Declare a queue with the given queueName
* <p> Following are the valid options for declareQueue
@@ -283,14 +284,14 @@ public interface Session
* <p/>
* <p>In the absence of a particular option, the defaul value is false for each option
*
- * @param queueName The name of the delcared queue.
- * @param options Set of Options.
+ * @param queueName The name of the delcared queue.
+ * @param alternateExchange Alternate excahnge.
+ * @param options Set of Options.
* @throws QpidException If the session fails to declare the queue due to some error.
* @see Option
*/
public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
Option... options) throws QpidException;
- //Todo: Do we need to define more specific exceptions like queue name already exist?
/**
* Bind a queue with an exchange.
@@ -302,7 +303,6 @@ public interface Session
*/
public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
QpidException;
- //Todo: Do we need to define more specific exceptions like exchange does not exist?
/**
* Unbind a queue from an exchange.
@@ -314,11 +314,9 @@ public interface Session
*/
public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
QpidException;
- //Todo: Do we need to define more specific exceptions like exchange does not exist?
/**
* Purge a queue. i.e. delete all enqueued messages
- * TODO: Define the exact semantic i.e. are message sent to a dead letter queue?
*
* @param queueName The queue to be purged
* @throws QpidException If the session fails to purge the queue due to some error.
@@ -346,11 +344,10 @@ public interface Session
*/
public void queueDelete(String queueName, Option... options) throws QpidException;
-
- // --------------------------------------
+ // --------------------------------------
// exhcange methods
// --------------------------------------
-
+
/**
* Declare an exchange.
* <p> Following are the valid options for createReceive
@@ -373,7 +370,7 @@ public interface Session
*/
public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
Map<String, ?> arguments, Option... options) throws QpidException;
-
+
/**
* Delete an exchange.
* <p> Following are the valid options for createReceive
@@ -392,5 +389,4 @@ public interface Session
* @see Option
*/
public void exchangeDelete(String exchangeName, Option... options) throws QpidException;
- //Todo: Do we need to define more specific exceptions like exchange does not exist?
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java
new file mode 100644
index 0000000000..20a8319409
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpidity.Header;
+
+/**
+ * <p>This message listener is useful if you need to
+ * know when each message part becomes available
+ * as opposed to knowing when the whole message arrives.</p>
+ * <p/>
+ * <p> The sequence of event for transferring a message is as follows:
+ * <ul>
+ * <li> n calls to addMessageHeaders (should be usually one or two)
+ * <li> n calls to addData
+ * <li> {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(<code>null</code>).
+ * </ul>
+ * This is up to the implementation to assembled the message when the different parts
+ * are transferred.
+ */
+public interface StreamingMessageListener extends MessageListener
+{
+ /**
+ * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
+ * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
+ *
+ * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ */
+ public void addMessageHeaders(Header... headers);
+
+ /**
+ * Add the following byte array to the content of the message being received
+ *
+ * @param data Data to be added or streamed.
+ */
+ public void addData(byte[] data);
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index c9e5d03948..099f7ed694 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -2,28 +2,149 @@ package org.apache.qpid.nclient.impl;
import java.util.Map;
-import org.apache.qpidity.api.StreamingMessageListener;
-import org.apache.qpid.nclient.api.MessageListener;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Session;
+import org.apache.qpidity.api.Message;
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpidity.*;
-public class ClientSession extends Session implements org.apache.qpid.nclient.api.Session
+/**
+ * Implements a Qpid Sesion.
+ */
+public class ClientSession implements org.apache.qpid.nclient.Session
{
- public void setMessageListener(String destination,MessageListener listener)
+ //------------------------------------------------------
+ // Session housekeeping methods
+ //------------------------------------------------------
+ public void close() throws QpidException
{
- super.setMessageListener(destination, new StreamingListenerAdapter(listener));
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void messageSubscribe(String queue, String destination, Map<String, ?> filter, Option... _options) throws QpidException
+ public void suspend() throws QpidException
{
- // TODO
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void messageSubscribe(String queue, String destination, Map<String, ?> filter, StreamingMessageListener listener, Option... _options) throws QpidException
+ public void resume() throws QpidException
{
- // TODO
+ //To change body of implemented methods use File | Settings | File Templates.
+ }//------------------------------------------------------
+ // Messaging methods
+ // Producer
+ //------------------------------------------------------
+ public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageTransfer(String exchange, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addMessageHeaders(Header... headers) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addData(byte[] data, int off, int len) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void endData() throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageCancel(String destination) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setMessageListener(String destination, MessageListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageAcknowledge(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageReject(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Range[] messageAcquire(Range... range) throws QpidException
+ {
+ return new Range[0]; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void messageRelease(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }// -----------------------------------------------
+ // Local transaction methods
+ // ----------------------------------------------
+ public void txSelect() throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void txCommit() throws QpidException, IllegalStateException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void txRollback() throws QpidException, IllegalStateException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
+ Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queuePurge(String queueName) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueDelete(String queueName, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
+ Map<String, ?> arguments, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void exchangeDelete(String exchangeName, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
index b8f04ca0df..ff0ac63540 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
@@ -1,10 +1,10 @@
package org.apache.qpid.nclient.impl;
-import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.StreamingMessageListener;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.api.StreamingMessageListener;
public class StreamingListenerAdapter implements StreamingMessageListener
{
@@ -16,23 +16,18 @@ public class StreamingListenerAdapter implements StreamingMessageListener
_adaptee = l;
}
- public void data(byte[] src)
+ public void addData(byte[] src)
{
_currentMsg.appendData(src);
}
- public void endData()
- {
- _adaptee.onMessage(_currentMsg);
- }
-
- public void messageHeaders(Header... headers)
+ public void addMessageHeaders(Header... headers)
{
//_currentMsg add the headers
}
- public void messageTransfer(String destination, Option... options)
+ public void messageTransfer(Message message)
{
- // _currentMsg create message from factory
+ _adaptee.messageTransfer(_currentMsg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
index da06b09f6d..7cc7659139 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
@@ -25,6 +25,7 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Session;
import javax.jms.ExceptionListener;
+import javax.jms.Connection;
import java.util.Vector;
@@ -85,7 +86,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
/**
* The QpidConeection instance that is mapped with thie JMS connection
*/
- org.apache.qpid.nclient.api.Connection _qpidConnection;
+ org.apache.qpid.nclient.Connection _qpidConnection;
/**
* This is the exception listener for this qpid connection.
@@ -436,7 +437,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @return This JMS connection underlying Qpid Connection.
*/
- protected org.apache.qpid.nclient.api.Connection getQpidConnection()
+ protected org.apache.qpid.nclient.Connection getQpidConnection()
{
return _qpidConnection;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
index e9bdef540c..95a59ecfd3 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
@@ -60,7 +60,6 @@ public abstract class MessageActor
protected MessageActor(SessionImpl session, DestinationImpl destination)
{
- // TODO create the qpidResource _qpidResource =
_session = session;
_destination = destination;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
index a350ceecee..3689eb60b0 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
@@ -19,6 +19,7 @@ package org.apache.qpid.nclient.jms;
//import org.apache.qpid.nclient.api.MessageReceiver;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -30,10 +31,6 @@ import javax.jms.Message;
*/
public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
- /**
- * The underlying qpid receiver
- */
- /* private MessageReceiver _qpidReceiver;*/
/**
* This MessageConsumer's messageselector.
@@ -55,7 +52,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
/**
* A MessageListener set up for this consumer.
*/
- private MessageListener _messageListener = null;
+ private MessageListener _messageListener;
+
+ /**
+ * A warpper around the JSM message listener
+ */
+ private MessageListenerWrapper _messageListenerWrapper;
//----- Constructors
/**
@@ -79,7 +81,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
/*try
{
// TODO define the relevant options
- _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), null);
+ _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE);
+ _qpidResource = _qpidReceiver;
}
catch (QpidException e)
{
@@ -131,7 +134,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
public void setMessageListener(MessageListener messageListener) throws JMSException
{
checkNotClosed();
- // TODO: create a message listener wrapper
+ _messageListener = messageListener;
+ if( messageListener == null )
+ {
+
+ _messageListenerWrapper = null;
+ }
+ else
+ {
+ _messageListenerWrapper = new MessageListenerWrapper(this);
+ //TODO _qpidReceiver.setAsynchronous(_messageListenerWrapper);
+ }
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
index 358f50506a..4dde127337 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
@@ -17,15 +17,13 @@
*/
package org.apache.qpid.nclient.jms;
-import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpid.nclient.MessageListener;
import org.apache.qpid.nclient.jms.message.AbstractJMSMessage;
import org.apache.qpid.nclient.jms.message.QpidMessage;
import org.apache.qpidity.api.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-
/**
* This is a wrapper for the JMS message listener
*/
@@ -57,13 +55,13 @@ public class MessageListenerWrapper implements MessageListener
_consumer = consumer;
}
- //---- org.apache.qpid.nclient.api.MessageListener API
+ //---- org.apache.qpid.nclient.MessageListener API
/**
* Deliver a message to the listener.
*
* @param message The message delivered to the listner.
*/
- public void onMessage(Message message)
+ public void messageTransfer(Message message)
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index 84ee7723fd..d0c6569197 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.nclient.jms.message.*;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -72,7 +71,7 @@ public class SessionImpl implements Session
/**
* The underlying QpidSession
*/
- private org.apache.qpid.nclient.api.Session _qpidSession;
+ private org.apache.qpid.nclient.Session _qpidSession;
/**
* Indicates whether this session is recovering
@@ -337,7 +336,7 @@ public class SessionImpl implements Session
// close the underlaying QpidSession
try
{
- _qpidSession.sessionClose();
+ _qpidSession.close();
}
catch (org.apache.qpidity.QpidException e)
{
@@ -463,7 +462,6 @@ public class SessionImpl implements Session
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
-
return createConsumer(destination, messageSelector, false);
}
@@ -664,11 +662,12 @@ public class SessionImpl implements Session
/**
* Remove a message actor form this session
* <p> This method is called when an actor is independently closed.
+ *
* @param actor The closed actor.
*/
protected void closeMessageActor(MessageActor actor)
{
- _messageActors.remove(actor);
+ _messageActors.remove(actor);
}
/**
@@ -678,15 +677,7 @@ public class SessionImpl implements Session
*/
protected void start() throws JMSException
{
- try
- {
- // TODO: make sure that the correct options are used
- _qpidSession.sessionFlow(Option.SUSPEND);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ // TODO: make sure that the correct options are used
}
/**
@@ -696,15 +687,7 @@ public class SessionImpl implements Session
*/
protected void stop() throws JMSException
{
- try
- {
// TODO: make sure that the correct options are used
- _qpidSession.sessionFlow(Option.RESUME);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
}
/**
@@ -818,7 +801,7 @@ public class SessionImpl implements Session
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.api.Session getQpidSession()
+ protected org.apache.qpid.nclient.Session getQpidSession()
{
return _qpidSession;
}