From debb32550a5bc94c673397809443965611d38283 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 6 Aug 2007 22:31:07 +0000 Subject: fixed compilation errors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563312 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpidity/Connection.java | 78 ---- .../main/java/org/apache/qpidity/DtxSession.java | 138 ------ .../java/org/apache/qpidity/ExceptionListener.java | 37 -- .../main/java/org/apache/qpidity/FieldTable.java | 26 -- .../java/org/apache/qpidity/MessageListener.java | 34 -- .../org/apache/qpidity/MessagePartListener.java | 55 --- .../src/main/java/org/apache/qpidity/Session.java | 489 --------------------- .../java/org/apache/qpidity/client/Connection.java | 78 ++++ .../java/org/apache/qpidity/client/DtxSession.java | 141 ++++++ .../apache/qpidity/client/ExceptionListener.java | 37 ++ .../org/apache/qpidity/client/MessageListener.java | 34 ++ .../apache/qpidity/client/MessagePartListener.java | 55 +++ .../java/org/apache/qpidity/client/Session.java | 489 +++++++++++++++++++++ .../org/apache/qpidity/impl/ClientSession.java | 4 +- .../apache/qpidity/impl/ClientSessionDelegate.java | 2 +- .../qpidity/impl/MessagePartListenerAdapter.java | 4 +- .../org/apache/qpidity/jms/ConnectionImpl.java | 25 +- .../apache/qpidity/jms/MessageConsumerImpl.java | 24 +- .../apache/qpidity/jms/QpidBrowserListener.java | 2 +- .../apache/qpidity/jms/QpidMessageListener.java | 2 +- .../org/apache/qpidity/jms/QueueBrowserImpl.java | 10 +- .../org/apache/qpidity/jms/QueueSessionImpl.java | 6 +- .../java/org/apache/qpidity/jms/SessionImpl.java | 9 +- .../org/apache/qpidity/jms/TopicSessionImpl.java | 6 +- .../java/org/apache/qpidity/jms/XASessionImpl.java | 4 +- 25 files changed, 895 insertions(+), 894 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpidity/Connection.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/DtxSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/ExceptionListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/FieldTable.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/MessageListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/MessagePartListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/Session.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/Connection.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/DtxSession.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/ExceptionListener.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/MessageListener.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/Session.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpidity/Connection.java b/java/client/src/main/java/org/apache/qpidity/Connection.java deleted file mode 100644 index cb56ee954c..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/Connection.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - - -import java.net.URL; - -import org.apache.qpidity.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection with the broker identified by the provided URL. - * - * @param url The URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker. - */ - public void connect(URL url) throws QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() throws QpidException; - - - /** - * Create a session for this connection. - *

The retuned session is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. - * @return A Newly created (suspended) session. - * @throws QpidException If the connection fails to create a session due to some internal error. - */ - public Session createSession(int expiryInSeconds) throws QpidException; - - /** - * Create a DtxSession for this connection. - *

A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - *

The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. - * @return A Newly created (suspended) DtxSession. - * @throws QpidException If the connection fails to create a DtxSession due to some internal error. - */ - public DtxSession createDTXSession(int expiryInSeconds) throws QpidException; - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exceptionListner The execptionListener - */ - public void setExceptionListener(ExceptionListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpidity/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/DtxSession.java deleted file mode 100644 index 127fe4d1a7..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/DtxSession.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -import javax.transaction.xa.Xid; - -/** - * This session�s resources are control under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * This method is called when messages should be produced and consumed on behalf a transaction - * branch identified by xid. - * possible options are: - *

- * - * @param xid Specifies the xid of the transaction branch to be started. - * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @throws QpidException If the session fails to start due to some error - */ - public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException; - - /** - * This method is called when the work done on behalf a transaction branch finishes or needs to - * be suspended. - * possible options are: - * - * - * @param xid Specifies the xid of the transaction branch to be ended. - * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @throws QpidException If the session fails to end due to some error - */ - public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException; - - /** - * Commit the work done on behalf a transaction branch. This method commits the work associated - * with xid. Any produced messages are made available and any consumed messages are discarded. - * possible option is: - * - * - * @param xid Specifies the xid of the transaction branch to be committed. - * @param options Available option is: {@link Option#ONE_PHASE} - * @throws QpidException If the session fails to commit due to some error - */ - public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException; - - /** - * This method is called to forget about a heuristically completed transaction branch. - * - * @param xid Specifies the xid of the transaction branch to be forgotten. - * @throws QpidException If the session fails to forget due to some error - */ - public void dtxCoordinationForget(Xid xid) throws QpidException; - - /** - * This method obtains the current transaction timeout value in seconds. If set-timeout was not - * used prior to invoking this method, the return value is the default timeout; otherwise, the - * value used in the previous set-timeout call is returned. - * - * @param xid Specifies the xid of the transaction branch for getting the timeout. - * @return The current transaction timeout value in seconds. - * @throws QpidException If the session fails to get the timeout due to some error - */ - public long dtxCoordinationGetTimeout(Xid xid) throws QpidException; - - /** - * This method prepares for commitment any message produced or consumed on behalf of xid. - * - * @param xid Specifies the xid of the transaction branch that can be prepared. - * @return The status of the prepare operation: can be one of those: - * xa-ok: Normal execution. - *

- * xa-rdonly: The transaction branch was read-only and has been committed. - *

- * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified - * reason. - *

- * xa-rbtimeout: The work represented by this transaction branch took too long. - * @throws QpidException If the session fails to prepare due to some error - */ - public short dtxCoordinationPrepare(Xid xid) throws QpidException; - - /** - * This method is called to obtain a list of transaction branches that are in a prepared or - * heuristically completed state. - * - * @return a array of xids to be recovered. - * @throws QpidException If the session fails to recover due to some error - */ - public Xid[] dtxCoordinationRecover() throws QpidException; - - /** - * This method rolls back the work associated with xid. Any produced messages are discarded and - * any consumed messages are re-enqueued. - * - * @param xid Specifies the xid of the transaction branch that can be rolled back. - * @throws QpidException If the session fails to rollback due to some error - */ - public void dtxCoordinationRollback(Xid xid) throws QpidException; - - /** - * Sets the specified transaction branch timeout value in seconds. - * - * @param xid Specifies the xid of the transaction branch for setting the timeout. - * @param timeout The transaction timeout value in seconds. - * @throws QpidException If the session fails to set the timeout due to some error - */ - public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException; -} diff --git a/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java b/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java deleted file mode 100644 index e3ca8989ef..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -import org.apache.qpidity.QpidException; - -/** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - */ -public interface ExceptionListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * - * @param exception The exception comming from the communication layer. - * @see Connection - */ - public void onException(QpidException exception); -} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpidity/FieldTable.java b/java/client/src/main/java/org/apache/qpidity/FieldTable.java deleted file mode 100644 index f752bb3373..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/FieldTable.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -/** - * - */ -public interface FieldTable -{ -} diff --git a/java/client/src/main/java/org/apache/qpidity/MessageListener.java b/java/client/src/main/java/org/apache/qpidity/MessageListener.java deleted file mode 100644 index 5a1a526f0b..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/MessageListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -import org.apache.qpidity.api.Message; - -/** - *A message listener - */ -public interface MessageListener -{ - /** - * Process an incoming message. - * - * @param message The incoming message. - */ - public void onMessage(Message message); -} diff --git a/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java deleted file mode 100644 index b370efff17..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java +++ /dev/null @@ -1,55 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -import org.apache.qpidity.Header; - -/** - * Assembles message parts. - *

The sequence of event for transferring a message is as follows: - *

- * This is up to the implementation to assembled the message when the different parts - * are transferred. - */ -public interface MessagePartListener -{ - /** - * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} - * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. - * - * @param headers Either DeliveryProperties or ApplicationProperties - */ - public void messageHeaders(Header... headers); - - /** - * Add the following byte array to the content of the message being received - * - * @param data Data to be added or streamed. - */ - public void addData(byte[] data); - - /** - * Indicates that the message has been fully received. - */ - public void messageReceived(); - -} diff --git a/java/client/src/main/java/org/apache/qpidity/Session.java b/java/client/src/main/java/org/apache/qpidity/Session.java deleted file mode 100644 index 6f283cf203..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/Session.java +++ /dev/null @@ -1,489 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity; - -import java.util.Map; - -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; -import org.apache.qpidity.Header; -import org.apache.qpidity.Range; -import org.apache.qpidity.api.Message; - -/** - *

A session is associated with a connection. - * When created a Session is not attached with an underlying channel. - * Session is single threaded

- */ -public interface Session -{ - public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; - public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; - public static final short CONFIRM_MODE_REQUIRED = 1; - public static final short CONFIRM_MODE_NOT_REQUIRED = 0; - public static final short MESSAGE_FLOW_MODE_CREDIT = 0; - public static final short MESSAGE_FLOW_MODE_WINDOW = 1; - public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; - public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - /** - * Close this session and any associated resources. - * - * @throws QpidException If the communication layer fails to close this session or if an internal error happens - * when closing this session resources. . - */ - public void close() throws QpidException; - - /** - * Suspend this session resulting in interrupting the traffic with the broker. - *

The session timer will start to tick in suspend. - *

When a session is suspend any operation of this session and of the associated resources are unavailable. - * - * @throws QpidException If the communication layer fails to suspend this session - */ - public void suspend() throws QpidException; - - /** - * This will resume an existing session - *

Upon resume the session is attached with an underlying channel - * hence making operation on this session available. - * - * @throws QpidException If the communication layer fails to execute this properly - */ - public void resume() throws QpidException; - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - /** - * Transfer the given message to a specified exchange. - * - * @param confirmMode

- * @param acquireMode - * @param exchange The exchange the message is being sent. - * @param msg The Message to be sent - * @throws QpidException If the session fails to send the message due to some error - */ - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException; - - /** - * Declare the beginning of a message transfer operation. This operation must - * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}. - * The transfer is ended by endData. - *

This way of transferring messages is useful when streaming large messages - *

In the interval [messageTransfer endData] any attempt to call a method other than - * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} - * will result in an exception being thrown. - * - * @param confirmMode

- * @param acquireMode - * @param exchange The exchange the message is being sent. - * @throws QpidException If the session fails to send the message due to some error. - */ - public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException; - - /** - * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} - * or to the message being sent. - * - * @param headers Either DeliveryProperties or ApplicationProperties - * @throws QpidException If the session fails to execute the method due to some error - * @see org.apache.qpidity.DeliveryProperties - */ - public void addMessageHeaders(Header... headers) throws QpidException; - - /** - * Add the following byte array to the content of the message being sent. - * - * @param data Data to be added. - * @param off Offset from which to start reading data - * @param len Number of bytes to be read - * @throws QpidException If the session fails to execute the method due to some error - */ - public void addData(byte[] data, int off, int len) throws QpidException; - - /** - * Signals the end of data for the message. - * - * @throws QpidException If the session fails to execute the method due to some error - */ - public void endData() throws QpidException; - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - *

The destination is bound to a queue and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and may not be handled). - *

- *

Following are the valid options - *

- *

In the absence of a particular option, defaul values are: - *

- * - * @param queue The queue this receiver is receiving messages from. - * @param destination The destination for the subscriber ,a.k.a the delivery tag. - * @param confirmMode - * @param acquireMode - * @param listener The listener for this destination. When big message are transfered then - * it is recommended to use a {@link MessagePartListener}. - * @param options Set of Options. - * @param filter A set of filters for the subscription. The syntax and semantics of these filters depends - * on the providers implementation. - * @throws QpidException If the session fails to create the receiver due to some error. - */ - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map filter, Option... options) - throws QpidException; - - /** - * This method cancels a consumer. This does not affect already delivered messages, but it does - * mean the server will not send any more messages for that destination. The client may receive an - * arbitrary number of messages in between sending the cancel method and receiving the - * notification of completion of the cancel command. - * - * @param destination The destination for the subscriber used at subscription - * @throws QpidException If cancelling the subscription fails due to some error. - */ - public void messageCancel(String destination) throws QpidException; - - /** - * Associate a message part listener with a destination. - * We currently allow one listerner per destination this means - * that the previous message listener is replaced. This is done gracefully i.e. the message - * listener is replaced once it return from the processing of a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - *

- * With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - *

- * Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode

- * @throws QpidException If setting the flow mode fails due to some error. - */ - public void messageFlowMode(String destination, short mode) throws QpidException; - - - /** - * This method controls the flow of message data to a given destination. It is used by the - * recipient of messages to dynamically match the incoming rate of message flow to its - * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" - * number of the specified unit to the available credit balance for the specified destination. - * A value of 0 indicates an infinite amount of credit. This disables any limit for - * the given unit until the credit balance is zeroed with {@link Session#messageStop} - * or {@link Session#messageFlush}. - * - * @param destination The destination to set the flow. - * @param unit Specifies the unit of credit balance. - *

- * One of:

- * @param value Number of credits, a value of 0 indicates an infinite amount of credit. - * @throws QpidException If setting the flow fails due to some error. - */ - public void messageFlow(String destination, short unit, long value) throws QpidException; - - /** - * Forces the broker to exhaust its credit supply. - *

The broker's credit will always be zero when - * this method completes. This method does not complete until all the message transfers occur. - *

This method returns true if messages have been flushed - * (i.e. the queue was not empty and the credit greater then zero). - * It returns false if the queue was empty. - * - * @param destination The destination to call flush on. - * @return True is messages were flushed, false otherwise. - * @throws QpidException If flushing fails due to some error. - */ - public boolean messageFlush(String destination) throws QpidException; - - /** - * On receipt of this method, the brokers MUST set his credit to zero for the given - * destination. This obeys the generic semantics of command completion, i.e. when confirmation - * is issued credit MUST be zero and no further messages will be sent until such a time as - * further credit is received. - * - * @param destination The destination to stop. - * @throws QpidException If stopping fails due to some error. - */ - public void messageStop(String destination) throws QpidException; - - /** - * Acknowledge the receipt of ranges of messages. - *

Message must have been previously acquired either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param range Range of acknowledged messages. - * @throws QpidException If the acknowledgement of the messages fails due to some error. - */ - public void messageAcknowledge(Range... range) throws QpidException; - - /** - * Reject ranges of acquired messages. - *

A rejected message will not be delivered to any receiver - * and may be either discarded or moved to the broker dead letter queue. - * - * @param range Range of rejected messages. - * @throws QpidException If those messages cannot be rejected dus to some error - */ - public void messageReject(Range... range) throws QpidException; - - /** - * Try to acquire ranges of messages hence releasing them form the queue. - * This means that once acknowledged, a message will not be delivered to any other receiver. - *

As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - *

This method should only be called on non-acquired messages. - * - * @param range Ranges of messages to be acquired. - * @return Ranges of explicitly acquired messages. - * @throws QpidException If this message cannot be acquired dus to some error - */ - public Range[] messageAcquire(Range... range) throws QpidException; - - /** - * Give up responsibility for processing ranges of messages. - *

Released messages are re-enqueued. - * - * @param range Ranges of messages to be released. - * @throws QpidException If this message cannot be released dus to some error. - */ - public void messageRelease(Range... range) throws QpidException; - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - * - * @throws QpidException If selecting this session for local transaction support fails due to some error. - */ - public void txSelect() throws QpidException; - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws QpidException If the session fails to commit due to some error. - * @throws IllegalStateException If this session is not transacted. - */ - public void txCommit() throws QpidException, IllegalStateException; - - /** - * Rollback the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws QpidException If the session fails to rollback due to some error. - * @throws IllegalStateException If this session is not transacted. - */ - public void txRollback() throws QpidException, IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - *

Following are the valid options for declareQueue - *

- *

- *

- *

In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - *

  1. The queue is deleted when it is not empty; - * <
  2. Immediate delivery of a message is requested, but there are no consumers connected to - * the queue.
- * @param arguments Used for backward compatibility - * @param options Set of Options. - * @throws QpidException If the session fails to declare the queue due to some error. - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map arguments, Option... options) - throws QpidException; - - /** - * Bind a queue with an exchange. - * - * @param queueName The queue to be bound. - * @param exchangeName The exchange name. - * @param routingKey The routing key. - * @param arguments Used for backward compatibility - * @throws QpidException If the session fails to bind the queue due to some error. - */ - public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) - throws QpidException; - - /** - * Unbind a queue from an exchange. - * - * @param queueName The queue to be unbound. - * @param exchangeName The exchange name. - * @param routingKey The routing key. - * @param arguments Used for backward compatibility - * @throws QpidException If the session fails to unbind the queue due to some error. - */ - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) - throws QpidException; - - /** - * Purge a queue. i.e. delete all enqueued messages - * - * @param queueName The queue to be purged - * @throws QpidException If the session fails to purge the queue due to some error. - */ - public void queuePurge(String queueName) throws QpidException; - - /** - * Delet a queue. - *

Following are the valid options for createReceive - *

    - *
  • IF_EMPTY - *
  • IF_UNUSE - *
  • NO_WAIT - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param queueName The name of the queue to be deleted - * @param options Set of options - * @throws QpidException If the session fails to delete the queue due to some error. - * @see Option - *

- * Following are the valid options - */ - public void queueDelete(String queueName, Option... options) throws QpidException; - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * Declare an exchange. - *

Following are the valid options for createReceive - *

    - *
  • AUTO_DELETE - *
  • DURABLE - *
  • INTERNAL - *
  • NO_WAIT - *
  • PASSIVE - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

* - * - * @param exchangeName The exchange name. - * @param exchangeClass The fully qualified name of the exchange class. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options. - * @param arguments Used for backward compatibility - * @throws QpidException If the session fails to declare the exchange due to some error. - * @see Option - */ - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, - Map arguments, Option... options) throws QpidException; - - /** - * Delete an exchange. - *

Following are the valid options for createReceive - *

    - *
  • IF_UNUSEDL - *
  • NO_WAIT - *
- *

- *

- *

In the absence of a particular option, the defaul value is false for each option - * Immediately deleted even if it is used by another resources.

- * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. - * @throws QpidException If the session fails to delete the exchange due to some error. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options) throws QpidException; -} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java new file mode 100644 index 0000000000..d680cad3f0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + + +import java.net.URL; + +import org.apache.qpidity.QpidException; + +/** + * This represents a physical connection to a broker. + */ +public interface Connection +{ + /** + * Establish the connection with the broker identified by the provided URL. + * + * @param url The URL of the broker. + * @throws QpidException If the communication layer fails to connect with the broker. + */ + public void connect(URL url) throws QpidException; + + /** + * Close this connection. + * + * @throws QpidException if the communication layer fails to close the connection. + */ + public void close() throws QpidException; + + + /** + * Create a session for this connection. + *

The retuned session is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) session. + * @throws QpidException If the connection fails to create a session due to some internal error. + */ + public Session createSession(int expiryInSeconds) throws QpidException; + + /** + * Create a DtxSession for this connection. + *

A Dtx Session must be used when resources have to be manipulated as + * part of a global transaction. + *

The retuned DtxSession is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) DtxSession. + * @throws QpidException If the connection fails to create a DtxSession due to some internal error. + */ + public DtxSession createDTXSession(int expiryInSeconds) throws QpidException; + + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exceptionListner The execptionListener + */ + public void setExceptionListener(ExceptionListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java new file mode 100644 index 0000000000..a9032d5d44 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + +import javax.transaction.xa.Xid; + +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; + +/** + * This session�s resources are control under the scope of a distributed transaction. + */ +public interface DtxSession extends Session +{ + + /** + * This method is called when messages should be produced and consumed on behalf a transaction + * branch identified by xid. + * possible options are: + *

    + *
  • {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. + *
  • {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. + *
+ * + * @param xid Specifies the xid of the transaction branch to be started. + * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. + * @throws QpidException If the session fails to start due to some error + */ + public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException; + + /** + * This method is called when the work done on behalf a transaction branch finishes or needs to + * be suspended. + * possible options are: + *
    + *
  • {@link Option#FAIL}: indicates that this portion of work has failed; + * otherwise this portion of work has + * completed successfully. + *
  • {@link Option#SUSPEND}: Indicates that the transaction branch is + * temporarily suspended in an incomplete state. + *
+ * + * @param xid Specifies the xid of the transaction branch to be ended. + * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. + * @throws QpidException If the session fails to end due to some error + */ + public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException; + + /** + * Commit the work done on behalf a transaction branch. This method commits the work associated + * with xid. Any produced messages are made available and any consumed messages are discarded. + * possible option is: + *
    + *
  • {@link Option#ONE_PHASE}: When set then one-phase commit optimization is used. + *
+ * + * @param xid Specifies the xid of the transaction branch to be committed. + * @param options Available option is: {@link Option#ONE_PHASE} + * @throws QpidException If the session fails to commit due to some error + */ + public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException; + + /** + * This method is called to forget about a heuristically completed transaction branch. + * + * @param xid Specifies the xid of the transaction branch to be forgotten. + * @throws QpidException If the session fails to forget due to some error + */ + public void dtxCoordinationForget(Xid xid) throws QpidException; + + /** + * This method obtains the current transaction timeout value in seconds. If set-timeout was not + * used prior to invoking this method, the return value is the default timeout; otherwise, the + * value used in the previous set-timeout call is returned. + * + * @param xid Specifies the xid of the transaction branch for getting the timeout. + * @return The current transaction timeout value in seconds. + * @throws QpidException If the session fails to get the timeout due to some error + */ + public long dtxCoordinationGetTimeout(Xid xid) throws QpidException; + + /** + * This method prepares for commitment any message produced or consumed on behalf of xid. + * + * @param xid Specifies the xid of the transaction branch that can be prepared. + * @return The status of the prepare operation: can be one of those: + * xa-ok: Normal execution. + *

+ * xa-rdonly: The transaction branch was read-only and has been committed. + *

+ * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified + * reason. + *

+ * xa-rbtimeout: The work represented by this transaction branch took too long. + * @throws QpidException If the session fails to prepare due to some error + */ + public short dtxCoordinationPrepare(Xid xid) throws QpidException; + + /** + * This method is called to obtain a list of transaction branches that are in a prepared or + * heuristically completed state. + * + * @return a array of xids to be recovered. + * @throws QpidException If the session fails to recover due to some error + */ + public Xid[] dtxCoordinationRecover() throws QpidException; + + /** + * This method rolls back the work associated with xid. Any produced messages are discarded and + * any consumed messages are re-enqueued. + * + * @param xid Specifies the xid of the transaction branch that can be rolled back. + * @throws QpidException If the session fails to rollback due to some error + */ + public void dtxCoordinationRollback(Xid xid) throws QpidException; + + /** + * Sets the specified transaction branch timeout value in seconds. + * + * @param xid Specifies the xid of the transaction branch for setting the timeout. + * @param timeout The transaction timeout value in seconds. + * @throws QpidException If the session fails to set the timeout due to some error + */ + public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ExceptionListener.java b/java/client/src/main/java/org/apache/qpidity/client/ExceptionListener.java new file mode 100644 index 0000000000..e2faede165 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/ExceptionListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + +import org.apache.qpidity.QpidException; + +/** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + */ +public interface ExceptionListener +{ + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exception The exception comming from the communication layer. + * @see Connection + */ + public void onException(QpidException exception); +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessageListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessageListener.java new file mode 100644 index 0000000000..eab9d70f00 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/MessageListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + +import org.apache.qpidity.api.Message; + +/** + *A message listener + */ +public interface MessageListener +{ + /** + * Process an incoming message. + * + * @param message The incoming message. + */ + public void onMessage(Message message); +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java new file mode 100644 index 0000000000..633c77ecac --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + +import org.apache.qpidity.Header; + +/** + * Assembles message parts. + *

The sequence of event for transferring a message is as follows: + *

    + *
  • messageHeaders + *
  • n calls to addData + *
  • messageReceived + *
+ * This is up to the implementation to assembled the message when the different parts + * are transferred. + */ +public interface MessagePartListener +{ + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. + * + * @param headers Either DeliveryProperties or ApplicationProperties + */ + public void messageHeaders(Header... headers); + + /** + * Add the following byte array to the content of the message being received + * + * @param data Data to be added or streamed. + */ + public void addData(byte[] data); + + /** + * Indicates that the message has been fully received. + */ + public void messageReceived(); + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java new file mode 100644 index 0000000000..b2d7c5705e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.client; + +import java.util.Map; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.Header; +import org.apache.qpidity.Range; +import org.apache.qpidity.api.Message; + +/** + *

A session is associated with a connection. + * When created a Session is not attached with an underlying channel. + * Session is single threaded

+ */ +public interface Session +{ + public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; + public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; + public static final short CONFIRM_MODE_REQUIRED = 1; + public static final short CONFIRM_MODE_NOT_REQUIRED = 0; + public static final short MESSAGE_FLOW_MODE_CREDIT = 0; + public static final short MESSAGE_FLOW_MODE_WINDOW = 1; + public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; + public static final short MESSAGE_FLOW_UNIT_BYTE = 1; + + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + /** + * Close this session and any associated resources. + * + * @throws QpidException If the communication layer fails to close this session or if an internal error happens + * when closing this session resources. . + */ + public void close() throws QpidException; + + /** + * Suspend this session resulting in interrupting the traffic with the broker. + *

The session timer will start to tick in suspend. + *

When a session is suspend any operation of this session and of the associated resources are unavailable. + * + * @throws QpidException If the communication layer fails to suspend this session + */ + public void suspend() throws QpidException; + + /** + * This will resume an existing session + *

Upon resume the session is attached with an underlying channel + * hence making operation on this session available. + * + * @throws QpidException If the communication layer fails to execute this properly + */ + public void resume() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** + * Transfer the given message to a specified exchange. + * + * @param confirmMode

    off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + *

    + *

  • on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + *
+ * @param acquireMode
  • no-acquire (0): the message must be explicitly acquired + *

    + *

  • pre-acquire (1): the message is acquired when the transfer starts + *
+ * @param exchange The exchange the message is being sent. + * @param msg The Message to be sent + * @throws QpidException If the session fails to send the message due to some error + */ + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException; + + /** + * Declare the beginning of a message transfer operation. This operation must + * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}. + * The transfer is ended by endData. + *

This way of transferring messages is useful when streaming large messages + *

In the interval [messageTransfer endData] any attempt to call a method other than + * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} + * will result in an exception being thrown. + * + * @param confirmMode

    off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + *

    + *

  • on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + *
+ * @param acquireMode
  • no-acquire (0): the message must be explicitly acquired + *

    + *

  • pre-acquire (1): the message is acquired when the transfer starts + *
+ * @param exchange The exchange the message is being sent. + * @throws QpidException If the session fails to send the message due to some error. + */ + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException; + + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or to the message being sent. + * + * @param headers Either DeliveryProperties or ApplicationProperties + * @throws QpidException If the session fails to execute the method due to some error + * @see org.apache.qpidity.DeliveryProperties + */ + public void addMessageHeaders(Header... headers) throws QpidException; + + /** + * Add the following byte array to the content of the message being sent. + * + * @param data Data to be added. + * @param off Offset from which to start reading data + * @param len Number of bytes to be read + * @throws QpidException If the session fails to execute the method due to some error + */ + public void addData(byte[] data, int off, int len) throws QpidException; + + /** + * Signals the end of data for the message. + * + * @throws QpidException If the session fails to execute the method due to some error + */ + public void endData() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Associate a message listener with a destination. + *

The destination is bound to a queue and messages are filtered based + * on the provider filter map (message filtering is specific to the provider and may not be handled). + *

+ *

Following are the valid options + *

    + *
  • NO_LOCAL + *
  • EXCLUSIVE + *
+ *

In the absence of a particular option, defaul values are: + *

    + *
  • NO_LOCAL = false + *
  • EXCLUSIVE = false + *
+ * + * @param queue The queue this receiver is receiving messages from. + * @param destination The destination for the subscriber ,a.k.a the delivery tag. + * @param confirmMode
    off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + *

    + *

  • on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + *
+ * @param acquireMode
  • no-acquire (0): the message must be explicitly acquired + *

    + *

  • pre-acquire (1): the message is acquired when the transfer starts + *
+ * @param listener The listener for this destination. When big message are transfered then + * it is recommended to use a {@link MessagePartListener}. + * @param options Set of Options. + * @param filter A set of filters for the subscription. The syntax and semantics of these filters depends + * on the providers implementation. + * @throws QpidException If the session fails to create the receiver due to some error. + */ + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map filter, Option... options) + throws QpidException; + + /** + * This method cancels a consumer. This does not affect already delivered messages, but it does + * mean the server will not send any more messages for that destination. The client may receive an + * arbitrary number of messages in between sending the cancel method and receiving the + * notification of completion of the cancel command. + * + * @param destination The destination for the subscriber used at subscription + * @throws QpidException If cancelling the subscription fails due to some error. + */ + public void messageCancel(String destination) throws QpidException; + + /** + * Associate a message part listener with a destination. + * We currently allow one listerner per destination this means + * that the previous message listener is replaced. This is done gracefully i.e. the message + * listener is replaced once it return from the processing of a message. + * + * @param destination The destination the listener is associated with. + * @param listener The new listener for this destination. + */ + public void setMessageListener(String destination, MessagePartListener listener); + + /** + * Sets the mode of flow control used for a given destination. + *

+ * With credit based flow control, the broker continually maintains its current + * credit balance with the recipient. The credit balance consists of two values, a message + * count, and a byte count. Whenever message data is sent, both counts must be decremented. + * If either value reaches zero, the flow of message data must stop. Additional credit is + * received via the {@link Session#messageFlow} method. + *

+ * Window based flow control is identical to credit based flow control, however message + * acknowledgment implicitly grants a single unit of message credit, and the size of the + * message in byte credits for each acknowledged message. + * + * @param destination The destination to set the flow mode on. + * @param mode

  • credit (0): choose credit based flow control + *
  • window (1): choose window based flow control
+ * @throws QpidException If setting the flow mode fails due to some error. + */ + public void messageFlowMode(String destination, short mode) throws QpidException; + + + /** + * This method controls the flow of message data to a given destination. It is used by the + * recipient of messages to dynamically match the incoming rate of message flow to its + * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" + * number of the specified unit to the available credit balance for the specified destination. + * A value of 0 indicates an infinite amount of credit. This disables any limit for + * the given unit until the credit balance is zeroed with {@link Session#messageStop} + * or {@link Session#messageFlush}. + * + * @param destination The destination to set the flow. + * @param unit Specifies the unit of credit balance. + *

+ * One of:

    + *
  • message (0) + *
  • byte (1) + *
+ * @param value Number of credits, a value of 0 indicates an infinite amount of credit. + * @throws QpidException If setting the flow fails due to some error. + */ + public void messageFlow(String destination, short unit, long value) throws QpidException; + + /** + * Forces the broker to exhaust its credit supply. + *

The broker's credit will always be zero when + * this method completes. This method does not complete until all the message transfers occur. + *

This method returns true if messages have been flushed + * (i.e. the queue was not empty and the credit greater then zero). + * It returns false if the queue was empty. + * + * @param destination The destination to call flush on. + * @return True is messages were flushed, false otherwise. + * @throws QpidException If flushing fails due to some error. + */ + public boolean messageFlush(String destination) throws QpidException; + + /** + * On receipt of this method, the brokers MUST set his credit to zero for the given + * destination. This obeys the generic semantics of command completion, i.e. when confirmation + * is issued credit MUST be zero and no further messages will be sent until such a time as + * further credit is received. + * + * @param destination The destination to stop. + * @throws QpidException If stopping fails due to some error. + */ + public void messageStop(String destination) throws QpidException; + + /** + * Acknowledge the receipt of ranges of messages. + *

Message must have been previously acquired either by receiving them in + * pre-acquire mode or by explicitly acquiring them. + * + * @param range Range of acknowledged messages. + * @throws QpidException If the acknowledgement of the messages fails due to some error. + */ + public void messageAcknowledge(Range... range) throws QpidException; + + /** + * Reject ranges of acquired messages. + *

A rejected message will not be delivered to any receiver + * and may be either discarded or moved to the broker dead letter queue. + * + * @param range Range of rejected messages. + * @throws QpidException If those messages cannot be rejected dus to some error + */ + public void messageReject(Range... range) throws QpidException; + + /** + * Try to acquire ranges of messages hence releasing them form the queue. + * This means that once acknowledged, a message will not be delivered to any other receiver. + *

As those messages may have been consumed by another receivers hence, + * message acquisition can fail. + * The outcome of the acquisition is returned as an array of ranges of qcquired messages. + *

This method should only be called on non-acquired messages. + * + * @param range Ranges of messages to be acquired. + * @return Ranges of explicitly acquired messages. + * @throws QpidException If this message cannot be acquired dus to some error + */ + public Range[] messageAcquire(Range... range) throws QpidException; + + /** + * Give up responsibility for processing ranges of messages. + *

Released messages are re-enqueued. + * + * @param range Ranges of messages to be released. + * @throws QpidException If this message cannot be released dus to some error. + */ + public void messageRelease(Range... range) throws QpidException; + + // ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + /** + * Selects the session for local transaction support. + * + * @throws QpidException If selecting this session for local transaction support fails due to some error. + */ + public void txSelect() throws QpidException; + + /** + * Commit the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to commit due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txCommit() throws QpidException, IllegalStateException; + + /** + * Rollback the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to rollback due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txRollback() throws QpidException, IllegalStateException; + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + + /** + * Declare a queue with the given queueName + *

Following are the valid options for declareQueue + *

    + *
  • AUTO_DELETE + *
  • DURABLE + *
  • EXCLUSIVE + *
  • NO_WAIT + *
  • PASSIVE + *
+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option + * + * @param queueName The name of the delcared queue. + * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message + * may be rejected by a queue for the following reasons: + *

  1. The queue is deleted when it is not empty; + * <
  2. Immediate delivery of a message is requested, but there are no consumers connected to + * the queue.
+ * @param arguments Used for backward compatibility + * @param options Set of Options. + * @throws QpidException If the session fails to declare the queue due to some error. + * @see Option + */ + public void queueDeclare(String queueName, String alternateExchange, Map arguments, Option... options) + throws QpidException; + + /** + * Bind a queue with an exchange. + * + * @param queueName The queue to be bound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to bind the queue due to some error. + */ + public void queueBind(String queueName, String exchangeName, String routingKey, Map arguments) + throws QpidException; + + /** + * Unbind a queue from an exchange. + * + * @param queueName The queue to be unbound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to unbind the queue due to some error. + */ + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map arguments) + throws QpidException; + + /** + * Purge a queue. i.e. delete all enqueued messages + * + * @param queueName The queue to be purged + * @throws QpidException If the session fails to purge the queue due to some error. + */ + public void queuePurge(String queueName) throws QpidException; + + /** + * Delet a queue. + *

Following are the valid options for createReceive + *

    + *
  • IF_EMPTY + *
  • IF_UNUSE + *
  • NO_WAIT + *
+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option

+ * + * @param queueName The name of the queue to be deleted + * @param options Set of options + * @throws QpidException If the session fails to delete the queue due to some error. + * @see Option + *

+ * Following are the valid options + */ + public void queueDelete(String queueName, Option... options) throws QpidException; + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + + /** + * Declare an exchange. + *

Following are the valid options for createReceive + *

    + *
  • AUTO_DELETE + *
  • DURABLE + *
  • INTERNAL + *
  • NO_WAIT + *
  • PASSIVE + *
+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option

* + * + * @param exchangeName The exchange name. + * @param exchangeClass The fully qualified name of the exchange class. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to declare the exchange due to some error. + * @see Option + */ + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, + Map arguments, Option... options) throws QpidException; + + /** + * Delete an exchange. + *

Following are the valid options for createReceive + *

    + *
  • IF_UNUSEDL + *
  • NO_WAIT + *
+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option + * Immediately deleted even if it is used by another resources.

+ * + * @param exchangeName The name of exchange to be deleted. + * @param options Set of options. + * @throws QpidException If the session fails to delete the exchange due to some error. + * @see Option + */ + public void exchangeDelete(String exchangeName, Option... options) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index 1baf063ef3..b197cde3bb 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -4,13 +4,13 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpidity.api.Message; -import org.apache.qpidity.MessagePartListener; +import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.*; /** * Implements a Qpid Sesion. */ -public class ClientSession implements org.apache.qpidity.Session +public class ClientSession implements org.apache.qpidity.client.Session { Map messagListeners = new HashMap(); diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java index 00b4a65fee..6a36d694c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -2,7 +2,7 @@ package org.apache.qpidity.impl; import org.apache.qpidity.CommonSessionDelegate; import org.apache.qpidity.ExchangeQueryOk; -import org.apache.qpidity.Session; +import org.apache.qpidity.client.Session; public class ClientSessionDelegate extends CommonSessionDelegate diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java index 30d6710cfe..f05f2c0e76 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java @@ -1,9 +1,9 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.MessagePartListener; -import org.apache.qpidity.MessageListener; import org.apache.qpidity.Header; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.MessagePartListener; public class MessagePartListenerAdapter implements MessagePartListener { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 818b146491..bcd3845230 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -86,7 +86,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * The QpidConeection instance that is mapped with thie JMS connection */ - org.apache.qpidity.Connection _qpidConnection; + org.apache.qpidity.client.Connection _qpidConnection; /** * This is the exception listener for this qpid connection. @@ -370,7 +370,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); + QueueSessionImpl queueSession = null; + try + { + queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // add this session to the list of handled sessions. _sessions.add(queueSession); return queueSession; @@ -403,11 +411,20 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * Session.DUPS_OK_ACKNOWLEDGE. * @return a newly created topic session * @throws JMSException If creating the session fails due to some internal error. + * @throws QpidException */ public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode); + TopicSessionImpl session = null; + try + { + session = new TopicSessionImpl(this, transacted, acknowledgeMode); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } // add the session with this Connection's sessions // important for when the Connection is closed. _sessions.add(session); @@ -460,7 +477,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @return This JMS connection underlying Qpid Connection. */ - protected org.apache.qpidity.Connection getQpidConnection() + protected org.apache.qpidity.client.Connection getQpidConnection() { return _qpidConnection; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 3fe32d4e07..d88a177001 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -19,12 +19,12 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.impl.MessagePartListenerAdapter; -import org.apache.qpidity.MessagePartListener; import org.apache.qpidity.Range; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; @@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), - org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages - _messageSelector != null ? org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE, + _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); if (_messageSelector != null) { @@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), - org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. @@ -178,7 +178,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } // set the flow mode getSession().getQpidSession() - .messageFlowMode(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_MODE_CREDIT); + .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT); } //----- Message consumer API @@ -254,7 +254,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } _messageAsyncrhonouslyReceived = 0; getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, MAX_MESSAGE_TRANSFERRED); } @@ -353,8 +353,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // if this consumer is stopped then this will be call when starting getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - received = getSession().getQpidSession().messageFlush(getMessageActorID()); + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); } if ( received == 0 && timeout < 0) { @@ -424,7 +424,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // there is a synch call waiting for a message to be delivered // so tell the broker to deliver a message getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); } } @@ -488,8 +488,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { getSession().getQpidSession() .messageFlow(getMessageActorID(), - org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - int received = getSession().getQpidSession().messageFlush(getMessageActorID()); + org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + int received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); if ( received == 0 && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java index 112af50190..05b97a1154 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java @@ -19,8 +19,8 @@ package org.apache.qpidity.jms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpidity.MessageListener; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.MessageListener; /** * This listener idspatches messaes to its browser. diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index 35935d894c..a914dda044 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -17,9 +17,9 @@ */ package org.apache.qpidity.jms; -import org.apache.qpidity.MessageListener; import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 29821e1b81..f0a5ca0ea1 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -17,9 +17,9 @@ */ package org.apache.qpidity.jms; +import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.MessagePartListener; import org.apache.qpidity.QpidException; import org.apache.qpidity.impl.MessagePartListenerAdapter; @@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(queue.getQueueName(), getMessageActorID(), - org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, // We do not acquire those messages - org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); } @@ -171,9 +171,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser try { getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, _maxbatchlength); - _batchLength = getSession().getQpidSession().messageFlush(getMessageActorID()); + _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); } catch (QpidException e) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java index 8ba0f7409b..767acafe0d 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java @@ -20,6 +20,8 @@ package org.apache.qpidity.jms; import javax.jms.*; import javax.jms.IllegalStateException; +import org.apache.qpidity.QpidException; + /** * Implementation of javax.jms.QueueSession */ @@ -37,9 +39,9 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession * @throws javax.jms.JMSSecurityException If the user could not be authenticated. * @throws javax.jms.JMSException In case of internal error. */ - protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException { - super(connection, transacted, acknowledgeMode); + super(connection, transacted, acknowledgeMode,false); } //-- Overwritten methods diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 0a75673961..5ab8482635 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -108,7 +108,7 @@ public class SessionImpl implements Session /** * The underlying QpidSession */ - private org.apache.qpidity.Session _qpidSession; + private org.apache.qpidity.client.Session _qpidSession; /** * Indicates whether this session is recovering @@ -132,7 +132,7 @@ public class SessionImpl implements Session * @throws QpidException In case of internal error. */ protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA) - throws QpidException + throws QpidException { _connection = connection; _transacted = transacted; @@ -142,6 +142,8 @@ public class SessionImpl implements Session acknowledgeMode = Session.SESSION_TRANSACTED; } _acknowledgeMode = acknowledgeMode; + /* + // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = _connection.getQpidConnection().createSession(0); // set transacted if required @@ -149,6 +151,7 @@ public class SessionImpl implements Session { _qpidSession.txSelect(); } + */ // init the message dispatcher. initMessageDispatcherThread(); } @@ -1036,7 +1039,7 @@ public class SessionImpl implements Session * * @return The associated Qpid Session. */ - protected org.apache.qpidity.Session getQpidSession() + protected org.apache.qpidity.client.Session getQpidSession() { return _qpidSession; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java index 9aff4f1416..15c9196fb6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java @@ -20,6 +20,8 @@ package org.apache.qpidity.jms; import javax.jms.*; import javax.jms.IllegalStateException; +import org.apache.qpidity.QpidException; + /** * Implements TopicSession */ @@ -37,9 +39,9 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession * @throws javax.jms.JMSSecurityException If the user could not be authenticated. * @throws javax.jms.JMSException In case of internal error. */ - protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException { - super(connection, transacted, acknowledgeMode); + super(connection, transacted, acknowledgeMode,false); } //-- Overwritten methods diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java index d62587267a..a1d5a345c2 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java @@ -18,7 +18,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.DtxSession; +import org.apache.qpidity.client.DtxSession; import javax.jms.XASession; import javax.jms.Session; @@ -111,7 +111,7 @@ public class XASessionImpl extends SessionImpl implements XASession * * @return The associated Qpid Session. */ - protected org.apache.qpidity.DtxSession getQpidSession() + protected org.apache.qpidity.client.DtxSession getQpidSession() { return _qpidDtxSession; } -- cgit v1.2.1