diff options
Diffstat (limited to 'java')
7 files changed, 161 insertions, 268 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java index 6d5f317feb..a2609bc6ff 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java @@ -76,11 +76,4 @@ public interface Connection throws QpidException; - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exceptionListner The execptionListener - */ - public void setExceptionListener(ExceptionListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java deleted file mode 100644 index 5f7bbe7cf2..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it - * informs the connection's ExceptionListener - */ -public interface ExceptionListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exception The exception comming from the communication layer. - * @see org.apache.qpid.nclient.api.Connection - */ - public void onException(QpidException exception); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java deleted file mode 100644 index 4aca6ea203..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - - -import org.apache.qpid.nclient.FieldTable; -import org.apache.qpidity.QpidException; - -import java.nio.ByteBuffer; - -/** - * A message is sent and received by resources. It is composed of a set of header and a payload. - */ -public interface Message -{ - /** - * Get this message auto-allocated messageID. - * - * @return This message ID. - */ - public long getMessageID(); - - /** - * Set this message headers - * <p> Previous headers are reset. - * - * @param headers The message headers as a field table. - * @see FieldTable - */ - public void setHeaders(FieldTable headers); - - /** - * Access to this message headers. - * - * @return This message headers as a field table. - */ - public FieldTable getHeaders(); - - /** - * Set this message payload. - * - * @param buffer This message payload. - */ - public void setBody(ByteBuffer buffer); - - /** - * Access this message body. - * - * @return The payload of this message. - */ - public ByteBuffer getBody(); - - /** - * Acknowledge the receipt of this message. - * <p>The message must have been previously acquired either by receiving it in - * pre-acquire mode or by explicitly acquiring it. - * - * @throws QpidException If the acknowledgement of the message fails due to some error. - * @throws IllegalStateException If this messages is not acquired. - */ - public void acknowledge() throws QpidException, IllegalStateException; - - /** - * Acknowledge the receipt of an acquired messages which IDs are within - * the interval [this.messageID, message.messageID] - * - * @param message The last message to be acknowledged. - * @throws QpidException If the acknowledgement of this set of messages fails due to some error. - * @throws IllegalStateException If some messages are not acquired. - */ - public void acknowledge(Message message) throws QpidException, IllegalStateException; - - /** - * Reject a previously acquired message. - * <p> A rejected message will not be delivered to any receiver - * and may be either discarded or moved to the broker dead letter queue. - * - * @throws QpidException If this message cannot be rejected dus to some error - * @throws IllegalStateException If this message is not acquired. - */ - public void reject() throws QpidException, IllegalStateException; - - /** - * Try to acquire this message hence releasing it form the queue. This means that once acknowledged, - * this message will not be delivered to any other receiver. - * <p> As this message may have been consumed by another receiver, message acquisition can fail. - * The outcome of the acquisition is returned as a Boolean. - * - * @return True if the message is successfully acquired, False otherwise. - * @throws QpidException If this message cannot be acquired dus to some error - * @throws IllegalStateException If this message has already been acquired. - */ - public boolean acquire() throws QpidException, IllegalStateException; - - /** - * Give up responsibility for processing this message. - * - * @throws QpidException If this message cannot be released dus to some error. - * @throws IllegalStateException If this message has already been acknowledged. - */ - public void release() throws QpidException, IllegalStateException; -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java index 5b844bb6c2..5fcf0d9e0d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java @@ -18,14 +18,20 @@ */ package org.apache.qpid.nclient.api; +import org.apache.qpidity.api.Message; + /** * MessageListeners are used to asynchronously receive messages. */ public interface MessageListener { /** - * Deliver a message to the listener. - * + * <p>Deliver a message to the listener. + * You will be notified when the whole message is received + * However, underneath the message might be streamed off disk + * or network buffers. + * </p> + * * @param message The message delivered to the listner. */ public void onMessage(Message message); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java index 3d58e551cd..80477dc0a0 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java @@ -22,11 +22,12 @@ import java.util.Set; import org.apache.qpidity.Option; import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; /** * Used to receive messages from a queue */ -public interface MessageReceiver extends Resource +public interface MessageReceiver { /** * Get this receiver options. diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java deleted file mode 100644 index 212b0dca80..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * A Resource is associated with a session and can be independently closed. - */ -public interface Resource -{ - - /** - * Close this resource. - * <p> Any blocking receive must return null. - * <p> For asynchronous receiver, this operation blocks until the message listener - * finishes processing the current message, - * - * @throws QpidException If the session fails to close this resource due to some error - */ - public void close() throws - QpidException; - - /** - * Get this resource session. - * - * @return This resource's session. - */ - public Session getSession(); - - /** - * Get the queue name to which this resource is tied. - * - * @return The queue name of this resource. - */ - public String getQueueNAme(); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java index 954ed1dbd1..6249c91fdb 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java @@ -24,16 +24,21 @@ import java.util.UUID; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.Header; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.api.StreamingMessageListener; /** - * A session is associated with a connection. - * <p> When created a Session is not attached with an underlying channel. Unsuspended a Session is - * equivalent to attaching a communication channel that can be used to communicate with the broker. + * <p>A session is associated with a connection. + * When created a Session is not attached with an underlying channel. + * Session is single threaded </p> */ public interface Session { - //--- Session housekeeping methods + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + /** * Close this session and any associated resources. * @@ -69,38 +74,118 @@ public interface Session */ public void sessionResume(UUID sessionId) throws QpidException; - /** - * ------------------------------------- - * Messaging methods - * ------------------------------------- - */ - + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** - * Transfer the given message. + * Transfer the given message. + * This is a convinience method * - * @param queueName The queue this sender is sending messages. - * @return A sender for queue queueName - * @throws QpidException If the session fails to create the sended due to some error + * @param destination The exchange the message being sent. + * @return msg The Message to be sent + * @throws QpidException If the session fails to send the message due to some error */ public void messageTransfer(String destination,Message msg)throws QpidException; - public void messageTransfer(Option... options)throws QpidException; - /** * Transfer the given message. + * <p> Following are the valid options for messageTransfer + * <ul> + * <li> CONFIRM + * <li> PRE_ACCQUIRE + * </ul> + * </p> + * + * <p> In the absence of a particular option, the defaul value is: + * <ul> + * <li> CONFIRM = false + * <li> NO-ACCQUIRE + * </ul> + * </p> * - * @param - * @throws QpidException If the session fails to create the sended due to some error + * @param destination The exchange the message being sent. + * @return options set of options + * @throws QpidException If the session fails to send the message due to some error + */ + public void messageTransfer(String destination,Option... options)throws QpidException; + + /** + * Add the following headers to content bearing frame + * + * @param Header Either DeliveryProperties or ApplicationProperties + * @throws QpidException If the session fails to execute the method due to some error */ public void messageHeaders(Header ... headers)throws QpidException; - - public void messageBody(byte[] src)throws QpidException; - - public void messageClose()throws QpidException; + + /** + * Add the following byte array to the content. + * This method is useful when streaming large messages + * + * @param src data to be added or streamed + * @throws QpidException If the session fails to execute the method due to some error + */ + public void data(byte[] src)throws QpidException; + + /** + * Signals the end of data for the message. * + * This method is useful when streaming large messages + * + * @throws QpidException If the session fails to execute the method due to some error + */ + public void endData()throws QpidException; /** - * Create a message receiver for receiving messages from queue queueName. - * <p> Following are the valid options for createReceive + * Acknowledge the receipt of this message. + * <p>The message must have been previously acquired either by receiving it in + * pre-acquire mode or by explicitly acquiring it. + * + * @throws QpidException If the acknowledgement of the message fails due to some error. + * @throws IllegalStateException If this messages is not acquired. + */ + public void messageAcknowledge() throws QpidException; + + /** + * Reject a previously acquired message. + * <p> A rejected message will not be delivered to any receiver + * and may be either discarded or moved to the broker dead letter queue. + * + * @throws QpidException If this message cannot be rejected dus to some error + * @throws IllegalStateException If this message is not acquired. + */ + public void messageReject() throws QpidException; + + /** + * Try to acquire this message hence releasing it form the queue. This means that once acknowledged, + * this message will not be delivered to any other receiver. + * <p> As this message may have been consumed by another receiver, message acquisition can fail. + * The outcome of the acquisition is returned as a Boolean. + * + * @return True if the message is successfully acquired, False otherwise. + * @throws QpidException If this message cannot be acquired dus to some error + * @throws IllegalStateException If this message has already been acquired. + */ + public boolean messageAcquire() throws QpidException; + + /** + * Give up responsibility for processing this message. + * + * @throws QpidException If this message cannot be released dus to some error. + * @throws IllegalStateException If this message has already been acknowledged. + */ + public void messageRelease() throws QpidException; + + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Create a message receiver for receiving messages from queue queueName. + * <p> Following are the valid options for messageSubscribe * <ul> * <li> NO_LOCAL * <li> EXCLUSIVE @@ -108,7 +193,7 @@ public interface Session * <li> CONFIRM * </ul> * </p> - * <p/> + * * <p> In the absence of a particular option, the defaul value is: * <ul> * <li> NO_LOCAL = false @@ -116,22 +201,44 @@ public interface Session * <li> PRE-ACCQUIRE * <li> CONFIRM = false * </ul> - * </p> + * </p> * - * @param queueName The queue this receiver is receiving messages from. + * @param queue The queue this receiver is receiving messages from. + * @param destination The destination for the subscriber ,a.k.a the delivery tag. * @param options Set of Options. - * @return A receiver for queue queueName. * @throws QpidException If the session fails to create the receiver due to some error. * @see Option */ - public MessageReceiver createReceiver(String queueName, Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like queue name not valid? + public void messageSubscribe(String queue, String destination, Map<String,?> filter, Option ... _options) throws QpidException; + /** - * ------------------------------------- - * Transaction methods - * ------------------------------------- + * Cancels a subscription + * + * @param destination The destination for the subscriber used at subscription + */ + public void messageCancel(String destination) throws QpidException; + + /** + * We currently allow one listerner per destination + * + * @param destination + * @param listener */ + public void addMessageListener(String destination,StreamingMessageListener listener); + + /** + * We currently allow one listerner per destination + * + * @param destination + * @param listener + */ + public void addMessageListener(String destination,MessageListener listener); + + + // ----------------------------------------------- + // Transaction methods + // ---------------------------------------------- /** * Commit the receipt and the delivery of all messages exchanged by this session resources. @@ -149,21 +256,11 @@ public interface Session */ public void txRollback() throws QpidException, IllegalStateException; - /** - * Set this session as transacted. - * <p> This operation is irreversible. - * - * @throws QpidException If the session fails to be transacted due to some error. - * @throws IllegalStateException If this session is already transacted. - */ - public void setTransacted() throws QpidException, IllegalStateException; - - /** - * ------------------------------------- - * Queue methods - * ------------------------------------- - */ - + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + /** * Declare a queue with the given queueName * <p> Following are the valid options for declareQueue @@ -241,6 +338,11 @@ public interface Session */ public void queueDelete(String queueName, Option... options) throws QpidException; + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + /** * Declare an exchange. * <p> Following are the valid options for createReceive @@ -263,8 +365,7 @@ public interface Session */ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like exchange already exist? - + /** * Delete an exchange. * <p> Following are the valid options for createReceive |
