summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-08 17:49:47 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-08 17:49:47 +0000
commit457bdffb215d923371a5573939fbc583e7fa4f51 (patch)
tree36c816d8828c866b46cd01b0ece5ebd4d3115e4b /java/client
parent6ce5c06fc48c0216e32ff45624ac00152db62466 (diff)
downloadqpid-python-457bdffb215d923371a5573939fbc583e7fa4f51.tar.gz
changed the API based on the discussions with rafi and arnaud. also fixed the compilation errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563959 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Connection.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/DtxSession.java28
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java119
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java208
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java31
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java70
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java108
10 files changed, 221 insertions, 366 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
index d680cad3f0..3da3a12aa5 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
@@ -51,9 +51,8 @@ public interface Connection
*
* @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
* @return A Newly created (suspended) session.
- * @throws QpidException If the connection fails to create a session due to some internal error.
*/
- public Session createSession(int expiryInSeconds) throws QpidException;
+ public Session createSession(int expiryInSeconds);
/**
* Create a DtxSession for this connection.
@@ -64,9 +63,8 @@ public interface Connection
*
* @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
* @return A Newly created (suspended) DtxSession.
- * @throws QpidException If the connection fails to create a DtxSession due to some internal error.
*/
- public DtxSession createDTXSession(int expiryInSeconds) throws QpidException;
+ public DtxSession createDTXSession(int expiryInSeconds);
/**
* If the communication layer detects a serious problem with a connection, it
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
index a9032d5d44..920e5cea80 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
@@ -21,7 +21,6 @@ package org.apache.qpidity.client;
import javax.transaction.xa.Xid;
import org.apache.qpidity.Option;
-import org.apache.qpidity.QpidException;
/**
* This session�s resources are control under the scope of a distributed transaction.
@@ -40,9 +39,8 @@ public interface DtxSession extends Session
*
* @param xid Specifies the xid of the transaction branch to be started.
* @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
- * @throws QpidException If the session fails to start due to some error
*/
- public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException;
+ public void dtxDemarcationStart(Xid xid, Option... options);
/**
* This method is called when the work done on behalf a transaction branch finishes or needs to
@@ -58,9 +56,8 @@ public interface DtxSession extends Session
*
* @param xid Specifies the xid of the transaction branch to be ended.
* @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
- * @throws QpidException If the session fails to end due to some error
*/
- public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException;
+ public void dtxDemarcationEnd(Xid xid, Option... options);
/**
* Commit the work done on behalf a transaction branch. This method commits the work associated
@@ -72,17 +69,15 @@ public interface DtxSession extends Session
*
* @param xid Specifies the xid of the transaction branch to be committed.
* @param options Available option is: {@link Option#ONE_PHASE}
- * @throws QpidException If the session fails to commit due to some error
*/
- public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException;
+ public void dtxCoordinationCommit(Xid xid, Option... options);
/**
* This method is called to forget about a heuristically completed transaction branch.
*
* @param xid Specifies the xid of the transaction branch to be forgotten.
- * @throws QpidException If the session fails to forget due to some error
*/
- public void dtxCoordinationForget(Xid xid) throws QpidException;
+ public void dtxCoordinationForget(Xid xid);
/**
* This method obtains the current transaction timeout value in seconds. If set-timeout was not
@@ -91,9 +86,8 @@ public interface DtxSession extends Session
*
* @param xid Specifies the xid of the transaction branch for getting the timeout.
* @return The current transaction timeout value in seconds.
- * @throws QpidException If the session fails to get the timeout due to some error
*/
- public long dtxCoordinationGetTimeout(Xid xid) throws QpidException;
+ public long dtxCoordinationGetTimeout(Xid xid);
/**
* This method prepares for commitment any message produced or consumed on behalf of xid.
@@ -108,34 +102,30 @@ public interface DtxSession extends Session
* reason.
* <p/>
* xa-rbtimeout: The work represented by this transaction branch took too long.
- * @throws QpidException If the session fails to prepare due to some error
*/
- public short dtxCoordinationPrepare(Xid xid) throws QpidException;
+ public short dtxCoordinationPrepare(Xid xid);
/**
* This method is called to obtain a list of transaction branches that are in a prepared or
* heuristically completed state.
*
* @return a array of xids to be recovered.
- * @throws QpidException If the session fails to recover due to some error
*/
- public Xid[] dtxCoordinationRecover() throws QpidException;
+ public Xid[] dtxCoordinationRecover();
/**
* This method rolls back the work associated with xid. Any produced messages are discarded and
* any consumed messages are re-enqueued.
*
* @param xid Specifies the xid of the transaction branch that can be rolled back.
- * @throws QpidException If the session fails to rollback due to some error
*/
- public void dtxCoordinationRollback(Xid xid) throws QpidException;
+ public void dtxCoordinationRollback(Xid xid);
/**
* Sets the specified transaction branch timeout value in seconds.
*
* @param xid Specifies the xid of the transaction branch for setting the timeout.
* @param timeout The transaction timeout value in seconds.
- * @throws QpidException If the session fails to set the timeout due to some error
*/
- public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException;
+ public void dtxCoordinationSetTimeout(Xid xid, long timeout);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index ca77a4959b..0178ef6d3b 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -21,7 +21,6 @@ package org.apache.qpidity.client;
import java.util.Map;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -45,31 +44,31 @@ public interface Session
//------------------------------------------------------
// Session housekeeping methods
//------------------------------------------------------
+
+ /**
+ * Sync method will block until all outstanding commands
+ * are executed.
+ */
+ public void sync();
+
/**
* Close this session and any associated resources.
- *
- * @throws org.apache.qpidity.QpidException If the communication layer fails to close this session or if an internal error happens
- * when closing this session resources. .
*/
- public void close() throws QpidException;
+ public void close();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
- *
- * @throws QpidException If the communication layer fails to suspend this session
*/
- public void suspend() throws QpidException;
+ public void suspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
- *
- * @throws QpidException If the communication layer fails to execute this properly
*/
- public void resume() throws QpidException;
+ public void resume();
//------------------------------------------------------
// Messaging methods
@@ -92,9 +91,8 @@ public interface Session
* </ul>
* @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
- * @throws QpidException If the session fails to send the message due to some error
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException;
+ public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -118,19 +116,17 @@ public interface Session
* <li> pre-acquire (1): the message is acquired when the transfer starts
* </ul>
* @param exchange The exchange the message is being sent.
- * @throws QpidException If the session fails to send the message due to some error.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException;
+ public void messageTransfer(String exchange, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
- * @throws QpidException If the session fails to execute the method due to some error
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers) throws QpidException;
+ public void addMessageHeaders(Header... headers);
/**
* Add the following byte array to the content of the message being sent.
@@ -138,16 +134,13 @@ public interface Session
* @param data Data to be added.
* @param off Offset from which to start reading data
* @param len Number of bytes to be read
- * @throws QpidException If the session fails to execute the method due to some error
*/
- public void addData(byte[] data, int off, int len) throws QpidException;
+ public void addData(byte[] data, int off, int len);
/**
* Signals the end of data for the message.
- *
- * @throws QpidException If the session fails to execute the method due to some error
*/
- public void endData() throws QpidException;
+ public void endData();
//------------------------------------------------------
// Messaging methods
@@ -189,11 +182,9 @@ public interface Session
* @param options Set of Options.
* @param filter A set of filters for the subscription. The syntax and semantics of these filters depends
* on the providers implementation.
- * @throws QpidException If the session fails to create the receiver due to some error.
*/
public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, ?> filter, Option... options)
- throws QpidException;
+ MessagePartListener listener, Map<String, ?> filter, Option... options);
/**
* This method cancels a consumer. This does not affect already delivered messages, but it does
@@ -202,9 +193,8 @@ public interface Session
* notification of completion of the cancel command.
*
* @param destination The destination for the subscriber used at subscription
- * @throws QpidException If cancelling the subscription fails due to some error.
*/
- public void messageCancel(String destination) throws QpidException;
+ public void messageCancel(String destination);
/**
* Associate a message part listener with a destination.
@@ -235,9 +225,8 @@ public interface Session
* @param destination The destination to set the flow mode on.
* @param mode <ul> <li>credit (0): choose credit based flow control
* <li> window (1): choose window based flow control</ul>
- * @throws QpidException If setting the flow mode fails due to some error.
*/
- public void messageFlowMode(String destination, short mode) throws QpidException;
+ public void messageFlowMode(String destination, short mode);
/**
@@ -257,9 +246,8 @@ public interface Session
* <li> byte (1)
* </ul>
* @param value Number of credits, a value of 0 indicates an infinite amount of credit.
- * @throws QpidException If setting the flow fails due to some error.
*/
- public void messageFlow(String destination, short unit, long value) throws QpidException;
+ public void messageFlow(String destination, short unit, long value);
/**
* Forces the broker to exhaust its credit supply.
@@ -268,10 +256,10 @@ public interface Session
* <p> This method returns the number of flushed messages.
*
* @param destination The destination to call flush on.
- * @return The number of flushed messages
- * @throws QpidException If flushing fails due to some error.
*/
- public int messageFlush(String destination) throws QpidException;
+ public void messageFlush(String destination);
+
+ public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -280,9 +268,8 @@ public interface Session
* further credit is received.
*
* @param destination The destination to stop.
- * @throws QpidException If stopping fails due to some error.
*/
- public void messageStop(String destination) throws QpidException;
+ public void messageStop(String destination);
/**
* Acknowledge the receipt of ranges of messages.
@@ -290,9 +277,8 @@ public interface Session
* pre-acquire mode or by explicitly acquiring them.
*
* @param ranges Range of acknowledged messages.
- * @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(RangeSet ranges) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges);
/**
* Reject ranges of acquired messages.
@@ -300,9 +286,8 @@ public interface Session
* and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
- * @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(RangeSet ranges) throws QpidException;
+ public void messageReject(RangeSet ranges);
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -313,45 +298,39 @@ public interface Session
* <p> This method should only be called on non-acquired messages.
*
* @param range Ranges of messages to be acquired.
- * @return Ranges of explicitly acquired messages.
- * @throws QpidException If this message cannot be acquired dus to some error
*/
- public RangeSet messageAcquire(RangeSet range) throws QpidException;
+ public void messageAcquire(RangeSet ranges);
+
+ public RangeSet getAccquiredMessages();
+
/**
* Give up responsibility for processing ranges of messages.
* <p> Released messages are re-enqueued.
*
* @param range Ranges of messages to be released.
- * @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(RangeSet range) throws QpidException;
+ public void messageRelease(RangeSet ranges);
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
/**
* Selects the session for local transaction support.
- *
- * @throws QpidException If selecting this session for local transaction support fails due to some error.
*/
- public void txSelect() throws QpidException;
+ public void txSelect();
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
- *
- * @throws QpidException If the session fails to commit due to some error.
* @throws IllegalStateException If this session is not transacted.
*/
- public void txCommit() throws QpidException, IllegalStateException;
+ public void txCommit() throws IllegalStateException;
/**
* Rollback the receipt and the delivery of all messages exchanged by this session resources.
- *
- * @throws QpidException If the session fails to rollback due to some error.
* @throws IllegalStateException If this session is not transacted.
*/
- public void txRollback() throws QpidException, IllegalStateException;
+ public void txRollback() throws IllegalStateException;
//---------------------------------------------
// Queue methods
@@ -379,11 +358,9 @@ public interface Session
* the queue. </ol>
* @param arguments Used for backward compatibility
* @param options Set of Options.
- * @throws QpidException If the session fails to declare the queue due to some error.
* @see Option
*/
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
- throws QpidException;
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
/**
* Bind a queue with an exchange.
@@ -392,10 +369,8 @@ public interface Session
* @param exchangeName The exchange name.
* @param routingKey The routing key.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to bind the queue due to some error.
*/
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException;
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
/**
* Unbind a queue from an exchange.
@@ -404,18 +379,15 @@ public interface Session
* @param exchangeName The exchange name.
* @param routingKey The routing key.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to unbind the queue due to some error.
*/
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException;
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
/**
* Purge a queue. i.e. delete all enqueued messages
*
* @param queueName The queue to be purged
- * @throws QpidException If the session fails to purge the queue due to some error.
*/
- public void queuePurge(String queueName) throws QpidException;
+ public void queuePurge(String queueName);
/**
* Delet a queue.
@@ -431,12 +403,11 @@ public interface Session
*
* @param queueName The name of the queue to be deleted
* @param options Set of options
- * @throws QpidException If the session fails to delete the queue due to some error.
* @see Option
* <p/>
* Following are the valid options
*/
- public void queueDelete(String queueName, Option... options) throws QpidException;
+ public void queueDelete(String queueName, Option... options);
// --------------------------------------
// exhcange methods
@@ -462,11 +433,10 @@ public interface Session
* the message will be sent.
* @param options Set of options.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to declare the exchange due to some error.
* @see Option
*/
public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
- Map<String, ?> arguments, Option... options) throws QpidException;
+ Map<String, ?> arguments, Option... options);
/**
* Delete an exchange.
@@ -482,8 +452,15 @@ public interface Session
*
* @param exchangeName The name of exchange to be deleted.
* @param options Set of options.
- * @throws QpidException If the session fails to delete the exchange due to some error.
* @see Option
*/
- public void exchangeDelete(String exchangeName, Option... options) throws QpidException;
+ public void exchangeDelete(String exchangeName, Option... options);
+
+ /**
+ * If the session receives a sessionClosed with an error code it
+ * informs the session's ExceptionListener
+ *
+ * @param exceptionListner The execptionListener
+ */
+ public void setExceptionListener(ExceptionListener exceptionListner);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
index f6d0cfaefd..a0111a86f4 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
@@ -4,6 +4,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.*;
@@ -15,190 +16,201 @@ public class ClientSession implements org.apache.qpidity.client.Session
Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
-
- //------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
- public void close() throws QpidException
+ public void addData(byte[] data, int off, int len)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void suspend() throws QpidException
+ public void addMessageHeaders(Header... headers)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void resume() throws QpidException
+ public void close()
{
- // TODO
+ // TODO Auto-generated method stub
+
+ }
- }//------------------------------------------------------
- // Messaging methods
- // Producer
- //------------------------------------------------------
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException
+ public void endData()
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException
+ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void addMessageHeaders(Header... headers) throws QpidException
+ public void exchangeDelete(String exchangeName, Option... options)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void addData(byte[] data, int off, int len) throws QpidException
+ public void messageAcknowledge(RangeSet ranges)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void endData() throws QpidException
+ public void messageAcquire(RangeSet ranges)
{
- // TODO
-
+ // TODO Auto-generated method stub
}
- public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, ?> filter, Option... options)
- throws QpidException
+ public void messageCancel(String destination)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void messageCancel(String destination) throws QpidException
+ public void messageFlow(String destination, short unit, long value)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void setMessageListener(String destination, MessagePartListener listener)
+ public void messageFlowMode(String destination, short mode)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void messageFlowMode(String destination, short mode) throws QpidException
+ public void messageFlush(String destination)
{
- // TODO
-
+ // TODO Auto-generated method stub
}
- public void messageFlow(String destination, short unit, long value) throws QpidException
+ public void messageReject(RangeSet ranges)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public int messageFlush(String destination) throws QpidException
+ public void messageRelease(RangeSet ranges)
{
- // TODO
- return 1;
+ // TODO Auto-generated method stub
+
}
- public void messageStop(String destination) throws QpidException
+ public void messageStop(String destination)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void messageAcknowledge(RangeSet ranges) throws QpidException
+ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void messageReject(RangeSet ranges) throws QpidException
+ public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public RangeSet messageAcquire(RangeSet ranges) throws QpidException
+ public void messageTransfer(String exchange, short confirmMode, short acquireMode)
{
- // TODO
- return null;
+ // TODO Auto-generated method stub
+
}
- public void messageRelease(RangeSet ranges) throws QpidException
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
{
- // TODO
+ // TODO Auto-generated method stub
+
+ }
- }// -----------------------------------------------
- // Local transaction methods
- // ----------------------------------------------
- public void txSelect() throws QpidException
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void txCommit() throws QpidException, IllegalStateException
+ public void queueDelete(String queueName, Option... options)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void txRollback() throws QpidException, IllegalStateException
+ public void queuePurge(String queueName)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
- throws QpidException
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException
+ public void resume()
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException
+ public void setExceptionListener(ExceptionListener exceptionListner)
{
- // TODO
-
+ // TODO Auto-generated method stub
+
}
- public void queuePurge(String queueName) throws QpidException
+ public void setMessageListener(String destination, MessagePartListener listener)
{
- // TODO
+ // TODO Auto-generated method stub
+
+ }
+ public void suspend()
+ {
+ // TODO Auto-generated method stub
+
}
- public void queueDelete(String queueName, Option... options) throws QpidException
+ public void sync()
{
- // TODO
+ // TODO Auto-generated method stub
+
+ }
+ public void txCommit() throws IllegalStateException
+ {
+ // TODO Auto-generated method stub
+
}
- public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
- Map<String, ?> arguments, Option... options) throws QpidException
+ public void txRollback() throws IllegalStateException
{
- // TODO
+ // TODO Auto-generated method stub
+
+ }
+ public void txSelect()
+ {
+ // TODO Auto-generated method stub
+
}
- public void exchangeDelete(String exchangeName, Option... options) throws QpidException
+ public RangeSet getAccquiredMessages()
{
- // TODO
+ // TODO Auto-generated method stub
+ return null;
+ }
+ public int getNoOfUnAckedMessages()
+ {
+ // TODO Auto-generated method stub
+ return 0;
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
index bcd3845230..3b6153c487 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
@@ -23,9 +23,6 @@ import org.apache.qpidity.QpidException;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.Session;
-import javax.jms.ExceptionListener;
-import javax.jms.Connection;
import java.util.Vector;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
index 239a70f8d0..6363108d45 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java
@@ -85,15 +85,7 @@ public abstract class MessageActor
if (!_isClosed)
{
closeMessageActor();
- try
- {
- // cancel this destination
- getSession().getQpidSession().messageCancel(getMessageActorID());
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ getSession().getQpidSession().messageCancel(getMessageActorID());
//todo: We need to unset the qpid message listener
// notify the session that this message actor is closing
_session.closeMessageActor(this);
@@ -155,6 +147,7 @@ public abstract class MessageActor
{
if (!_isClosed)
{
+ getSession().getQpidSession().messageCancel(getMessageActorID());
_isClosed = true;
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 57e63dd30c..b9922de296 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -591,7 +591,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges);
+ getSession().getQpidSession().messageAcquire(ranges);
+ RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
result = true; // todo acquired.iterator().next().getLower() == message.getMessageID();
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index f0a5ca0ea1..ad85f33aea 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -17,19 +17,19 @@
*/
package org.apache.qpidity.jms;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.impl.MessagePartListenerAdapter;
-import javax.jms.QueueBrowser;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Message;
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
-
/**
* Implementation of the JMS QueueBrowser interface
*/
@@ -168,17 +168,10 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
_received = 0;
// request messages
int received = 0;
- try
- {
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- _maxbatchlength);
- _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ _maxbatchlength);
+ _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
}
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 9c9f5d9663..d5342fb464 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -26,9 +26,6 @@ import org.apache.qpidity.Range;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.Session;
-import javax.jms.Message;
-import javax.jms.MessageListener;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
@@ -309,14 +306,7 @@ public class SessionImpl implements Session
throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted");
}
// commit the underlying Qpid Session
- try
- {
- _qpidSession.txCommit();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ _qpidSession.txCommit();
}
/**
@@ -335,14 +325,7 @@ public class SessionImpl implements Session
throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted");
}
// rollback the underlying Qpid Session
- try
- {
- _qpidSession.txRollback();
- }
- catch (org.apache.qpidity.QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ _qpidSession.txRollback();
}
/**
@@ -401,14 +384,7 @@ public class SessionImpl implements Session
_incomingAsynchronousMessages.notifyAll();
}
// close the underlaying QpidSession
- try
- {
- _qpidSession.close();
- }
- catch (org.apache.qpidity.QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ _qpidSession.close();
}
}
@@ -446,14 +422,7 @@ public class SessionImpl implements Session
RangeSet ranges = new RangeSet();
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- try
- {
- getQpidSession().messageRelease(ranges);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ getQpidSession().messageRelease(ranges);
// TODO We can be a little bit cleverer and build a set of ranges
}
}
@@ -566,15 +535,17 @@ public class SessionImpl implements Session
{
checkNotClosed();
checkDestination(destination);
- MessageConsumerImpl consumer;
+ MessageConsumerImpl consumer = null;
try
{
consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
}
catch (Exception e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ // TODO Auto-generated catch block
+ e.printStackTrace();
}
+
// register this actor with the session
_messageActors.put(consumer.getMessageActorID(), consumer);
return consumer;
@@ -599,14 +570,15 @@ public class SessionImpl implements Session
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
- Queue result;
+ Queue result = null;
try
{
result = new QueueImpl(this, queueName);
}
catch (QpidException e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ // TODO Auto-generated catch block
+ e.printStackTrace();
}
return result;
}
@@ -987,14 +959,7 @@ public class SessionImpl implements Session
RangeSet ranges = new RangeSet();
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- try
- {
- getQpidSession().messageAcknowledge(ranges);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ getQpidSession().messageAcknowledge(ranges);
}
//tobedone: Implement DUPS OK heuristic
}
@@ -1022,15 +987,8 @@ public class SessionImpl implements Session
// acknowledge this message
RangeSet ranges = new RangeSet();
// TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
- try
- {
- getQpidSession().messageAcknowledge(ranges);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ // ranges.add(message.getMessageID());
+ getQpidSession().messageAcknowledge(ranges);
// TODO We can be a little bit cleverer and build a set of ranges
}
//empty the list of unack messages
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
index 0ec8c4a423..efcb4430c8 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
@@ -72,14 +72,7 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("commit ", xid);
}
- try
- {
- _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION);
}
/**
@@ -104,17 +97,10 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("end ", xid);
}
- try
- {
- _xid = null;
- _xaSession.getQpidSession()
- .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
- flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ xid = null;
+ _xaSession.getQpidSession()
+ .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
}
/**
@@ -130,15 +116,7 @@ public class XAResourceImpl implements XAResource
{
_logger.debug("forget ", xid);
}
- try
- {
- _xaSession.getQpidSession()
- .dtxCoordinationForget(xid);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ _xaSession.getQpidSession().dtxCoordinationForget(xid);
}
/**
@@ -155,14 +133,7 @@ public class XAResourceImpl implements XAResource
int result = 0;
if (_xid != null)
{
- try
- {
- result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
}
return result;
}
@@ -198,15 +169,9 @@ public class XAResourceImpl implements XAResource
_logger.debug("prepare ", xid);
}
int result;
- try
- {
- result = _xaSession.getQpidSession()
- .dtxCoordinationPrepare(xid);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ result = _xaSession.getQpidSession()
+ .dtxCoordinationPrepare(xid);
+
if (result == XAException.XA_RDONLY)
{
throw new XAException(XAException.XA_RDONLY);
@@ -232,16 +197,9 @@ public class XAResourceImpl implements XAResource
*/
public Xid[] recover(int flag) throws XAException
{
- try
- {
- // the flag is ignored
- return _xaSession.getQpidSession()
- .dtxCoordinationRecover();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+// the flag is ignored
+ return _xaSession.getQpidSession()
+ .dtxCoordinationRecover();
}
/**
@@ -252,16 +210,9 @@ public class XAResourceImpl implements XAResource
*/
public void rollback(Xid xid) throws XAException
{
- try
- {
- // the flag is ignored
- _xaSession.getQpidSession()
- .dtxCoordinationRollback(xid);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+// the flag is ignored
+ _xaSession.getQpidSession()
+ .dtxCoordinationRollback(xid);
}
/**
@@ -279,17 +230,9 @@ public class XAResourceImpl implements XAResource
boolean result = false;
if (_xid != null)
{
- try
- {
- // the flag is ignored
- _xaSession.getQpidSession()
- .dtxCoordinationSetTimeout(_xid, timeout);
- result = true;
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ _xaSession.getQpidSession()
+ .dtxCoordinationSetTimeout(_xid, timeout);
+ result = true;
}
return result;
}
@@ -315,15 +258,8 @@ public class XAResourceImpl implements XAResource
_logger.debug("start ", xid);
}
_xid = xid;
- try
- {
- _xaSession.getQpidSession()
- .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToXAException(e);
- }
+ _xaSession.getQpidSession()
+ .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
}
}