From 6167ee934ff21684a93f43b5efcf47a85f1e4aa2 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 3 Aug 2007 11:34:02 +0000 Subject: Removed "api" from the package name git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562414 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/nclient/Connection.java | 78 ++++ .../java/org/apache/qpid/nclient/DtxSession.java | 37 ++ .../org/apache/qpid/nclient/ExceptionListener.java | 37 ++ .../org/apache/qpid/nclient/MessageListener.java | 38 ++ .../main/java/org/apache/qpid/nclient/Session.java | 392 ++++++++++++++++++++ .../qpid/nclient/StreamingMessageListener.java | 53 +++ .../org/apache/qpid/nclient/api/Connection.java | 86 ----- .../org/apache/qpid/nclient/api/DtxSession.java | 39 -- .../apache/qpid/nclient/api/ExceptionListener.java | 37 -- .../apache/qpid/nclient/api/MessageListener.java | 38 -- .../java/org/apache/qpid/nclient/api/Session.java | 396 --------------------- .../apache/qpid/nclient/impl/ClientSession.java | 145 +++++++- .../nclient/impl/StreamingListenerAdapter.java | 17 +- .../apache/qpid/nclient/jms/ConnectionImpl.java | 5 +- .../org/apache/qpid/nclient/jms/MessageActor.java | 1 - .../qpid/nclient/jms/MessageConsumerImpl.java | 27 +- .../qpid/nclient/jms/MessageListenerWrapper.java | 8 +- .../org/apache/qpid/nclient/jms/SessionImpl.java | 29 +- 18 files changed, 806 insertions(+), 657 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Connection.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Session.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/api/Session.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java new file mode 100644 index 0000000000..08e067dede --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + + +import java.net.URL; + +import org.apache.qpidity.QpidException; + +/** + * This represents a physical connection to a broker. + */ +public interface Connection +{ + /** + * Establish the connection with the broker identified by the provided URL. + * + * @param url The URL of the broker. + * @throws QpidException If the communication layer fails to connect with the broker. + */ + public void connect(URL url) throws QpidException; + + /** + * Close this connection. + * + * @throws QpidException if the communication layer fails to close the connection. + */ + public void close() throws QpidException; + + + /** + * Create a session for this connection. + *

The retuned session is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) session. + * @throws QpidException If the connection fails to create a session due to some internal error. + */ + public Session createSession(int expiryInSeconds) throws QpidException; + + /** + * Create a DtxSession for this connection. + *

A Dtx Session must be used when resources have to be manipulated as + * part of a global transaction. + *

The retuned DtxSession is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) DtxSession. + * @throws QpidException If the connection fails to create a DtxSession due to some internal error. + */ + public DtxSession createDTXSession(int expiryInSeconds) throws QpidException; + + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exceptionListner The execptionListener + */ + public void setExceptionListener(ExceptionListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java new file mode 100644 index 0000000000..61f03a2d40 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpidity.QpidException; + +/** + * This session�s resources are control under the scope of a distributed transaction. + */ +public interface DtxSession extends Session +{ + + /** + * Get the XA resource associated with this session. + * + * @return this session XA resource. + * @throws QpidException If the session fails to retrieve its associated XA resource + * due to some error. + */ + public javax.transaction.xa.XAResource getDTXResource() throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java new file mode 100644 index 0000000000..d59d90fc44 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpidity.QpidException; + +/** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + */ +public interface ExceptionListener +{ + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exception The exception comming from the communication layer. + * @see Connection + */ + public void onException(QpidException exception); +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java new file mode 100644 index 0000000000..93b770a285 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpidity.api.Message; + +/** + * MessageListeners are used to asynchronously receive messages. + */ +public interface MessageListener +{ + /** + *

Transfer a message to the listener. + * You will be notified when the whole message is received + * However, underneath the message might be streamed off disk + * or network buffers. + *

+ * + * @param message The message delivered to the listner. + */ + public void messageTransfer(Message message); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java new file mode 100644 index 0000000000..9a2b5e63bf --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import java.util.Map; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.Header; +import org.apache.qpidity.Range; +import org.apache.qpidity.api.Message; + +/** + *

A session is associated with a connection. + * When created a Session is not attached with an underlying channel. + * Session is single threaded

+ */ +public interface Session +{ + + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + /** + * Close this session and any associated resources. + * + * @throws QpidException If the communication layer fails to close this session or if an internal error happens + * when closing this session resources. . + */ + public void close() throws QpidException; + + /** + * Suspend this session resulting in interrupting the traffic with the broker. + *

The session timer will start to tick in suspend. + *

When a session is suspend any operation of this session and of the associated resources are unavailable. + * + * @throws QpidException If the communication layer fails to suspend this session + */ + public void suspend() throws QpidException; + + /** + * This will resume an existing session + *

Upon resume the session is attached with an underlying channel + * hence making operation on this session available. + * + * @throws QpidException If the communication layer fails to execute this properly + */ + public void resume() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** + * Transfer the given message to a specified exchange. + *

Following are the valid options for messageTransfer + *

+ *

In the absence of a particular option, the defaul value is: + *

+ * + * @param exchange The exchange the message is being sent. + * @param msg The Message to be sent + * @param options A list of valid options + * @throws QpidException If the session fails to send the message due to some error + */ + public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException; + + /** + * Declare the beginning of a message transfer operation. This operation must + * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}. + * The transfer is ended by endData. + *

This way of transferring messages is useful when streaming large messages + *

In the interval [messageTransfer endData] any attempt to call a method other than + * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} + * will result in an exception being thrown. + *

Following are the valid options for messageTransfer + *

+ *

In the absence of a particular option, the defaul value is: + *

+ * + * @param exchange The exchange the message is being sent. + * @param options Set of options. + * @throws QpidException If the session fails to send the message due to some error. + */ + public void messageTransfer(String exchange, Option... options) throws QpidException; + + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent. + * + * @param headers Either DeliveryProperties or ApplicationProperties + * @throws QpidException If the session fails to execute the method due to some error + * @see org.apache.qpidity.DeliveryProperties + * @see org.apache.qpidity.ApplicationProperties + */ + public void addMessageHeaders(Header... headers) throws QpidException; + + /** + * Add the following byte array to the content of the message being sent. + * + * @param data Data to be added. + * @param off Offset from which to start reading data + * @param len Number of bytes to be read + * @throws QpidException If the session fails to execute the method due to some error + */ + public void addData(byte[] data, int off, int len) throws QpidException; + + /** + * Signals the end of data for the message. + * + * @throws QpidException If the session fails to execute the method due to some error + */ + public void endData() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Associate a message listener with a destination. + *

The destination is bound to a queue and messages are filtered based + * on the provider filter map (message filtering is specific to the provider and may not be handled). + *

+ *

Following are the valid options + *

+ *

In the absence of a particular option, defaul values are: + *

+ * + * @param queue The queue this receiver is receiving messages from. + * @param destination The destination for the subscriber ,a.k.a the delivery tag. + * @param listener The listener for this destination. When big message are transfered then + * it is recommended to use a {@link StreamingMessageListener}. + * @param options Set of Options. + * @param filter The filters to apply to consumed messages. + * @throws QpidException If the session fails to create the receiver due to some error. + */ + public void messageSubscribe(String queue, String destination, MessageListener listener, Map filter, + Option... options) throws QpidException; + + /** + * Cancels a subscription with a ginven destination. + * + * @param destination The destination for the subscriber used at subscription + * @throws QpidException If cancelling the subscription fails due to some error. + */ + public void messageCancel(String destination) throws QpidException; + + /** + * Associate a message listener with a destination. + * We currently allow one listerner per destination this means + * that the previous message listener is replaced. This is done gracefully i.e. the message + * listener is replaced once it return from the processing of a message. + * + * @param destination The destination the listener is associated with. + * @param listener The new listener for this destination. When big message are transfered then + * it is recommended to use a {@link StreamingMessageListener}. + */ + public void setMessageListener(String destination, MessageListener listener); + + + /** + * Acknowledge the receipt of ranges of messages. + *

Message must have been previously acquired either by receiving them in + * pre-acquire mode or by explicitly acquiring them. + * + * @param range Range of acknowledged messages. + * @throws QpidException If the acknowledgement of the messages fails due to some error. + */ + public void messageAcknowledge(Range... range) throws QpidException; + + /** + * Reject ranges of acquired messages. + *

A rejected message will not be delivered to any receiver + * and may be either discarded or moved to the broker dead letter queue. + * + * @param range Range of rejected messages. + * @throws QpidException If those messages cannot be rejected dus to some error + */ + public void messageReject(Range... range) throws QpidException; + + /** + * Try to acquire ranges of messages hence releasing them form the queue. + * This means that once acknowledged, a message will not be delivered to any other receiver. + *

As those messages may have been consumed by another receivers hence, + * message acquisition can fail. + * The outcome of the acquisition is returned as an array of ranges of qcquired messages. + *

This method should only be called on non-acquired messages. + * + * @param range Ranges of messages to be acquired. + * @return Ranges of explicitly acquired messages. + * @throws QpidException If this message cannot be acquired dus to some error + */ + public Range[] messageAcquire(Range... range) throws QpidException; + + /** + * Give up responsibility for processing ranges of messages. + *

Released messages are re-enqueued. + * + * @param range Ranges of messages to be released. + * @throws QpidException If this message cannot be released dus to some error. + */ + public void messageRelease(Range... range) throws QpidException; + + // ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + /** + * Selects the session for local transaction support. + * + * @throws QpidException If selecting this session for local transaction support fails due to some error. + */ + public void txSelect() throws QpidException; + + /** + * Commit the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to commit due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txCommit() throws QpidException, IllegalStateException; + + /** + * Rollback the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to rollback due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txRollback() throws QpidException, IllegalStateException; + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + + /** + * Declare a queue with the given queueName + *

Following are the valid options for declareQueue + *

+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option + * + * @param queueName The name of the delcared queue. + * @param alternateExchange Alternate excahnge. + * @param options Set of Options. + * @throws QpidException If the session fails to declare the queue due to some error. + * @see Option + */ + public void queueDeclare(String queueName, String alternateExchange, Map arguments, + Option... options) throws QpidException; + + /** + * Bind a queue with an exchange. + * + * @param queueName The queue to be bound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @throws QpidException If the session fails to bind the queue due to some error. + */ + public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) throws + QpidException; + + /** + * Unbind a queue from an exchange. + * + * @param queueName The queue to be unbound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @throws QpidException If the session fails to unbind the queue due to some error. + */ + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) throws + QpidException; + + /** + * Purge a queue. i.e. delete all enqueued messages + * + * @param queueName The queue to be purged + * @throws QpidException If the session fails to purge the queue due to some error. + */ + public void queuePurge(String queueName) throws QpidException; + + /** + * Delet a queue. + *

Following are the valid options for createReceive + *

+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option

+ * + * @param queueName The name of the queue to be deleted + * @param options Set of options + * @throws QpidException If the session fails to delete the queue due to some error. + * @see Option + *

+ * Following are the valid options + */ + public void queueDelete(String queueName, Option... options) throws QpidException; + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + + /** + * Declare an exchange. + *

Following are the valid options for createReceive + *

+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option

* + * + * @param exchangeName The exchange name. + * @param exchangeClass The fully qualified name of the exchange class. + * @param options Set of options. + * @throws QpidException If the session fails to declare the exchange due to some error. + * @see Option + */ + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, + Map arguments, Option... options) throws QpidException; + + /** + * Delete an exchange. + *

Following are the valid options for createReceive + *

    + *
  • IF_UNUSEDL + *
  • NO_WAIT + *
+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option + * Immediately deleted even if it is used by another resources.

+ * + * @param exchangeName The name of exchange to be deleted. + * @param options Set of options. + * @throws QpidException If the session fails to delete the exchange due to some error. + * @see Option + */ + public void exchangeDelete(String exchangeName, Option... options) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java new file mode 100644 index 0000000000..20a8319409 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java @@ -0,0 +1,53 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpidity.Header; + +/** + *

This message listener is useful if you need to + * know when each message part becomes available + * as opposed to knowing when the whole message arrives.

+ *

+ *

The sequence of event for transferring a message is as follows: + *

    + *
  • n calls to addMessageHeaders (should be usually one or two) + *
  • n calls to addData + *
  • {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(null). + *
+ * This is up to the implementation to assembled the message when the different parts + * are transferred. + */ +public interface StreamingMessageListener extends MessageListener +{ + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. + * + * @param headers Either DeliveryProperties or ApplicationProperties + */ + public void addMessageHeaders(Header... headers); + + /** + * Add the following byte array to the content of the message being received + * + * @param data Data to be added or streamed. + */ + public void addData(byte[] data); + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java deleted file mode 100644 index 6d5f317feb..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - - -import java.net.URL; - -import org.apache.qpidity.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection with the broker identified by the provided URL. - * - * @param url The URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker. - */ - public void connect(URL url) - throws - QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() - throws - QpidException; - - - /** - * Create a session for this connection. - *

The retuned session is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. - * @return A Newly created (suspended) session. - * @throws QpidException If the connection fails to create a session due to some internal error. - */ - public Session createSession(int expiryInSeconds) - throws - QpidException; - - /** - * Create a DtxSession for this connection. - *

A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - *

The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. - * @return A Newly created (suspended) DtxSession. - * @throws QpidException If the connection fails to create a DtxSession due to some internal error. - */ - public DtxSession createDTXSession(int expiryInSeconds) - throws - QpidException; - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exceptionListner The execptionListener - */ - public void setExceptionListener(ExceptionListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java deleted file mode 100644 index ac396b7c79..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * This session�s resources are control under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * Get the XA resource associated with this session. - * - * @return this session XA resource. - * @throws QpidException If the session fails to retrieve its associated XA resource - * due to some error. - */ - public javax.transaction.xa.XAResource getDTXResource() - throws - QpidException; -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java deleted file mode 100644 index 1818dbfd23..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.QpidException; - -/** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - */ -public interface ExceptionListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exception The exception comming from the communication layer. - * @see org.apache.qpid.nclient.api.Connection - */ - public void onException(QpidException exception); -} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java deleted file mode 100644 index 5fcf0d9e0d..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import org.apache.qpidity.api.Message; - -/** - * MessageListeners are used to asynchronously receive messages. - */ -public interface MessageListener -{ - /** - *

Deliver a message to the listener. - * You will be notified when the whole message is received - * However, underneath the message might be streamed off disk - * or network buffers. - *

- * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java deleted file mode 100644 index 07938a5985..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient.api; - -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; -import org.apache.qpidity.Header; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.api.StreamingMessageListener; - -/** - *

A session is associated with a connection. - * When created a Session is not attached with an underlying channel. - * Session is single threaded

- */ -public interface Session -{ - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - - /** - * Close this session and any associated resources. - * - * @throws QpidException If the communication layer fails to close this session or if an internal error happens - * when closing this session resources. . - */ - public void sessionClose() throws QpidException; - - /** - * Suspend this session resulting in interrupting the traffic with the broker. - * An important distinction btw sessionFlow() and this method - * is that the session timer will start to tick in suspend. - * When a session is suspend any operation of this session and of the associated resources are unavailable. - * - * @throws QpidException If the communication layer fails to suspend this session - */ - public void sessionSuspend() throws QpidException; - - - /** - * This will stop the communication flow in the session - * However the session is still considered active and the session timer will not tick. - * This method is used for session level flow control. - * - * @throws QpidException If the communication layer fails to execute the flow method properly - */ - public void sessionFlow(Option... options) throws QpidException; - - /** - * This is used for failover. This will resume an existing session - * - * @throws QpidException If the communication layer fails to execute this properly - */ - public void sessionResume(UUID sessionId) throws QpidException; - - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - - /** - * Transfer the given message. - * This is a convinience method - * - * @param destination The exchange the message being sent. - * @return msg The Message to be sent - * @throws QpidException If the session fails to send the message due to some error - */ - public void messageTransfer(String destination,Message msg,Option... options)throws QpidException; - - /** - * Transfer the given message. - *

Following are the valid options for messageTransfer - *

    - *
  • CONFIRM - *
  • PRE_ACCQUIRE - *
- *

- * - *

In the absence of a particular option, the defaul value is: - *

    - *
  • CONFIRM = false - *
  • NO-ACCQUIRE - *
- *

- * - * @param destination The exchange the message being sent. - * @return options set of options - * @throws QpidException If the session fails to send the message due to some error - */ - public void messageTransfer(String destination,Option... options)throws QpidException; - - /** - * Add the following headers to content bearing frame - * - * @param Header Either DeliveryProperties or ApplicationProperties - * @throws QpidException If the session fails to execute the method due to some error - */ - public void messageHeaders(Header ... headers)throws QpidException; - - /** - * Add the following byte array to the content. - * This method is useful when streaming large messages - * - * @param src data to be added or streamed - * @throws QpidException If the session fails to execute the method due to some error - */ - public void data(byte[] src)throws QpidException; - - /** - * Signals the end of data for the message. * - * This method is useful when streaming large messages - * - * @throws QpidException If the session fails to execute the method due to some error - */ - public void endData()throws QpidException; - - /** - * Acknowledge the receipt of this message. - *

The message must have been previously acquired either by receiving it in - * pre-acquire mode or by explicitly acquiring it. - * - * @throws QpidException If the acknowledgement of the message fails due to some error. - * @throws IllegalStateException If this messages is not acquired. - */ - public void messageAcknowledge() throws QpidException; - - /** - * Reject a previously acquired message. - *

A rejected message will not be delivered to any receiver - * and may be either discarded or moved to the broker dead letter queue. - * - * @throws QpidException If this message cannot be rejected dus to some error - * @throws IllegalStateException If this message is not acquired. - */ - public void messageReject() throws QpidException; - - /** - * Try to acquire this message hence releasing it form the queue. This means that once acknowledged, - * this message will not be delivered to any other receiver. - *

As this message may have been consumed by another receiver, message acquisition can fail. - * The outcome of the acquisition is returned as a Boolean. - * - * @return True if the message is successfully acquired, False otherwise. - * @throws QpidException If this message cannot be acquired dus to some error - * @throws IllegalStateException If this message has already been acquired. - */ - public boolean messageAcquire() throws QpidException; - - /** - * Give up responsibility for processing this message. - * - * @throws QpidException If this message cannot be released dus to some error. - * @throws IllegalStateException If this message has already been acknowledged. - */ - public void messageRelease() throws QpidException; - - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Create a message receiver for receiving messages from queue queueName. - *

Following are the valid options for messageSubscribe - *

    - *
  • NO_LOCAL - *
  • EXCLUSIVE - *
  • NO_ACQUIRE - *
  • CONFIRM - *
- *

- * - *

In the absence of a particular option, the defaul value is: - *

    - *
  • NO_LOCAL = false - *
  • EXCLUSIVE = false - *
  • PRE-ACCQUIRE - *
  • CONFIRM = false - *
- *

- * - * @param queue The queue this receiver is receiving messages from. - * @param destination The destination for the subscriber ,a.k.a the delivery tag. - * @param options Set of Options. - * @throws QpidException If the session fails to create the receiver due to some error. - * @see Option - */ - public void messageSubscribe(String queue, String destination, Map filter, Option ... _options) throws QpidException; - - public void messageSubscribe(String queue, String destination, Map filter,StreamingMessageListener listener,Option ... _options) throws QpidException; - - /** - * Cancels a subscription - * - * @param destination The destination for the subscriber used at subscription - */ - public void messageCancel(String destination) throws QpidException; - - /** - * We currently allow one listerner per destination - * - * @param destination - * @param listener - */ - public void setMessageListener(String destination,StreamingMessageListener listener); - - /** - * We currently allow one listerner per destination - * - * @param destination - * @param listener - */ - public void setMessageListener(String destination,MessageListener listener); - - - // ----------------------------------------------- - // Transaction methods - // ---------------------------------------------- - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws QpidException If the session fails to commit due to some error. - * @throws IllegalStateException If this session is not transacted. - */ - public void txCommit() throws QpidException, IllegalStateException; - - /** - * Rollback the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws QpidException If the session fails to rollback due to some error. - * @throws IllegalStateException If this session is not transacted. - */ - public void txRollback() throws QpidException, IllegalStateException; - - - /** - * Selects the session for transactions - * - * @throws QpidException - */ - public void txSelect() throws QpidException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - *

Following are the valid options for declareQueue - *

    - *
  • AUTO_DELETE - *
  • DURABLE - *
  • EXCLUSIVE - *
  • NO_WAIT - *
  • PASSIVE - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param options Set of Options. - * @throws QpidException If the session fails to declare the queue due to some error. - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map arguments, - Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like queue name already exist? - - /** - * Bind a queue with an exchange. - * - * @param queueName The queue to be bound. - * @param exchangeName The exchange name. - * @param routingKey The routing key. - * @throws QpidException If the session fails to bind the queue due to some error. - */ - public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) throws - QpidException; - //Todo: Do we need to define more specific exceptions like exchange does not exist? - - /** - * Unbind a queue from an exchange. - * - * @param queueName The queue to be unbound. - * @param exchangeName The exchange name. - * @param routingKey The routing key. - * @throws QpidException If the session fails to unbind the queue due to some error. - */ - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) throws - QpidException; - //Todo: Do we need to define more specific exceptions like exchange does not exist? - - /** - * Purge a queue. i.e. delete all enqueued messages - * TODO: Define the exact semantic i.e. are message sent to a dead letter queue? - * - * @param queueName The queue to be purged - * @throws QpidException If the session fails to purge the queue due to some error. - */ - public void queuePurge(String queueName) throws QpidException; - - /** - * Delet a queue. - *

Following are the valid options for createReceive - *

    - *
  • IF_EMPTY - *
  • IF_UNUSE - *
  • NO_WAIT - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param queueName The name of the queue to be deleted - * @param options Set of options - * @throws QpidException If the session fails to delete the queue due to some error. - * @see Option - *

- * Following are the valid options - */ - public void queueDelete(String queueName, Option... options) throws QpidException; - - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * Declare an exchange. - *

Following are the valid options for createReceive - *

    - *
  • AUTO_DELETE - *
  • DURABLE - *
  • INTERNAL - *
  • NO_WAIT - *
  • PASSIVE - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

* - * - * @param exchangeName The exchange name. - * @param exchangeClass The fully qualified name of the exchange class. - * @param options Set of options. - * @throws QpidException If the session fails to declare the exchange due to some error. - * @see Option - */ - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, - Map arguments, Option... options) throws QpidException; - - /** - * Delete an exchange. - *

Following are the valid options for createReceive - *

    - *
  • IF_UNUSEDL - *
  • NO_WAIT - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option - * Immediately deleted even if it is used by another resources.

- * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. - * @throws QpidException If the session fails to delete the exchange due to some error. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options) throws QpidException; - //Todo: Do we need to define more specific exceptions like exchange does not exist? -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index c9e5d03948..099f7ed694 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -2,28 +2,149 @@ package org.apache.qpid.nclient.impl; import java.util.Map; -import org.apache.qpidity.api.StreamingMessageListener; -import org.apache.qpid.nclient.api.MessageListener; -import org.apache.qpidity.Option; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Session; +import org.apache.qpidity.api.Message; +import org.apache.qpid.nclient.MessageListener; +import org.apache.qpidity.*; -public class ClientSession extends Session implements org.apache.qpid.nclient.api.Session +/** + * Implements a Qpid Sesion. + */ +public class ClientSession implements org.apache.qpid.nclient.Session { - public void setMessageListener(String destination,MessageListener listener) + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + public void close() throws QpidException { - super.setMessageListener(destination, new StreamingListenerAdapter(listener)); + //To change body of implemented methods use File | Settings | File Templates. } - public void messageSubscribe(String queue, String destination, Map filter, Option... _options) throws QpidException + public void suspend() throws QpidException { - // TODO + //To change body of implemented methods use File | Settings | File Templates. } - public void messageSubscribe(String queue, String destination, Map filter, StreamingMessageListener listener, Option... _options) throws QpidException + public void resume() throws QpidException { - // TODO + //To change body of implemented methods use File | Settings | File Templates. + }//------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageTransfer(String exchange, Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void addMessageHeaders(Header... headers) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void addData(byte[] data, int off, int len) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void endData() throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageSubscribe(String queue, String destination, MessageListener listener, Map filter, + Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageCancel(String destination) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setMessageListener(String destination, MessageListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageAcknowledge(Range... range) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void messageReject(Range... range) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Range[] messageAcquire(Range... range) throws QpidException + { + return new Range[0]; //To change body of implemented methods use File | Settings | File Templates. } + public void messageRelease(Range... range) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + }// ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + public void txSelect() throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void txCommit() throws QpidException, IllegalStateException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void txRollback() throws QpidException, IllegalStateException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void queueDeclare(String queueName, String alternateExchange, Map arguments, + Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) throws + QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) throws + QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void queuePurge(String queueName) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void queueDelete(String queueName, Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, + Map arguments, Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void exchangeDelete(String exchangeName, Option... options) throws QpidException + { + //To change body of implemented methods use File | Settings | File Templates. + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java index b8f04ca0df..ff0ac63540 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java @@ -1,10 +1,10 @@ package org.apache.qpid.nclient.impl; -import org.apache.qpid.nclient.api.MessageListener; +import org.apache.qpid.nclient.MessageListener; +import org.apache.qpid.nclient.StreamingMessageListener; import org.apache.qpidity.Header; import org.apache.qpidity.Option; import org.apache.qpidity.api.Message; -import org.apache.qpidity.api.StreamingMessageListener; public class StreamingListenerAdapter implements StreamingMessageListener { @@ -16,23 +16,18 @@ public class StreamingListenerAdapter implements StreamingMessageListener _adaptee = l; } - public void data(byte[] src) + public void addData(byte[] src) { _currentMsg.appendData(src); } - public void endData() - { - _adaptee.onMessage(_currentMsg); - } - - public void messageHeaders(Header... headers) + public void addMessageHeaders(Header... headers) { //_currentMsg add the headers } - public void messageTransfer(String destination, Option... options) + public void messageTransfer(Message message) { - // _currentMsg create message from factory + _adaptee.messageTransfer(_currentMsg); } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java index da06b09f6d..7cc7659139 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java @@ -25,6 +25,7 @@ import javax.jms.*; import javax.jms.IllegalStateException; import javax.jms.Session; import javax.jms.ExceptionListener; +import javax.jms.Connection; import java.util.Vector; @@ -85,7 +86,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * The QpidConeection instance that is mapped with thie JMS connection */ - org.apache.qpid.nclient.api.Connection _qpidConnection; + org.apache.qpid.nclient.Connection _qpidConnection; /** * This is the exception listener for this qpid connection. @@ -436,7 +437,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @return This JMS connection underlying Qpid Connection. */ - protected org.apache.qpid.nclient.api.Connection getQpidConnection() + protected org.apache.qpid.nclient.Connection getQpidConnection() { return _qpidConnection; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java index e9bdef540c..95a59ecfd3 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java @@ -60,7 +60,6 @@ public abstract class MessageActor protected MessageActor(SessionImpl session, DestinationImpl destination) { - // TODO create the qpidResource _qpidResource = _session = session; _destination = destination; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index a350ceecee..3689eb60b0 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -19,6 +19,7 @@ package org.apache.qpid.nclient.jms; //import org.apache.qpid.nclient.api.MessageReceiver; import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -30,10 +31,6 @@ import javax.jms.Message; */ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { - /** - * The underlying qpid receiver - */ - /* private MessageReceiver _qpidReceiver;*/ /** * This MessageConsumer's messageselector. @@ -55,7 +52,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer /** * A MessageListener set up for this consumer. */ - private MessageListener _messageListener = null; + private MessageListener _messageListener; + + /** + * A warpper around the JSM message listener + */ + private MessageListenerWrapper _messageListenerWrapper; //----- Constructors /** @@ -79,7 +81,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer /*try { // TODO define the relevant options - _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), null); + _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE); + _qpidResource = _qpidReceiver; } catch (QpidException e) { @@ -131,7 +134,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer public void setMessageListener(MessageListener messageListener) throws JMSException { checkNotClosed(); - // TODO: create a message listener wrapper + _messageListener = messageListener; + if( messageListener == null ) + { + + _messageListenerWrapper = null; + } + else + { + _messageListenerWrapper = new MessageListenerWrapper(this); + //TODO _qpidReceiver.setAsynchronous(_messageListenerWrapper); + } } /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java index 358f50506a..4dde127337 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java @@ -17,15 +17,13 @@ */ package org.apache.qpid.nclient.jms; -import org.apache.qpid.nclient.api.MessageListener; +import org.apache.qpid.nclient.MessageListener; import org.apache.qpid.nclient.jms.message.AbstractJMSMessage; import org.apache.qpid.nclient.jms.message.QpidMessage; import org.apache.qpidity.api.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; - /** * This is a wrapper for the JMS message listener */ @@ -57,13 +55,13 @@ public class MessageListenerWrapper implements MessageListener _consumer = consumer; } - //---- org.apache.qpid.nclient.api.MessageListener API + //---- org.apache.qpid.nclient.MessageListener API /** * Deliver a message to the listener. * * @param message The message delivered to the listner. */ - public void onMessage(Message message) + public void messageTransfer(Message message) { try { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index 84ee7723fd..d0c6569197 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -21,7 +21,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.nclient.jms.message.*; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import javax.jms.*; import javax.jms.IllegalStateException; @@ -72,7 +71,7 @@ public class SessionImpl implements Session /** * The underlying QpidSession */ - private org.apache.qpid.nclient.api.Session _qpidSession; + private org.apache.qpid.nclient.Session _qpidSession; /** * Indicates whether this session is recovering @@ -337,7 +336,7 @@ public class SessionImpl implements Session // close the underlaying QpidSession try { - _qpidSession.sessionClose(); + _qpidSession.close(); } catch (org.apache.qpidity.QpidException e) { @@ -463,7 +462,6 @@ public class SessionImpl implements Session */ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - return createConsumer(destination, messageSelector, false); } @@ -664,11 +662,12 @@ public class SessionImpl implements Session /** * Remove a message actor form this session *

This method is called when an actor is independently closed. + * * @param actor The closed actor. */ protected void closeMessageActor(MessageActor actor) { - _messageActors.remove(actor); + _messageActors.remove(actor); } /** @@ -678,15 +677,7 @@ public class SessionImpl implements Session */ protected void start() throws JMSException { - try - { - // TODO: make sure that the correct options are used - _qpidSession.sessionFlow(Option.SUSPEND); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + // TODO: make sure that the correct options are used } /** @@ -696,15 +687,7 @@ public class SessionImpl implements Session */ protected void stop() throws JMSException { - try - { // TODO: make sure that the correct options are used - _qpidSession.sessionFlow(Option.RESUME); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } } /** @@ -818,7 +801,7 @@ public class SessionImpl implements Session * * @return The associated Qpid Session. */ - protected org.apache.qpid.nclient.api.Session getQpidSession() + protected org.apache.qpid.nclient.Session getQpidSession() { return _qpidSession; } -- cgit v1.2.1