diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-01 22:18:14 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-01 22:18:14 +0000 |
| commit | f2bdaf2894c3b505af0540218245432a79d1e4a0 (patch) | |
| tree | 6916d3484508cbfd27088b9c78f72044b6745868 /java | |
| parent | 61e3d35383e4e9387b59ab3a67ef0fa76204c6da (diff) | |
| download | qpid-python-f2bdaf2894c3b505af0540218245432a79d1e4a0.tar.gz | |
I made serveral changes in the API in this commit
ExceptionListener is removed - The API will throw QpidExceptions that contains AMQP error codes
Resource is removed - After the changes this is irrelevant
MessageReceiver is removed - All message methods are now in session to bring the API more in line with AMQP
Message is removed and a refactored interface is added to the common package that can be used by both broker and client.
Session - Has many modifications to bring it inline with the generated methods in the Invoker.
It also directly exposes the message methods
For receiving it will take in StreamingMessageListeners or MessageListeners
Connection - remove the exception listener
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 161 insertions, 268 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java index 6d5f317feb..a2609bc6ff 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java @@ -76,11 +76,4 @@ public interface Connection throws QpidException; - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exceptionListner The execptionListener - */ - public void setExceptionListener(ExceptionListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java deleted file mode 100644 index 5f7bbe7cf2..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it - * informs the connection's ExceptionListener - */ -public interface ExceptionListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exception The exception comming from the communication layer. - * @see org.apache.qpid.nclient.api.Connection - */ - public void onException(QpidException exception); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java deleted file mode 100644 index 4aca6ea203..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - - -import org.apache.qpid.nclient.FieldTable; -import org.apache.qpidity.QpidException; - -import java.nio.ByteBuffer; - -/** - * A message is sent and received by resources. It is composed of a set of header and a payload. - */ -public interface Message -{ - /** - * Get this message auto-allocated messageID. - * - * @return This message ID. - */ - public long getMessageID(); - - /** - * Set this message headers - * <p> Previous headers are reset. - * - * @param headers The message headers as a field table. - * @see FieldTable - */ - public void setHeaders(FieldTable headers); - - /** - * Access to this message headers. - * - * @return This message headers as a field table. - */ - public FieldTable getHeaders(); - - /** - * Set this message payload. - * - * @param buffer This message payload. - */ - public void setBody(ByteBuffer buffer); - - /** - * Access this message body. - * - * @return The payload of this message. - */ - public ByteBuffer getBody(); - - /** - * Acknowledge the receipt of this message. - * <p>The message must have been previously acquired either by receiving it in - * pre-acquire mode or by explicitly acquiring it. - * - * @throws QpidException If the acknowledgement of the message fails due to some error. - * @throws IllegalStateException If this messages is not acquired. - */ - public void acknowledge() throws QpidException, IllegalStateException; - - /** - * Acknowledge the receipt of an acquired messages which IDs are within - * the interval [this.messageID, message.messageID] - * - * @param message The last message to be acknowledged. - * @throws QpidException If the acknowledgement of this set of messages fails due to some error. - * @throws IllegalStateException If some messages are not acquired. - */ - public void acknowledge(Message message) throws QpidException, IllegalStateException; - - /** - * Reject a previously acquired message. - * <p> A rejected message will not be delivered to any receiver - * and may be either discarded or moved to the broker dead letter queue. - * - * @throws QpidException If this message cannot be rejected dus to some error - * @throws IllegalStateException If this message is not acquired. - */ - public void reject() throws QpidException, IllegalStateException; - - /** - * Try to acquire this message hence releasing it form the queue. This means that once acknowledged, - * this message will not be delivered to any other receiver. - * <p> As this message may have been consumed by another receiver, message acquisition can fail. - * The outcome of the acquisition is returned as a Boolean. - * - * @return True if the message is successfully acquired, False otherwise. - * @throws QpidException If this message cannot be acquired dus to some error - * @throws IllegalStateException If this message has already been acquired. - */ - public boolean acquire() throws QpidException, IllegalStateException; - - /** - * Give up responsibility for processing this message. - * - * @throws QpidException If this message cannot be released dus to some error. - * @throws IllegalStateException If this message has already been acknowledged. - */ - public void release() throws QpidException, IllegalStateException; -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java index 5b844bb6c2..5fcf0d9e0d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java @@ -18,14 +18,20 @@ */ package org.apache.qpid.nclient.api; +import org.apache.qpidity.api.Message; + /** * MessageListeners are used to asynchronously receive messages. */ public interface MessageListener { /** - * Deliver a message to the listener. - * + * <p>Deliver a message to the listener. + * You will be notified when the whole message is received + * However, underneath the message might be streamed off disk + * or network buffers. + * </p> + * * @param message The message delivered to the listner. */ public void onMessage(Message message); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java index 3d58e551cd..80477dc0a0 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java @@ -22,11 +22,12 @@ import java.util.Set; import org.apache.qpidity.Option; import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; /** * Used to receive messages from a queue */ -public interface MessageReceiver extends Resource +public interface MessageReceiver { /** * Get this receiver options. diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java deleted file mode 100644 index 212b0dca80..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * A Resource is associated with a session and can be independently closed. - */ -public interface Resource -{ - - /** - * Close this resource. - * <p> Any blocking receive must return null. - * <p> For asynchronous receiver, this operation blocks until the message listener - * finishes processing the current message, - * - * @throws QpidException If the session fails to close this resource due to some error - */ - public void close() throws - QpidException; - - /** - * Get this resource session. - * - * @return This resource's session. - */ - public Session getSession(); - - /** - * Get the queue name to which this resource is tied. - * - * @return The queue name of this resource. - */ - public String getQueueNAme(); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java index 954ed1dbd1..6249c91fdb 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java @@ -24,16 +24,21 @@ import java.util.UUID; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.Header; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.api.StreamingMessageListener; /** - * A session is associated with a connection. - * <p> When created a Session is not attached with an underlying channel. Unsuspended a Session is - * equivalent to attaching a communication channel that can be used to communicate with the broker. + * <p>A session is associated with a connection. + * When created a Session is not attached with an underlying channel. + * Session is single threaded </p> */ public interface Session { - //--- Session housekeeping methods + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + /** * Close this session and any associated resources. * @@ -69,38 +74,118 @@ public interface Session */ public void sessionResume(UUID sessionId) throws QpidException; - /** - * ------------------------------------- - * Messaging methods - * ------------------------------------- - */ - + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** - * Transfer the given message. + * Transfer the given message. + * This is a convinience method * - * @param queueName The queue this sender is sending messages. - * @return A sender for queue queueName - * @throws QpidException If the session fails to create the sended due to some error + * @param destination The exchange the message being sent. + * @return msg The Message to be sent + * @throws QpidException If the session fails to send the message due to some error */ public void messageTransfer(String destination,Message msg)throws QpidException; - public void messageTransfer(Option... options)throws QpidException; - /** * Transfer the given message. + * <p> Following are the valid options for messageTransfer + * <ul> + * <li> CONFIRM + * <li> PRE_ACCQUIRE + * </ul> + * </p> + * + * <p> In the absence of a particular option, the defaul value is: + * <ul> + * <li> CONFIRM = false + * <li> NO-ACCQUIRE + * </ul> + * </p> * - * @param - * @throws QpidException If the session fails to create the sended due to some error + * @param destination The exchange the message being sent. + * @return options set of options + * @throws QpidException If the session fails to send the message due to some error + */ + public void messageTransfer(String destination,Option... options)throws QpidException; + + /** + * Add the following headers to content bearing frame + * + * @param Header Either DeliveryProperties or ApplicationProperties + * @throws QpidException If the session fails to execute the method due to some error */ public void messageHeaders(Header ... headers)throws QpidException; - - public void messageBody(byte[] src)throws QpidException; - - public void messageClose()throws QpidException; + + /** + * Add the following byte array to the content. + * This method is useful when streaming large messages + * + * @param src data to be added or streamed + * @throws QpidException If the session fails to execute the method due to some error + */ + public void data(byte[] src)throws QpidException; + + /** + * Signals the end of data for the message. * + * This method is useful when streaming large messages + * + * @throws QpidException If the session fails to execute the method due to some error + */ + public void endData()throws QpidException; /** - * Create a message receiver for receiving messages from queue queueName. - * <p> Following are the valid options for createReceive + * Acknowledge the receipt of this message. + * <p>The message must have been previously acquired either by receiving it in + * pre-acquire mode or by explicitly acquiring it. + * + * @throws QpidException If the acknowledgement of the message fails due to some error. + * @throws IllegalStateException If this messages is not acquired. + */ + public void messageAcknowledge() throws QpidException; + + /** + * Reject a previously acquired message. + * <p> A rejected message will not be delivered to any receiver + * and may be either discarded or moved to the broker dead letter queue. + * + * @throws QpidException If this message cannot be rejected dus to some error + * @throws IllegalStateException If this message is not acquired. + */ + public void messageReject() throws QpidException; + + /** + * Try to acquire this message hence releasing it form the queue. This means that once acknowledged, + * this message will not be delivered to any other receiver. + * <p> As this message may have been consumed by another receiver, message acquisition can fail. + * The outcome of the acquisition is returned as a Boolean. + * + * @return True if the message is successfully acquired, False otherwise. + * @throws QpidException If this message cannot be acquired dus to some error + * @throws IllegalStateException If this message has already been acquired. + */ + public boolean messageAcquire() throws QpidException; + + /** + * Give up responsibility for processing this message. + * + * @throws QpidException If this message cannot be released dus to some error. + * @throws IllegalStateException If this message has already been acknowledged. + */ + public void messageRelease() throws QpidException; + + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Create a message receiver for receiving messages from queue queueName. + * <p> Following are the valid options for messageSubscribe * <ul> * <li> NO_LOCAL * <li> EXCLUSIVE @@ -108,7 +193,7 @@ public interface Session * <li> CONFIRM * </ul> * </p> - * <p/> + * * <p> In the absence of a particular option, the defaul value is: * <ul> * <li> NO_LOCAL = false @@ -116,22 +201,44 @@ public interface Session * <li> PRE-ACCQUIRE * <li> CONFIRM = false * </ul> - * </p> + * </p> * - * @param queueName The queue this receiver is receiving messages from. + * @param queue The queue this receiver is receiving messages from. + * @param destination The destination for the subscriber ,a.k.a the delivery tag. * @param options Set of Options. - * @return A receiver for queue queueName. * @throws QpidException If the session fails to create the receiver due to some error. * @see Option */ - public MessageReceiver createReceiver(String queueName, Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like queue name not valid? + public void messageSubscribe(String queue, String destination, Map<String,?> filter, Option ... _options) throws QpidException; + /** - * ------------------------------------- - * Transaction methods - * ------------------------------------- + * Cancels a subscription + * + * @param destination The destination for the subscriber used at subscription + */ + public void messageCancel(String destination) throws QpidException; + + /** + * We currently allow one listerner per destination + * + * @param destination + * @param listener */ + public void addMessageListener(String destination,StreamingMessageListener listener); + + /** + * We currently allow one listerner per destination + * + * @param destination + * @param listener + */ + public void addMessageListener(String destination,MessageListener listener); + + + // ----------------------------------------------- + // Transaction methods + // ---------------------------------------------- /** * Commit the receipt and the delivery of all messages exchanged by this session resources. @@ -149,21 +256,11 @@ public interface Session */ public void txRollback() throws QpidException, IllegalStateException; - /** - * Set this session as transacted. - * <p> This operation is irreversible. - * - * @throws QpidException If the session fails to be transacted due to some error. - * @throws IllegalStateException If this session is already transacted. - */ - public void setTransacted() throws QpidException, IllegalStateException; - - /** - * ------------------------------------- - * Queue methods - * ------------------------------------- - */ - + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + /** * Declare a queue with the given queueName * <p> Following are the valid options for declareQueue @@ -241,6 +338,11 @@ public interface Session */ public void queueDelete(String queueName, Option... options) throws QpidException; + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + /** * Declare an exchange. * <p> Following are the valid options for createReceive @@ -263,8 +365,7 @@ public interface Session */ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like exchange already exist? - + /** * Delete an exchange. * <p> Following are the valid options for createReceive |
