summaryrefslogtreecommitdiff
path: root/qpid/java/management/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commita09ed43cc8ed5862996e684b924f3405e09734c3 (patch)
tree975ceb9d20c055ca4daa5875fdf28932e9ee8eef /qpid/java/management/client/src
parent8ccc685eaf36dae9fe73d50d8410a062a015943b (diff)
downloadqpid-python-a09ed43cc8ed5862996e684b924f3405e09734c3.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/management/client/src')
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java157
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java156
2 files changed, 150 insertions, 163 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
index f4428cb1e2..2c36fb3d65 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
@@ -28,140 +28,95 @@ import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.DtxSession;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid datasource.
- * Basically it is a connection pool manager used for optimizing broker connections usage.
- *
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
* @author Andrea Gazzarini
*/
-public final class QpidDatasource
+public final class QpidDatasource
{
private final static Logger LOGGER = Logger.get(QpidDatasource.class);
-
+
/**
* A connection decorator used for adding pool interaction behaviour to an existing connection.
- *
+ *
* @author Andrea Gazzarini
*/
- public class ConnectionDecorator implements Connection,ClosedListener
+ class PooledConnection extends Connection
{
- private final Connection _decoratee;
private final UUID _brokerId;
private boolean _valid;
-
+
/**
* Builds a new decorator with the given connection.
- *
+ *
* @param brokerId the broker identifier.
* @param decoratee the underlying connection.
*/
- private ConnectionDecorator(UUID brokerId, Connection decoratee)
+ private PooledConnection(UUID brokerId)
{
- this._decoratee = decoratee;
this._brokerId = brokerId;
- _decoratee.setClosedListener(this);
_valid = true;
}
-
+
/**
* Returns true if the underlying connection is still valid and can be used.
- *
+ *
* @return true if the underlying connection is still valid and can be used.
*/
boolean isValid()
{
return _valid;
}
-
+
+ void reallyClose()
+ {
+ super.close();
+ }
+
/**
* Returns the connection to the pool. That is, marks this connections as available.
* After that, this connection will be available for further operations.
*/
- public void close () throws QpidException
+ public void close()
{
try
{
pools.get(_brokerId).returnObject(this);
LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this);
- } catch (Exception exception)
+ }
+ catch (Exception e)
{
- throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception);
- }
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String host, int port, String virtualHost, String username, String password)
- throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String url) throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
- }
-
- /**
- * @see Connection#createDTXSession(int)
- */
- public DtxSession createDTXSession (int expiryInSeconds)
- {
- return _decoratee.createDTXSession(expiryInSeconds);
+ throw new ConnectionException(e);
+ }
}
- /**
- * @see Connection#createSession(long)
- */
- public Session createSession (long expiryInSeconds)
+ public void exception(Throwable t)
{
- return _decoratee.createSession(expiryInSeconds);
+ super.exception(t);
+ _valid = false;
}
+ }
- /**
- * Do nothing : closed listener has been already injected.
- */
- public void setClosedListener (ClosedListener exceptionListner)
- {
- }
-
- /**
- * Callback method used for error notifications while underlying connection is closing.
- */
- public void onClosed (ErrorCode errorCode, String reason, Throwable t)
- {
- _valid = false;
- LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode());
- }
- };
-
/**
- * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
+ * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
* the broker connection(s).
- *
+ *
* @author Andrea Gazzarini
*/
class QpidConnectionFactory extends BasePoolableObjectFactory
- {
+ {
private final BrokerConnectionData _connectionData;
private final UUID _brokerId;
-
+
/**
* Builds a new connection factory with the given parameters.
- *
+ *
* @param brokerId the broker identifier.
* @param connectionData the connecton data.
*/
@@ -170,35 +125,35 @@ public final class QpidDatasource
this._connectionData = connectionData;
this._brokerId = brokerId;
}
-
+
/**
* Creates a new underlying connection.
*/
@Override
public Connection makeObject () throws Exception
{
- Connection connection = Client.createConnection();
+ PooledConnection connection = new PooledConnection(_brokerId);
connection.connect(
- _connectionData.getHost(),
- _connectionData.getPort(),
- _connectionData.getVirtualHost(),
- _connectionData.getUsername(),
+ _connectionData.getHost(),
+ _connectionData.getPort(),
+ _connectionData.getVirtualHost(),
+ _connectionData.getUsername(),
_connectionData.getPassword());
- return new ConnectionDecorator(_brokerId,connection);
+ return connection;
}
-
+
/**
* Validates the underlying connection.
*/
@Override
public boolean validateObject (Object obj)
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
+ PooledConnection connection = (PooledConnection) obj;
boolean isValid = connection.isValid();
LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid);
return isValid;
}
-
+
/**
* Closes the underlying connection.
*/
@@ -207,8 +162,8 @@ public final class QpidDatasource
{
try
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
- connection._decoratee.close();
+ PooledConnection connection = (PooledConnection) obj;
+ connection.reallyClose();
LOGGER.debug("<QMAN-200014> : Connection has been destroyed.");
} catch (Exception e)
{
@@ -216,21 +171,21 @@ public final class QpidDatasource
}
}
}
-
+
// Singleton instance.
private static QpidDatasource instance = new QpidDatasource();
// Each entry contains a connection pool for a specific broker.
private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
-
+
// Private constructor.
private QpidDatasource()
{
}
-
+
/**
* Gets an available connection from the pool of the given broker.
- *
+ *
* @param brokerId the broker identifier.
* @return a valid connection to the broker associated with the given identifier.
*/
@@ -238,20 +193,20 @@ public final class QpidDatasource
{
return (Connection) pools.get(brokerId).borrowObject();
}
-
+
/**
* Entry point method for retrieving the singleton instance of this datasource.
- *
+ *
* @return the qpid datasource singleton instance.
*/
- public static QpidDatasource getInstance()
+ public static QpidDatasource getInstance()
{
return instance;
}
-
+
/**
* Adds a connection pool to this datasource.
- *
+ *
* @param brokerId the broker identifier that will be associated with the new connection pool.
* @param connectionData the broker connection data.
* @throws Exception when the pool cannot be created.
@@ -265,12 +220,12 @@ public final class QpidDatasource
true,
false);
ObjectPool pool = factory.createPool();
-
+
for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++)
{
pool.returnObject(pool.borrowObject());
}
-
+
pools.put(brokerId,pool);
}
} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
index 92689eba52..4bda450315 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
@@ -22,42 +22,47 @@ package org.apache.qpid.management.domain.services;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.QpidException;
import org.apache.qpid.management.Constants;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.configuration.QpidDatasource;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid Broker facade.
- *
+ *
* @author Andrea Gazzarini
*/
-public class QpidService
+public class QpidService implements SessionListener
{
private final static Logger LOGGER = Logger.get(QpidService.class);
// Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication.
- private static class Log
- {
+ private static class Log
+ {
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(byte [] messageContent)
+ static void logMessageContent(byte [] messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -65,56 +70,81 @@ public class QpidService
Arrays.toString(messageContent));
}
}
-
+
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(ByteBuffer messageContent)
+ static void logMessageContent(ByteBuffer messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"<QMAN-200002> : Message has been sent to management exchange.");
}
- }
+ }
}
-
+
private UUID _brokerId;
private Connection _connection;
private Session _session;
-
+ private Map<String,MessagePartListenerAdapter> _listeners;
+
/**
* Builds a new service with the given connection data.
- *
+ *
* @param connectionData the connection data of the broker.
*/
- public QpidService(UUID brokerId)
+ public QpidService(UUID brokerId)
{
this._brokerId = brokerId;
}
-
+
/**
* Estabilishes a connection with the broker.
- *
+ *
* @throws QpidException in case of connection failure.
*/
public void connect() throws Exception
{
_connection = QpidDatasource.getInstance().getConnection(_brokerId);
+ _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
_session = _connection.createSession(Constants.NO_EXPIRATION);
+ _session.setSessionListener(this);
+ }
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+ if (l == null)
+ {
+ LOGGER.error("unhandled message: %s", xfr);
+ }
+ else
+ {
+ l.messageTransfer(xfr);
+ }
}
-
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ LOGGER.error(exc, "session %s exception", ssn);
+ }
+
+ public void closed(Session ssn) {}
+
/**
- * All the previously entered outstanding commands are asynchronous.
+ * All the previously entered outstanding commands are asynchronous.
* Synchronous behavior is achieved through invoking this method.
*/
- public void sync()
+ public void sync()
{
_session.sync();
}
-
+
/**
* Closes communication with broker.
*/
@@ -124,48 +154,50 @@ public class QpidService
{
_session.close();
_session = null;
+ _listeners = null;
} catch (Exception e)
{
}
try
{
- _connection.close();
+ _connection.close();
_connection = null;
} catch (Exception e)
{
}
}
-
+
/**
* Associate a message listener with a destination therefore creating a new subscription.
- *
+ *
* @param queueName The name of the queue that the subscriber is receiving messages from.
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
- * @param listener the listener for this destination.
- *
+ * @param listener the listener for this destination.
+ *
* @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
*/
- public void createSubscription(String queueName, String destinationName,MessageListener listener)
+ public void createSubscription(String queueName, String destinationName, MessageListener listener)
{
- _session.messageSubscribe(
- queueName,
- destinationName,
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
-
- _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
- _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE);
-
+ _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+ _session.messageSubscribe
+ (queueName,
+ destinationName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+
+ _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+ _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
LOGGER.debug(
- "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
+ "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
queueName,
destinationName);
}
-
+
/**
* Removes a previously declared consumer from the broker.
- *
+ *
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
* @see Session#messageCancel(String, Option...)
*/
@@ -173,10 +205,10 @@ public class QpidService
{
_session.messageCancel(destinationName);
LOGGER.debug(
- "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
+ "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
destinationName);
- }
-
+ }
+
/**
* Declares a queue on the broker with the given name.
*
@@ -200,27 +232,27 @@ public class QpidService
_session.queueDelete(queueName);
LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName);
}
-
+
/**
* Binds (on the broker) a queue with an exchange.
*
- * @param queueName the name of the queue to bind.
+ * @param queueName the name of the queue to bind.
* @param exchangeName the exchange name.
- * @param routingKey the routing key used for the binding.
+ * @param routingKey the routing key used for the binding.
* @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
*/
public void declareBinding(String queueName, String exchangeName, String routingKey)
{
_session.exchangeBind(queueName, exchangeName, routingKey, null);
LOGGER.debug(
- "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
+ "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Removes a previously declare binding between an exchange and a queue.
- *
+ *
* @param queueName the name of the queue.
* @param exchangeName the name of the exchange.
* @param routingKey the routing key used for binding.
@@ -229,42 +261,42 @@ public class QpidService
{
_session.exchangeUnbind(queueName, exchangeName, routingKey);
LOGGER.debug(
- "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
+ "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(byte [] messageData)
+ public void sendCommandMessage(byte [] messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(ByteBuffer messageData)
+ public void sendCommandMessage(ByteBuffer messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
- }
+ }
} \ No newline at end of file