summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Message.java118
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Session.java201
7 files changed, 161 insertions, 268 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
index 6d5f317feb..a2609bc6ff 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
@@ -76,11 +76,4 @@ public interface Connection
throws
QpidException;
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- *
- * @param exceptionListner The execptionListener
- */
- public void setExceptionListener(ExceptionListener exceptionListner);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
deleted file mode 100644
index 5f7bbe7cf2..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient.api;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
- * informs the connection's ExceptionListener
- */
-public interface ExceptionListener
-{
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- *
- * @param exception The exception comming from the communication layer.
- * @see org.apache.qpid.nclient.api.Connection
- */
- public void onException(QpidException exception);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java
deleted file mode 100644
index 4aca6ea203..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient.api;
-
-
-import org.apache.qpid.nclient.FieldTable;
-import org.apache.qpidity.QpidException;
-
-import java.nio.ByteBuffer;
-
-/**
- * A message is sent and received by resources. It is composed of a set of header and a payload.
- */
-public interface Message
-{
- /**
- * Get this message auto-allocated messageID.
- *
- * @return This message ID.
- */
- public long getMessageID();
-
- /**
- * Set this message headers
- * <p> Previous headers are reset.
- *
- * @param headers The message headers as a field table.
- * @see FieldTable
- */
- public void setHeaders(FieldTable headers);
-
- /**
- * Access to this message headers.
- *
- * @return This message headers as a field table.
- */
- public FieldTable getHeaders();
-
- /**
- * Set this message payload.
- *
- * @param buffer This message payload.
- */
- public void setBody(ByteBuffer buffer);
-
- /**
- * Access this message body.
- *
- * @return The payload of this message.
- */
- public ByteBuffer getBody();
-
- /**
- * Acknowledge the receipt of this message.
- * <p>The message must have been previously acquired either by receiving it in
- * pre-acquire mode or by explicitly acquiring it.
- *
- * @throws QpidException If the acknowledgement of the message fails due to some error.
- * @throws IllegalStateException If this messages is not acquired.
- */
- public void acknowledge() throws QpidException, IllegalStateException;
-
- /**
- * Acknowledge the receipt of an acquired messages which IDs are within
- * the interval [this.messageID, message.messageID]
- *
- * @param message The last message to be acknowledged.
- * @throws QpidException If the acknowledgement of this set of messages fails due to some error.
- * @throws IllegalStateException If some messages are not acquired.
- */
- public void acknowledge(Message message) throws QpidException, IllegalStateException;
-
- /**
- * Reject a previously acquired message.
- * <p> A rejected message will not be delivered to any receiver
- * and may be either discarded or moved to the broker dead letter queue.
- *
- * @throws QpidException If this message cannot be rejected dus to some error
- * @throws IllegalStateException If this message is not acquired.
- */
- public void reject() throws QpidException, IllegalStateException;
-
- /**
- * Try to acquire this message hence releasing it form the queue. This means that once acknowledged,
- * this message will not be delivered to any other receiver.
- * <p> As this message may have been consumed by another receiver, message acquisition can fail.
- * The outcome of the acquisition is returned as a Boolean.
- *
- * @return True if the message is successfully acquired, False otherwise.
- * @throws QpidException If this message cannot be acquired dus to some error
- * @throws IllegalStateException If this message has already been acquired.
- */
- public boolean acquire() throws QpidException, IllegalStateException;
-
- /**
- * Give up responsibility for processing this message.
- *
- * @throws QpidException If this message cannot be released dus to some error.
- * @throws IllegalStateException If this message has already been acknowledged.
- */
- public void release() throws QpidException, IllegalStateException;
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
index 5b844bb6c2..5fcf0d9e0d 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
@@ -18,14 +18,20 @@
*/
package org.apache.qpid.nclient.api;
+import org.apache.qpidity.api.Message;
+
/**
* MessageListeners are used to asynchronously receive messages.
*/
public interface MessageListener
{
/**
- * Deliver a message to the listener.
- *
+ * <p>Deliver a message to the listener.
+ * You will be notified when the whole message is received
+ * However, underneath the message might be streamed off disk
+ * or network buffers.
+ * </p>
+ *
* @param message The message delivered to the listner.
*/
public void onMessage(Message message);
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
index 3d58e551cd..80477dc0a0 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
@@ -22,11 +22,12 @@ import java.util.Set;
import org.apache.qpidity.Option;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
/**
* Used to receive messages from a queue
*/
-public interface MessageReceiver extends Resource
+public interface MessageReceiver
{
/**
* Get this receiver options.
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java
deleted file mode 100644
index 212b0dca80..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient.api;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * A Resource is associated with a session and can be independently closed.
- */
-public interface Resource
-{
-
- /**
- * Close this resource.
- * <p> Any blocking receive must return null.
- * <p> For asynchronous receiver, this operation blocks until the message listener
- * finishes processing the current message,
- *
- * @throws QpidException If the session fails to close this resource due to some error
- */
- public void close() throws
- QpidException;
-
- /**
- * Get this resource session.
- *
- * @return This resource's session.
- */
- public Session getSession();
-
- /**
- * Get the queue name to which this resource is tied.
- *
- * @return The queue name of this resource.
- */
- public String getQueueNAme();
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
index 954ed1dbd1..6249c91fdb 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
@@ -24,16 +24,21 @@ import java.util.UUID;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.Header;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.api.StreamingMessageListener;
/**
- * A session is associated with a connection.
- * <p> When created a Session is not attached with an underlying channel. Unsuspended a Session is
- * equivalent to attaching a communication channel that can be used to communicate with the broker.
+ * <p>A session is associated with a connection.
+ * When created a Session is not attached with an underlying channel.
+ * Session is single threaded </p>
*/
public interface Session
{
- //--- Session housekeeping methods
+ //------------------------------------------------------
+ // Session housekeeping methods
+ //------------------------------------------------------
+
/**
* Close this session and any associated resources.
*
@@ -69,38 +74,118 @@ public interface Session
*/
public void sessionResume(UUID sessionId) throws QpidException;
- /**
- * -------------------------------------
- * Messaging methods
- * -------------------------------------
- */
-
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Producer
+ //------------------------------------------------------
+
/**
- * Transfer the given message.
+ * Transfer the given message.
+ * This is a convinience method
*
- * @param queueName The queue this sender is sending messages.
- * @return A sender for queue queueName
- * @throws QpidException If the session fails to create the sended due to some error
+ * @param destination The exchange the message being sent.
+ * @return msg The Message to be sent
+ * @throws QpidException If the session fails to send the message due to some error
*/
public void messageTransfer(String destination,Message msg)throws QpidException;
- public void messageTransfer(Option... options)throws QpidException;
-
/**
* Transfer the given message.
+ * <p> Following are the valid options for messageTransfer
+ * <ul>
+ * <li> CONFIRM
+ * <li> PRE_ACCQUIRE
+ * </ul>
+ * </p>
+ *
+ * <p> In the absence of a particular option, the defaul value is:
+ * <ul>
+ * <li> CONFIRM = false
+ * <li> NO-ACCQUIRE
+ * </ul>
+ * </p>
*
- * @param
- * @throws QpidException If the session fails to create the sended due to some error
+ * @param destination The exchange the message being sent.
+ * @return options set of options
+ * @throws QpidException If the session fails to send the message due to some error
+ */
+ public void messageTransfer(String destination,Option... options)throws QpidException;
+
+ /**
+ * Add the following headers to content bearing frame
+ *
+ * @param Header Either DeliveryProperties or ApplicationProperties
+ * @throws QpidException If the session fails to execute the method due to some error
*/
public void messageHeaders(Header ... headers)throws QpidException;
-
- public void messageBody(byte[] src)throws QpidException;
-
- public void messageClose()throws QpidException;
+
+ /**
+ * Add the following byte array to the content.
+ * This method is useful when streaming large messages
+ *
+ * @param src data to be added or streamed
+ * @throws QpidException If the session fails to execute the method due to some error
+ */
+ public void data(byte[] src)throws QpidException;
+
+ /**
+ * Signals the end of data for the message. *
+ * This method is useful when streaming large messages
+ *
+ * @throws QpidException If the session fails to execute the method due to some error
+ */
+ public void endData()throws QpidException;
/**
- * Create a message receiver for receiving messages from queue queueName.
- * <p> Following are the valid options for createReceive
+ * Acknowledge the receipt of this message.
+ * <p>The message must have been previously acquired either by receiving it in
+ * pre-acquire mode or by explicitly acquiring it.
+ *
+ * @throws QpidException If the acknowledgement of the message fails due to some error.
+ * @throws IllegalStateException If this messages is not acquired.
+ */
+ public void messageAcknowledge() throws QpidException;
+
+ /**
+ * Reject a previously acquired message.
+ * <p> A rejected message will not be delivered to any receiver
+ * and may be either discarded or moved to the broker dead letter queue.
+ *
+ * @throws QpidException If this message cannot be rejected dus to some error
+ * @throws IllegalStateException If this message is not acquired.
+ */
+ public void messageReject() throws QpidException;
+
+ /**
+ * Try to acquire this message hence releasing it form the queue. This means that once acknowledged,
+ * this message will not be delivered to any other receiver.
+ * <p> As this message may have been consumed by another receiver, message acquisition can fail.
+ * The outcome of the acquisition is returned as a Boolean.
+ *
+ * @return True if the message is successfully acquired, False otherwise.
+ * @throws QpidException If this message cannot be acquired dus to some error
+ * @throws IllegalStateException If this message has already been acquired.
+ */
+ public boolean messageAcquire() throws QpidException;
+
+ /**
+ * Give up responsibility for processing this message.
+ *
+ * @throws QpidException If this message cannot be released dus to some error.
+ * @throws IllegalStateException If this message has already been acknowledged.
+ */
+ public void messageRelease() throws QpidException;
+
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Consumer
+ //------------------------------------------------------
+
+ /**
+ * Create a message receiver for receiving messages from queue queueName.
+ * <p> Following are the valid options for messageSubscribe
* <ul>
* <li> NO_LOCAL
* <li> EXCLUSIVE
@@ -108,7 +193,7 @@ public interface Session
* <li> CONFIRM
* </ul>
* </p>
- * <p/>
+ *
* <p> In the absence of a particular option, the defaul value is:
* <ul>
* <li> NO_LOCAL = false
@@ -116,22 +201,44 @@ public interface Session
* <li> PRE-ACCQUIRE
* <li> CONFIRM = false
* </ul>
- * </p>
+ * </p>
*
- * @param queueName The queue this receiver is receiving messages from.
+ * @param queue The queue this receiver is receiving messages from.
+ * @param destination The destination for the subscriber ,a.k.a the delivery tag.
* @param options Set of Options.
- * @return A receiver for queue queueName.
* @throws QpidException If the session fails to create the receiver due to some error.
* @see Option
*/
- public MessageReceiver createReceiver(String queueName, Option... options) throws QpidException;
- //Todo: Do we need to define more specific exceptions like queue name not valid?
+ public void messageSubscribe(String queue, String destination, Map<String,?> filter, Option ... _options) throws QpidException;
+
/**
- * -------------------------------------
- * Transaction methods
- * -------------------------------------
+ * Cancels a subscription
+ *
+ * @param destination The destination for the subscriber used at subscription
+ */
+ public void messageCancel(String destination) throws QpidException;
+
+ /**
+ * We currently allow one listerner per destination
+ *
+ * @param destination
+ * @param listener
*/
+ public void addMessageListener(String destination,StreamingMessageListener listener);
+
+ /**
+ * We currently allow one listerner per destination
+ *
+ * @param destination
+ * @param listener
+ */
+ public void addMessageListener(String destination,MessageListener listener);
+
+
+ // -----------------------------------------------
+ // Transaction methods
+ // ----------------------------------------------
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
@@ -149,21 +256,11 @@ public interface Session
*/
public void txRollback() throws QpidException, IllegalStateException;
- /**
- * Set this session as transacted.
- * <p> This operation is irreversible.
- *
- * @throws QpidException If the session fails to be transacted due to some error.
- * @throws IllegalStateException If this session is already transacted.
- */
- public void setTransacted() throws QpidException, IllegalStateException;
-
- /**
- * -------------------------------------
- * Queue methods
- * -------------------------------------
- */
-
+
+ //---------------------------------------------
+ // Queue methods
+ //---------------------------------------------
+
/**
* Declare a queue with the given queueName
* <p> Following are the valid options for declareQueue
@@ -241,6 +338,11 @@ public interface Session
*/
public void queueDelete(String queueName, Option... options) throws QpidException;
+
+ // --------------------------------------
+ // exhcange methods
+ // --------------------------------------
+
/**
* Declare an exchange.
* <p> Following are the valid options for createReceive
@@ -263,8 +365,7 @@ public interface Session
*/
public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
Map<String, ?> arguments, Option... options) throws QpidException;
- //Todo: Do we need to define more specific exceptions like exchange already exist?
-
+
/**
* Delete an exchange.
* <p> Following are the valid options for createReceive