From fa8babc296c51c31166e718af1cdd3b4c650142e Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 15 Jun 2012 17:19:57 +0000 Subject: QPID-4027 Experimented with adding common functionality via Decorators as opposed to inheritence. ConnectionManagementDecorator and SessionManagementDecorator adds common session/connection management, state management, error handling etc. The decorator approach allows us to add/remove functionality easily and to isolate logic without assuming/depending on base classes like we do with our current client. Added some extension interfaces under the "ext" package that facilitates the implementation of the API. This package is not intended to be visible to the users. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350699 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/messaging/ReadOnlyMessageAdapter.java | 191 --------- .../apache/qpid/messaging/ext/ConnectionExt.java | 46 ++ .../messaging/ext/ConnectionStateListener.java | 29 ++ .../util/ConnectionManagementDecorator.java | 285 ++++++++++++ .../messaging/util/ReadOnlyMessageAdapter.java | 191 +++++++++ .../messaging/util/SessionManagementDecorator.java | 477 +++++++++++++++++++++ 6 files changed, 1028 insertions(+), 191 deletions(-) delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java (limited to 'qpid/java') diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java deleted file mode 100644 index 0dfd6e735d..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java +++ /dev/null @@ -1,191 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.messaging; - -import java.util.Collections; -import java.util.Map; - -/** - * Ensures the message is read only by blocking the delegates - * setter methods. - */ -public class ReadOnlyMessageAdapter implements Message -{ - private Message _delegate; - - ReadOnlyMessageAdapter(Message delegate) - { - _delegate = delegate; - } - - @Override - public Object getContent() throws MessagingException - { - return _delegate.getContent(); - } - - @Override - public String getMessageId() throws MessagingException - { - return _delegate.getMessageId(); - } - - @Override - public void setMessageId(String messageId) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public String getSubject() throws MessagingException - { - return _delegate.getSubject(); - } - - @Override - public void setSubject(String subject) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public String getContentType() throws MessagingException - { - return _delegate.getContentType(); - } - - @Override - public void setContentType(String contentType) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public String getCorrelationId() throws MessagingException - { - return _delegate.getCorrelationId(); - } - - @Override - public void setCorrelationId(String correlationId) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public String getReplyTo() throws MessagingException - { - return _delegate.getReplyTo(); - } - - @Override - public void setReplyTo(String replyTo) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public String getUserId() throws MessagingException - { - return _delegate.getUserId(); - } - - @Override - public void setUserId(String userId) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public boolean isDurable() throws MessagingException - { - return _delegate.isDurable(); - } - - @Override - public void setDurable(boolean durable) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public boolean isRedelivered() throws MessagingException - { - return _delegate.isRedelivered(); - } - - @Override - public void setRedelivered(boolean redelivered) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public int getPriority() throws MessagingException - { - return _delegate.getPriority(); - } - - @Override - public void setPriority(int priority) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public long getTtl() throws MessagingException - { - return _delegate.getTtl(); - } - - @Override - public void setTtl(long ttl) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public long getTimestamp() throws MessagingException - { - return _delegate.getTimestamp(); - } - - @Override - public void setTimestamp(long timestamp) throws MessagingException - { - throwMessageNotWritableException(); - } - - @Override - public Map getProperties() throws MessagingException - { - return Collections.unmodifiableMap(_delegate.getProperties()); - } - - @Override - public void setProperty(String key, Object value) throws MessagingException - { - throwMessageNotWritableException(); - } - - private void throwMessageNotWritableException() throws MessageNotWritableException - { - throw new MessageNotWritableException("Message is read-only"); - } - -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java new file mode 100644 index 0000000000..557c3c0fe7 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java @@ -0,0 +1,46 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import java.util.List; + +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Session; + +/** + * An extended interface meant for API implementors. + */ +public interface ConnectionExt extends Connection +{ + public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + + public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + + public List getSessions() throws ConnectionException; + + public void exception(ConnectionException e); + + /** + * The per connection lock that is used by the connection + * and it's child objects. A single lock is used to prevent + * deadlocks that could occur with having multiple locks, + * perhaps at the cost of a minor perf degradation. + */ + public Object getConnectionLock(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java new file mode 100644 index 0000000000..e1af2479c5 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java @@ -0,0 +1,29 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.ConnectionException; + +public interface ConnectionStateListener +{ + public void exception(ConnectionException e); + + public void opened(); + + public void closed(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java new file mode 100644 index 0000000000..5200e5081d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java @@ -0,0 +1,285 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.ConnectionExt; +import org.apache.qpid.messaging.ext.ConnectionStateListener; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a connection. + * This allows the various implementations to reuse basic functions. + * This class adds, + * 1. Basic session mgt (tracking, default name generation ..etc) + * 2. Connection state management. + * 3. Error handling. + * + * Close() can be called by, + *
    + *
  1. The application (normal close)
  2. + *
  3. By the parent if it's not null (error)
  4. + *
  5. By this object if parent is null (error)
  6. + *
+ *
+ * + * Failover + * This Decorator does not handle any failover. + * + * If failover is handled at a layer above then it will take appropriate action. + * @see ConnectionFailoverDecorator for an example. + * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable. + * Therefore this class will attempt to close the connection if the parent is null. + */ +public class ConnectionManagementDecorator implements ConnectionExt +{ + private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class); + + public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR} + + private ConnectionExt _parent; + private Connection _delegate; + private ConnectionState _state = ConnectionState.UNDEFINED; + private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); + private Map _sessions = new ConcurrentHashMap(); + private ConnectionException _lastException; + private List _stateListeners = new ArrayList(); + + private final Object _connectionLock; + + public ConnectionManagementDecorator(Connection delegate) + { + this(null,delegate); + } + + public ConnectionManagementDecorator(ConnectionExt parent, Connection delegate) + { + _delegate = delegate; + _parent = parent; + _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock(); + } + + @Override + public void open() throws MessagingException + { + // return without exception denotes success + _delegate.open(); + synchronized (_connectionLock) + { + _state = ConnectionState.OPENED; + for (ConnectionStateListener l: _stateListeners) + { + l.opened(); + } + } + } + + @Override + public boolean isOpen() throws MessagingException + { + return _delegate.isOpen(); + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Connection is already closed"); + synchronized(_connectionLock) + { + _state = ConnectionState.CLOSED; + for (Session ssn : _sessions.values()) + { + ssn.close(); + } + _sessions.clear(); + + for (ConnectionStateListener l: _stateListeners) + { + l.closed(); + } + } + _delegate.close(); + } + + @Override + public Session createSession(String name) throws MessagingException + { + checkClosedAndThrowException(); + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(ConnectionException e) + { + exception(e); + // If there is a failover handler above this it will handle it. + // Otherwise the application gets this. + throw new ConnectionException("Connection closed",e); + } + } + + @Override + public Session createTransactionalSession(String name) + throws MessagingException + { + checkClosedAndThrowException(); + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(ConnectionException e) + { + exception(e); + // If there is a failover handler above this it will handle it. + // Otherwise the application gets this. + throw new ConnectionException("Connection closed",e); + } + } + + @Override + public String getAuthenticatedUsername() throws MessagingException + { + checkClosedAndThrowException(); + return _delegate.getAuthenticatedUsername(); + } + + @Override + public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException + { + checkClosedAndThrowException(); + synchronized (_connectionLock) + { + _stateListeners.add(l); + } + } + + @Override + public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException + { + checkClosedAndThrowException(); + synchronized (_connectionLock) + { + _stateListeners.remove(l); + } + } + + @Override + public List getSessions() throws ConnectionException + { + checkClosedAndThrowException(); + return new ArrayList(_sessions.values()); + } + + @Override // Called by the delegate or a a session created by this connection. + public void exception(ConnectionException e) + { + synchronized(_connectionLock) + { + _state = ConnectionState.ERROR; + if (_lastException != null) + { + _logger.warn("Last exception was not notified to the application", _lastException); + } + _lastException = e; + + for (ConnectionStateListener l: _stateListeners) + { + l.exception(_lastException); + } + + if (_parent != null) + { + _parent.exception(e); + } + else + { + try + { + close(); + } + catch(MessagingException ex) + { + //ignore + } + } + } + // should we clean lastException if we notify it via a listener? + } + + @Override + public Object getConnectionLock() + { + return _connectionLock; + } + + private void checkClosedAndThrowException() throws ConnectionException + { + checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection"); + } + + private void checkClosedAndThrowException(String msg) throws ConnectionException + { + switch (_state) + { + case UNDEFINED: + case ERROR: + throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this"); + case CLOSED: + synchronized(_connectionLock) + { + if(_lastException != null) + { + Throwable cause = _lastException; + _lastException = null; + throw new ConnectionException(msg, cause); + } + else + { + throw new ConnectionException(msg); + } + } + default: + break; + } + } + + private String generateSessionName() + { + // TODO add local IP and pid to the beginning; + return _ssnNameGenerator.generate().toString(); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java new file mode 100644 index 0000000000..0dfd6e735d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java @@ -0,0 +1,191 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging; + +import java.util.Collections; +import java.util.Map; + +/** + * Ensures the message is read only by blocking the delegates + * setter methods. + */ +public class ReadOnlyMessageAdapter implements Message +{ + private Message _delegate; + + ReadOnlyMessageAdapter(Message delegate) + { + _delegate = delegate; + } + + @Override + public Object getContent() throws MessagingException + { + return _delegate.getContent(); + } + + @Override + public String getMessageId() throws MessagingException + { + return _delegate.getMessageId(); + } + + @Override + public void setMessageId(String messageId) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public String getSubject() throws MessagingException + { + return _delegate.getSubject(); + } + + @Override + public void setSubject(String subject) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public String getContentType() throws MessagingException + { + return _delegate.getContentType(); + } + + @Override + public void setContentType(String contentType) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public String getCorrelationId() throws MessagingException + { + return _delegate.getCorrelationId(); + } + + @Override + public void setCorrelationId(String correlationId) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public String getReplyTo() throws MessagingException + { + return _delegate.getReplyTo(); + } + + @Override + public void setReplyTo(String replyTo) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public String getUserId() throws MessagingException + { + return _delegate.getUserId(); + } + + @Override + public void setUserId(String userId) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public boolean isDurable() throws MessagingException + { + return _delegate.isDurable(); + } + + @Override + public void setDurable(boolean durable) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public boolean isRedelivered() throws MessagingException + { + return _delegate.isRedelivered(); + } + + @Override + public void setRedelivered(boolean redelivered) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public int getPriority() throws MessagingException + { + return _delegate.getPriority(); + } + + @Override + public void setPriority(int priority) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public long getTtl() throws MessagingException + { + return _delegate.getTtl(); + } + + @Override + public void setTtl(long ttl) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public long getTimestamp() throws MessagingException + { + return _delegate.getTimestamp(); + } + + @Override + public void setTimestamp(long timestamp) throws MessagingException + { + throwMessageNotWritableException(); + } + + @Override + public Map getProperties() throws MessagingException + { + return Collections.unmodifiableMap(_delegate.getProperties()); + } + + @Override + public void setProperty(String key, Object value) throws MessagingException + { + throwMessageNotWritableException(); + } + + private void throwMessageNotWritableException() throws MessageNotWritableException + { + throw new MessageNotWritableException("Message is read-only"); + } + +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java new file mode 100644 index 0000000000..027e9b9605 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java @@ -0,0 +1,477 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.Sender; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.ConnectionExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a session. + * This class adds, + * 1. Management of receivers and senders created by this session. + * 2. State management. + * 3. Exception handling. + * + * Exception Handling + * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the session is already CLOSED or in an ERROR situation. + * Then it will look for connection and session errors and handle as follows. + * + * Connection Exceptions + * This class intercepts ConnectionException's and are passed onto the connection. + * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message. + * Any further use of the session is prevented until it moves to OPENED. + * + * If failover is handled at a layer above, there will be a Session Decorator that + * would handle the session exception and retry when the connection is available. + * This handler may block the call until the state moves into either OPENED or CLOSED. + * Ex @see SessionFailoverDecorator. + * + * If failover is handled at a layer below, then a connection exception means it has failed already. + * Therefore when passed to the connection,the exception will be thrown directly to the application. + * The connection object will be responsible for calling close on this session for the above case. + * + * Close() can be called by, + *
    + *
  1. The application (normal close)
  2. + *
  3. By the parent via failover (error)
  4. + *
  5. By the connection object, if not failover(error)
  6. + *
+ *
+ * + * Session Exceptions + * For the time being, anytime a session exception is received, the session will be marked CLOSED. + * We need to revisit this. + */ +public class SessionManagementDecorator implements Session +{ + private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class); + + public enum SessionState { UNDEFINED, OPENED, CLOSED, ERROR} + + private ConnectionExt _conn; + private Session _delegate; + SessionState _state = SessionState.UNDEFINED; + private List _receivers = new ArrayList(); + private List _senders = new ArrayList(); + private final Object _connectionLock; // global per connection lock + + public SessionManagementDecorator(ConnectionExt conn, Session delegate) + { + _conn = conn; + _delegate = delegate; + _connectionLock = conn.getConnectionLock(); + } + + @Override + public boolean isClosed() + { + return _state == SessionState.CLOSED; + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Session is already closed"); + synchronized(_connectionLock) + { + _state = SessionState.CLOSED; + for (Sender sender: _senders) + { + sender.close(); + } + _senders.clear(); + + for (Receiver receiver: _receivers) + { + receiver.close(); + } + _receivers.clear(); + _delegate.close(); + } + } + + @Override + public void commit() throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.commit(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void rollback() throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.rollback(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(boolean sync) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.acknowledge(sync); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(Message message, boolean sync) + throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.acknowledge(message, sync); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void reject(Message message) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.reject(message); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void release(Message message) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.release(message); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void sync(boolean block) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.sync(block); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getReceivable() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getReceivable(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettledAcks() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getUnsettledAcks(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver nextReceiver(long timeout) throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.nextReceiver(timeout); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(Address address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Sender sender = _delegate.createSender(address); + _senders.add(sender); + return sender; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(String address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Sender sender = _delegate.createSender(address); + _senders.add(sender); + return sender; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(Address address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Receiver receiver = _delegate.createReceiver(address); + _receivers.add(receiver); + return receiver; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(String address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Receiver receiver = _delegate.createReceiver(address); + _receivers.add(receiver); + return receiver; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Connection getConnection() throws MessagingException + { + checkError(); + return _conn; // always return your peer (not your delegate's peer) + } + + @Override + public boolean hasError() + { + return _delegate.hasError(); + } + + @Override + public void checkError() throws MessagingException + { + checkClosedAndThrowException(); // check if we already have the info. + try + { + // Asking the delegate. + _delegate.checkError(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + private void checkClosedAndThrowException() throws SessionException + { + checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion"); + } + + private void checkClosedAndThrowException(String closedMessage) throws SessionException + { + switch (_state) + { + case ERROR: + case UNDEFINED: + throw new SessionException("Session is in a temporary error state. The session may or may not recover from this"); + case CLOSED: + throw new SessionException(closedMessage); + } + } + + /** + * A ConnectionException will cause the Session to go into a temporary error state, + * which prevents it from being used further. + * From there the Session can be moved into OPENED (if failover works) or + * CLOSED if there is no failover or if failover has failed. + * @param e + * @throws MessagingException + */ + private SessionException handleConnectionException(ConnectionException e) + { + synchronized (_connectionLock) + { + _state = SessionState.ERROR; + _conn.exception(e); // This might trigger failover in a layer above. + if (_state == SessionState.CLOSED) + { + // The connection has instructed the session to be closed. + // Either there was no failover, or failover has failed. + return new SessionException("Session is closed due to connection error",e); + } + else + { + // Asking the application or the Parent handler to retry the operation. + // The Session should be in OPENED state at this time. + return new SessionException("Session was in a temporary error state due to connection error." + + "Plase retry your operation",e); + } + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + private SessionException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + _state = SessionState.CLOSED; + } + return new SessionException("Session has been closed",e); + } +} -- cgit v1.2.1