diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-08 17:49:47 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-08 17:49:47 +0000 |
| commit | 457bdffb215d923371a5573939fbc583e7fa4f51 (patch) | |
| tree | 36c816d8828c866b46cd01b0ece5ebd4d3115e4b /java | |
| parent | 6ce5c06fc48c0216e32ff45624ac00152db62466 (diff) | |
| download | qpid-python-457bdffb215d923371a5573939fbc583e7fa4f51.tar.gz | |
changed the API based on the discussions with rafi and arnaud. also fixed the compilation errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563959 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 226 insertions, 374 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index d680cad3f0..3da3a12aa5 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -51,9 +51,8 @@ public interface Connection * * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) session. - * @throws QpidException If the connection fails to create a session due to some internal error. */ - public Session createSession(int expiryInSeconds) throws QpidException; + public Session createSession(int expiryInSeconds); /** * Create a DtxSession for this connection. @@ -64,9 +63,8 @@ public interface Connection * * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) DtxSession. - * @throws QpidException If the connection fails to create a DtxSession due to some internal error. */ - public DtxSession createDTXSession(int expiryInSeconds) throws QpidException; + public DtxSession createDTXSession(int expiryInSeconds); /** * If the communication layer detects a serious problem with a connection, it diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java index a9032d5d44..920e5cea80 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java @@ -21,7 +21,6 @@ package org.apache.qpidity.client; import javax.transaction.xa.Xid; import org.apache.qpidity.Option; -import org.apache.qpidity.QpidException; /** * This session�s resources are control under the scope of a distributed transaction. @@ -40,9 +39,8 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch to be started. * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @throws QpidException If the session fails to start due to some error */ - public void dtxDemarcationStart(Xid xid, Option... options) throws QpidException; + public void dtxDemarcationStart(Xid xid, Option... options); /** * This method is called when the work done on behalf a transaction branch finishes or needs to @@ -58,9 +56,8 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch to be ended. * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @throws QpidException If the session fails to end due to some error */ - public void dtxDemarcationEnd(Xid xid, Option... options) throws QpidException; + public void dtxDemarcationEnd(Xid xid, Option... options); /** * Commit the work done on behalf a transaction branch. This method commits the work associated @@ -72,17 +69,15 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch to be committed. * @param options Available option is: {@link Option#ONE_PHASE} - * @throws QpidException If the session fails to commit due to some error */ - public void dtxCoordinationCommit(Xid xid, Option... options) throws QpidException; + public void dtxCoordinationCommit(Xid xid, Option... options); /** * This method is called to forget about a heuristically completed transaction branch. * * @param xid Specifies the xid of the transaction branch to be forgotten. - * @throws QpidException If the session fails to forget due to some error */ - public void dtxCoordinationForget(Xid xid) throws QpidException; + public void dtxCoordinationForget(Xid xid); /** * This method obtains the current transaction timeout value in seconds. If set-timeout was not @@ -91,9 +86,8 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch for getting the timeout. * @return The current transaction timeout value in seconds. - * @throws QpidException If the session fails to get the timeout due to some error */ - public long dtxCoordinationGetTimeout(Xid xid) throws QpidException; + public long dtxCoordinationGetTimeout(Xid xid); /** * This method prepares for commitment any message produced or consumed on behalf of xid. @@ -108,34 +102,30 @@ public interface DtxSession extends Session * reason. * <p/> * xa-rbtimeout: The work represented by this transaction branch took too long. - * @throws QpidException If the session fails to prepare due to some error */ - public short dtxCoordinationPrepare(Xid xid) throws QpidException; + public short dtxCoordinationPrepare(Xid xid); /** * This method is called to obtain a list of transaction branches that are in a prepared or * heuristically completed state. * * @return a array of xids to be recovered. - * @throws QpidException If the session fails to recover due to some error */ - public Xid[] dtxCoordinationRecover() throws QpidException; + public Xid[] dtxCoordinationRecover(); /** * This method rolls back the work associated with xid. Any produced messages are discarded and * any consumed messages are re-enqueued. * * @param xid Specifies the xid of the transaction branch that can be rolled back. - * @throws QpidException If the session fails to rollback due to some error */ - public void dtxCoordinationRollback(Xid xid) throws QpidException; + public void dtxCoordinationRollback(Xid xid); /** * Sets the specified transaction branch timeout value in seconds. * * @param xid Specifies the xid of the transaction branch for setting the timeout. * @param timeout The transaction timeout value in seconds. - * @throws QpidException If the session fails to set the timeout due to some error */ - public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws QpidException; + public void dtxCoordinationSetTimeout(Xid xid, long timeout); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index ca77a4959b..0178ef6d3b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -21,7 +21,6 @@ package org.apache.qpidity.client; import java.util.Map;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -45,31 +44,31 @@ public interface Session //------------------------------------------------------
// Session housekeeping methods
//------------------------------------------------------
+
+ /**
+ * Sync method will block until all outstanding commands
+ * are executed.
+ */
+ public void sync();
+
/**
* Close this session and any associated resources.
- *
- * @throws org.apache.qpidity.QpidException If the communication layer fails to close this session or if an internal error happens
- * when closing this session resources. .
*/
- public void close() throws QpidException;
+ public void close();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
- *
- * @throws QpidException If the communication layer fails to suspend this session
*/
- public void suspend() throws QpidException;
+ public void suspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
- *
- * @throws QpidException If the communication layer fails to execute this properly
*/
- public void resume() throws QpidException;
+ public void resume();
//------------------------------------------------------
// Messaging methods
@@ -92,9 +91,8 @@ public interface Session * </ul>
* @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
- * @throws QpidException If the session fails to send the message due to some error
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException;
+ public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -118,19 +116,17 @@ public interface Session * <li> pre-acquire (1): the message is acquired when the transfer starts
* </ul>
* @param exchange The exchange the message is being sent.
- * @throws QpidException If the session fails to send the message due to some error.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException;
+ public void messageTransfer(String exchange, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
- * @throws QpidException If the session fails to execute the method due to some error
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers) throws QpidException;
+ public void addMessageHeaders(Header... headers);
/**
* Add the following byte array to the content of the message being sent.
@@ -138,16 +134,13 @@ public interface Session * @param data Data to be added.
* @param off Offset from which to start reading data
* @param len Number of bytes to be read
- * @throws QpidException If the session fails to execute the method due to some error
*/
- public void addData(byte[] data, int off, int len) throws QpidException;
+ public void addData(byte[] data, int off, int len);
/**
* Signals the end of data for the message.
- *
- * @throws QpidException If the session fails to execute the method due to some error
*/
- public void endData() throws QpidException;
+ public void endData();
//------------------------------------------------------
// Messaging methods
@@ -189,11 +182,9 @@ public interface Session * @param options Set of Options.
* @param filter A set of filters for the subscription. The syntax and semantics of these filters depends
* on the providers implementation.
- * @throws QpidException If the session fails to create the receiver due to some error.
*/
public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, ?> filter, Option... options)
- throws QpidException;
+ MessagePartListener listener, Map<String, ?> filter, Option... options);
/**
* This method cancels a consumer. This does not affect already delivered messages, but it does
@@ -202,9 +193,8 @@ public interface Session * notification of completion of the cancel command.
*
* @param destination The destination for the subscriber used at subscription
- * @throws QpidException If cancelling the subscription fails due to some error.
*/
- public void messageCancel(String destination) throws QpidException;
+ public void messageCancel(String destination);
/**
* Associate a message part listener with a destination.
@@ -235,9 +225,8 @@ public interface Session * @param destination The destination to set the flow mode on.
* @param mode <ul> <li>credit (0): choose credit based flow control
* <li> window (1): choose window based flow control</ul>
- * @throws QpidException If setting the flow mode fails due to some error.
*/
- public void messageFlowMode(String destination, short mode) throws QpidException;
+ public void messageFlowMode(String destination, short mode);
/**
@@ -257,9 +246,8 @@ public interface Session * <li> byte (1)
* </ul>
* @param value Number of credits, a value of 0 indicates an infinite amount of credit.
- * @throws QpidException If setting the flow fails due to some error.
*/
- public void messageFlow(String destination, short unit, long value) throws QpidException;
+ public void messageFlow(String destination, short unit, long value);
/**
* Forces the broker to exhaust its credit supply.
@@ -268,10 +256,10 @@ public interface Session * <p> This method returns the number of flushed messages.
*
* @param destination The destination to call flush on.
- * @return The number of flushed messages
- * @throws QpidException If flushing fails due to some error.
*/
- public int messageFlush(String destination) throws QpidException;
+ public void messageFlush(String destination);
+
+ public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -280,9 +268,8 @@ public interface Session * further credit is received.
*
* @param destination The destination to stop.
- * @throws QpidException If stopping fails due to some error.
*/
- public void messageStop(String destination) throws QpidException;
+ public void messageStop(String destination);
/**
* Acknowledge the receipt of ranges of messages.
@@ -290,9 +277,8 @@ public interface Session * pre-acquire mode or by explicitly acquiring them.
*
* @param ranges Range of acknowledged messages.
- * @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(RangeSet ranges) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges);
/**
* Reject ranges of acquired messages.
@@ -300,9 +286,8 @@ public interface Session * and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
- * @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(RangeSet ranges) throws QpidException;
+ public void messageReject(RangeSet ranges);
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -313,45 +298,39 @@ public interface Session * <p> This method should only be called on non-acquired messages.
*
* @param range Ranges of messages to be acquired.
- * @return Ranges of explicitly acquired messages.
- * @throws QpidException If this message cannot be acquired dus to some error
*/
- public RangeSet messageAcquire(RangeSet range) throws QpidException;
+ public void messageAcquire(RangeSet ranges);
+
+ public RangeSet getAccquiredMessages();
+
/**
* Give up responsibility for processing ranges of messages.
* <p> Released messages are re-enqueued.
*
* @param range Ranges of messages to be released.
- * @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(RangeSet range) throws QpidException;
+ public void messageRelease(RangeSet ranges);
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
/**
* Selects the session for local transaction support.
- *
- * @throws QpidException If selecting this session for local transaction support fails due to some error.
*/
- public void txSelect() throws QpidException;
+ public void txSelect();
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
- *
- * @throws QpidException If the session fails to commit due to some error.
* @throws IllegalStateException If this session is not transacted.
*/
- public void txCommit() throws QpidException, IllegalStateException;
+ public void txCommit() throws IllegalStateException;
/**
* Rollback the receipt and the delivery of all messages exchanged by this session resources.
- *
- * @throws QpidException If the session fails to rollback due to some error.
* @throws IllegalStateException If this session is not transacted.
*/
- public void txRollback() throws QpidException, IllegalStateException;
+ public void txRollback() throws IllegalStateException;
//---------------------------------------------
// Queue methods
@@ -379,11 +358,9 @@ public interface Session * the queue. </ol>
* @param arguments Used for backward compatibility
* @param options Set of Options.
- * @throws QpidException If the session fails to declare the queue due to some error.
* @see Option
*/
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
- throws QpidException;
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
/**
* Bind a queue with an exchange.
@@ -392,10 +369,8 @@ public interface Session * @param exchangeName The exchange name.
* @param routingKey The routing key.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to bind the queue due to some error.
*/
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException;
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
/**
* Unbind a queue from an exchange.
@@ -404,18 +379,15 @@ public interface Session * @param exchangeName The exchange name.
* @param routingKey The routing key.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to unbind the queue due to some error.
*/
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- throws QpidException;
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
/**
* Purge a queue. i.e. delete all enqueued messages
*
* @param queueName The queue to be purged
- * @throws QpidException If the session fails to purge the queue due to some error.
*/
- public void queuePurge(String queueName) throws QpidException;
+ public void queuePurge(String queueName);
/**
* Delet a queue.
@@ -431,12 +403,11 @@ public interface Session *
* @param queueName The name of the queue to be deleted
* @param options Set of options
- * @throws QpidException If the session fails to delete the queue due to some error.
* @see Option
* <p/>
* Following are the valid options
*/
- public void queueDelete(String queueName, Option... options) throws QpidException;
+ public void queueDelete(String queueName, Option... options);
// --------------------------------------
// exhcange methods
@@ -462,11 +433,10 @@ public interface Session * the message will be sent.
* @param options Set of options.
* @param arguments Used for backward compatibility
- * @throws QpidException If the session fails to declare the exchange due to some error.
* @see Option
*/
public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
- Map<String, ?> arguments, Option... options) throws QpidException;
+ Map<String, ?> arguments, Option... options);
/**
* Delete an exchange.
@@ -482,8 +452,15 @@ public interface Session *
* @param exchangeName The name of exchange to be deleted.
* @param options Set of options.
- * @throws QpidException If the session fails to delete the exchange due to some error.
* @see Option
*/
- public void exchangeDelete(String exchangeName, Option... options) throws QpidException;
+ public void exchangeDelete(String exchangeName, Option... options);
+
+ /**
+ * If the session receives a sessionClosed with an error code it
+ * informs the session's ExceptionListener
+ *
+ * @param exceptionListner The execptionListener
+ */
+ public void setExceptionListener(ExceptionListener exceptionListner);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index f6d0cfaefd..a0111a86f4 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -4,6 +4,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.*; @@ -15,190 +16,201 @@ public class ClientSession implements org.apache.qpidity.client.Session Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - public void close() throws QpidException + public void addData(byte[] data, int off, int len) { - // TODO - + // TODO Auto-generated method stub + } - public void suspend() throws QpidException + public void addMessageHeaders(Header... headers) { - // TODO - + // TODO Auto-generated method stub + } - public void resume() throws QpidException + public void close() { - // TODO + // TODO Auto-generated method stub + + } - }//------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException + public void endData() { - // TODO - + // TODO Auto-generated method stub + } - public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) { - // TODO - + // TODO Auto-generated method stub + } - public void addMessageHeaders(Header... headers) throws QpidException + public void exchangeDelete(String exchangeName, Option... options) { - // TODO - + // TODO Auto-generated method stub + } - public void addData(byte[] data, int off, int len) throws QpidException + public void messageAcknowledge(RangeSet ranges) { - // TODO - + // TODO Auto-generated method stub + } - public void endData() throws QpidException + public void messageAcquire(RangeSet ranges) { - // TODO - + // TODO Auto-generated method stub } - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map<String, ?> filter, Option... options) - throws QpidException + public void messageCancel(String destination) { - // TODO - + // TODO Auto-generated method stub + } - public void messageCancel(String destination) throws QpidException + public void messageFlow(String destination, short unit, long value) { - // TODO - + // TODO Auto-generated method stub + } - public void setMessageListener(String destination, MessagePartListener listener) + public void messageFlowMode(String destination, short mode) { - // TODO - + // TODO Auto-generated method stub + } - public void messageFlowMode(String destination, short mode) throws QpidException + public void messageFlush(String destination) { - // TODO - + // TODO Auto-generated method stub } - public void messageFlow(String destination, short unit, long value) throws QpidException + public void messageReject(RangeSet ranges) { - // TODO - + // TODO Auto-generated method stub + } - public int messageFlush(String destination) throws QpidException + public void messageRelease(RangeSet ranges) { - // TODO - return 1; + // TODO Auto-generated method stub + } - public void messageStop(String destination) throws QpidException + public void messageStop(String destination) { - // TODO - + // TODO Auto-generated method stub + } - public void messageAcknowledge(RangeSet ranges) throws QpidException + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options) { - // TODO - + // TODO Auto-generated method stub + } - public void messageReject(RangeSet ranges) throws QpidException + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) { - // TODO - + // TODO Auto-generated method stub + } - public RangeSet messageAcquire(RangeSet ranges) throws QpidException + public void messageTransfer(String exchange, short confirmMode, short acquireMode) { - // TODO - return null; + // TODO Auto-generated method stub + } - public void messageRelease(RangeSet ranges) throws QpidException + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) { - // TODO + // TODO Auto-generated method stub + + } - }// ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - public void txSelect() throws QpidException + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) { - // TODO - + // TODO Auto-generated method stub + } - public void txCommit() throws QpidException, IllegalStateException + public void queueDelete(String queueName, Option... options) { - // TODO - + // TODO Auto-generated method stub + } - public void txRollback() throws QpidException, IllegalStateException + public void queuePurge(String queueName) { - // TODO - + // TODO Auto-generated method stub + } - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) - throws QpidException + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) { - // TODO - + // TODO Auto-generated method stub + } - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - throws QpidException + public void resume() { - // TODO - + // TODO Auto-generated method stub + } - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - throws QpidException + public void setExceptionListener(ExceptionListener exceptionListner) { - // TODO - + // TODO Auto-generated method stub + } - public void queuePurge(String queueName) throws QpidException + public void setMessageListener(String destination, MessagePartListener listener) { - // TODO + // TODO Auto-generated method stub + + } + public void suspend() + { + // TODO Auto-generated method stub + } - public void queueDelete(String queueName, Option... options) throws QpidException + public void sync() { - // TODO + // TODO Auto-generated method stub + + } + public void txCommit() throws IllegalStateException + { + // TODO Auto-generated method stub + } - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, - Map<String, ?> arguments, Option... options) throws QpidException + public void txRollback() throws IllegalStateException { - // TODO + // TODO Auto-generated method stub + + } + public void txSelect() + { + // TODO Auto-generated method stub + } - public void exchangeDelete(String exchangeName, Option... options) throws QpidException + public RangeSet getAccquiredMessages() { - // TODO + // TODO Auto-generated method stub + return null; + } + public int getNoOfUnAckedMessages() + { + // TODO Auto-generated method stub + return 0; } + + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index bcd3845230..3b6153c487 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -23,9 +23,6 @@ import org.apache.qpidity.QpidException; import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.Session; -import javax.jms.ExceptionListener; -import javax.jms.Connection; import java.util.Vector; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java index 239a70f8d0..6363108d45 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java @@ -85,15 +85,7 @@ public abstract class MessageActor if (!_isClosed) { closeMessageActor(); - try - { - // cancel this destination - getSession().getQpidSession().messageCancel(getMessageActorID()); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + getSession().getQpidSession().messageCancel(getMessageActorID()); //todo: We need to unset the qpid message listener // notify the session that this message actor is closing _session.closeMessageActor(this); @@ -155,6 +147,7 @@ public abstract class MessageActor { if (!_isClosed) { + getSession().getQpidSession().messageCancel(getMessageActorID()); _isClosed = true; } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 57e63dd30c..b9922de296 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -591,7 +591,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges); + getSession().getQpidSession().messageAcquire(ranges); + RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { result = true; // todo acquired.iterator().next().getLower() == message.getMessageID(); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index f0a5ca0ea1..ad85f33aea 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -17,19 +17,19 @@ */ package org.apache.qpidity.jms; +import java.util.Enumeration; +import java.util.NoSuchElementException; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueBrowser; + import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.QpidException; import org.apache.qpidity.impl.MessagePartListenerAdapter; -import javax.jms.QueueBrowser; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Message; -import java.util.Enumeration; -import java.util.NoSuchElementException; - /** * Implementation of the JMS QueueBrowser interface */ @@ -168,17 +168,10 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser _received = 0; // request messages int received = 0; - try - { - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, - _maxbatchlength); - _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, + _maxbatchlength); + _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); } /** diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 9c9f5d9663..d5342fb464 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -26,9 +26,6 @@ import org.apache.qpidity.Range; import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.Session; -import javax.jms.Message; -import javax.jms.MessageListener; import java.io.Serializable; import java.util.LinkedList; import java.util.HashMap; @@ -309,14 +306,7 @@ public class SessionImpl implements Session throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted"); } // commit the underlying Qpid Session - try - { - _qpidSession.txCommit(); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + _qpidSession.txCommit(); } /** @@ -335,14 +325,7 @@ public class SessionImpl implements Session throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted"); } // rollback the underlying Qpid Session - try - { - _qpidSession.txRollback(); - } - catch (org.apache.qpidity.QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + _qpidSession.txRollback(); } /** @@ -401,14 +384,7 @@ public class SessionImpl implements Session _incomingAsynchronousMessages.notifyAll(); } // close the underlaying QpidSession - try - { - _qpidSession.close(); - } - catch (org.apache.qpidity.QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + _qpidSession.close(); } } @@ -446,14 +422,7 @@ public class SessionImpl implements Session RangeSet ranges = new RangeSet(); // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - try - { - getQpidSession().messageRelease(ranges); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + getQpidSession().messageRelease(ranges); // TODO We can be a little bit cleverer and build a set of ranges } } @@ -566,15 +535,17 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(destination); - MessageConsumerImpl consumer; + MessageConsumerImpl consumer = null; try { consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); } catch (Exception e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + // TODO Auto-generated catch block + e.printStackTrace(); } + // register this actor with the session _messageActors.put(consumer.getMessageActorID(), consumer); return consumer; @@ -599,14 +570,15 @@ public class SessionImpl implements Session public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - Queue result; + Queue result = null; try { result = new QueueImpl(this, queueName); } catch (QpidException e) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + // TODO Auto-generated catch block + e.printStackTrace(); } return result; } @@ -987,14 +959,7 @@ public class SessionImpl implements Session RangeSet ranges = new RangeSet(); // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - try - { - getQpidSession().messageAcknowledge(ranges); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + getQpidSession().messageAcknowledge(ranges); } //tobedone: Implement DUPS OK heuristic } @@ -1022,15 +987,8 @@ public class SessionImpl implements Session // acknowledge this message RangeSet ranges = new RangeSet(); // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); - try - { - getQpidSession().messageAcknowledge(ranges); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + // ranges.add(message.getMessageID()); + getQpidSession().messageAcknowledge(ranges); // TODO We can be a little bit cleverer and build a set of ranges } //empty the list of unack messages diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java index 0ec8c4a423..efcb4430c8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -72,14 +72,7 @@ public class XAResourceImpl implements XAResource { _logger.debug("commit ", xid); } - try - { - _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION); } /** @@ -104,17 +97,10 @@ public class XAResourceImpl implements XAResource { _logger.debug("end ", xid); } - try - { - _xid = null; - _xaSession.getQpidSession() - .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, - flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + xid = null; + _xaSession.getQpidSession() + .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); } /** @@ -130,15 +116,7 @@ public class XAResourceImpl implements XAResource { _logger.debug("forget ", xid); } - try - { - _xaSession.getQpidSession() - .dtxCoordinationForget(xid); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + _xaSession.getQpidSession().dtxCoordinationForget(xid); } /** @@ -155,14 +133,7 @@ public class XAResourceImpl implements XAResource int result = 0; if (_xid != null) { - try - { - result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + result = (int) _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid); } return result; } @@ -198,15 +169,9 @@ public class XAResourceImpl implements XAResource _logger.debug("prepare ", xid); } int result; - try - { - result = _xaSession.getQpidSession() - .dtxCoordinationPrepare(xid); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + result = _xaSession.getQpidSession() + .dtxCoordinationPrepare(xid); + if (result == XAException.XA_RDONLY) { throw new XAException(XAException.XA_RDONLY); @@ -232,16 +197,9 @@ public class XAResourceImpl implements XAResource */ public Xid[] recover(int flag) throws XAException { - try - { - // the flag is ignored - return _xaSession.getQpidSession() - .dtxCoordinationRecover(); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } +// the flag is ignored + return _xaSession.getQpidSession() + .dtxCoordinationRecover(); } /** @@ -252,16 +210,9 @@ public class XAResourceImpl implements XAResource */ public void rollback(Xid xid) throws XAException { - try - { - // the flag is ignored - _xaSession.getQpidSession() - .dtxCoordinationRollback(xid); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } +// the flag is ignored + _xaSession.getQpidSession() + .dtxCoordinationRollback(xid); } /** @@ -279,17 +230,9 @@ public class XAResourceImpl implements XAResource boolean result = false; if (_xid != null) { - try - { - // the flag is ignored - _xaSession.getQpidSession() - .dtxCoordinationSetTimeout(_xid, timeout); - result = true; - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + _xaSession.getQpidSession() + .dtxCoordinationSetTimeout(_xid, timeout); + result = true; } return result; } @@ -315,15 +258,8 @@ public class XAResourceImpl implements XAResource _logger.debug("start ", xid); } _xid = xid; - try - { - _xaSession.getQpidSession() - .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); - } - catch (QpidException e) - { - throw ExceptionHelper.convertQpidExceptionToXAException(e); - } + _xaSession.getQpidSession() + .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); } } diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java index 5e397ae574..4e654f706b 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java @@ -66,8 +66,8 @@ class ProtocolHeader buf.put(AMQP); buf.put((byte) 1); buf.put((byte) 1); - buf.put((byte) 0); - buf.put((byte) 10); + buf.put( major); + buf.put(minor); buf.flip(); return buf; } diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 228c4cf495..067fc8c83c 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -21,11 +21,8 @@ package org.apache.qpidity; import java.nio.ByteBuffer; - import java.util.HashMap; import java.util.Map; - -import org.apache.qpidity.api.Message; /** * Session * @@ -160,7 +157,7 @@ public class Session extends Invoker channel.data(bytes); } - public void end() + public void endData() { channel.end(); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 2e27dc8574..4ec838bc35 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -79,11 +79,11 @@ class ToyClient extends SessionDelegate ssn.headers(new DeliveryProperties(), new MessageProperties()); ssn.data("this is the data"); - ssn.end(); + ssn.endData(); ssn.messageTransfer("fdsa", (short) 0, (short) 1); ssn.data("this should be rejected"); - ssn.end(); + ssn.endData(); ssn.sync(); } |
