summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
commitc8b145b9cf5282d534f204a5be48d19fe507ecb3 (patch)
tree2957d1a3121f0e446569487d87987aa4a8eaaff7 /java/client/src
parente70492626c601e6c610ccea3e33b2a91188f3f76 (diff)
downloadqpid-python-c8b145b9cf5282d534f204a5be48d19fe507ecb3.tar.gz
QPID-3473 : Replace use of MINA IO with transport IO (joint work with Robbie Gemmel)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1166069 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java151
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java115
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java312
6 files changed, 74 insertions, 544 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 6b9cf909a8..6dfbb969e7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -1,6 +1,6 @@
package org.apache.qpid.client;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.qpid.client;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
@@ -58,7 +59,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
* This class logger.
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
-
+
/**
* The name of the UUID property
*/
@@ -273,10 +274,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
ConnectionClose close = exc.getClose();
- if (close == null)
+ if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
@@ -345,12 +346,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return ProtocolVersion.v0_10;
}
-
+
public String getUUID()
{
- return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
}
-
+
private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
@@ -358,7 +359,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
conSettings.setVhost(_conn.getVirtualHost());
conSettings.setUsername(_conn.getUsername());
conSettings.setPassword(_conn.getPassword());
-
+
// Pass client name from connection URL
Map<String, Object> clientProps = new HashMap<String, Object>();
try
@@ -375,7 +376,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return conSettings;
}
-
+
// The idle_timeout prop is in milisecs while
// the new heartbeat prop is in secs
private int getHeartbeatInterval(BrokerDetails brokerDetail)
@@ -390,7 +391,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
}
- else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
+ else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
{
heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
_logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 8bc9889050..948f5178a6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
+import java.security.Security;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -55,6 +56,8 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,9 +121,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
+ SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
+
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslContext);
- _conn._protocolHandler.setNetworkConnection(network);
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext);
+ _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender()));
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index e9e52cc97c..28d19ce817 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
* <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
* </table>
*
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos)
+ * Then have a wrapping continuation (this), which blocks on an arbitrary
* Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
* Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
* to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 34c6468629..6d6cd9cae5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -57,8 +56,6 @@ import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -169,14 +166,12 @@ public class AMQProtocolHandler implements ProtocolEngine
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
private AMQCodecFactory _codecFactory;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
- private NetworkTransport _transport;
+
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
@@ -191,24 +186,6 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
- _poolReference.setThreadFactory(new ThreadFactory()
- {
-
- public Thread newThread(final Runnable runnable)
- {
- try
- {
- return Threading.getThreadFactory().createThread(runnable);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- }
- });
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
- _poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
@@ -329,17 +306,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
-
- if (cause instanceof ProtocolCodecException)
- {
- _logger.info("Protocol Exception caught NOT going to attempt failover as " +
- "cause isn't AMQConnectionClosedException: " + cause, cause);
-
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToAllWaiters(amqe);
- }
_connection.exceptionReceived(cause);
-
}
// FIXME Need to correctly handle other exceptions. Things like ...
@@ -433,76 +400,63 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
+ _readBytes += msg.remaining();
try
{
- _readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ // Decode buffer
+
+ for (AMQDataBlock message : dataBlocks)
{
- public void run()
- {
- // Decode buffer
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
- for (AMQDataBlock message : dataBlocks)
+ if(message instanceof AMQFrame)
{
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- try
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
-
- if(message instanceof AMQFrame)
- {
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
- {
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- bodyFrame.handle(frame.getChannel(), _protocolSession);
-
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
- }
- }
- catch (Exception e)
+ if (debug && ((msgNumber % 1000) == 0))
{
- _logger.error("Exception processing frame", e);
- propagateExceptionToFrameListeners(e);
- exception(e);
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
- });
}
catch (Exception e)
{
+ _logger.error("Exception processing frame", e);
propagateExceptionToFrameListeners(e);
exception(e);
}
+
+
}
public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -568,17 +522,13 @@ public class AMQProtocolHandler implements ProtocolEngine
writeFrame(frame, false);
}
- public void writeFrame(AMQDataBlock frame, boolean wait)
+ public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
{
final ByteBuffer buf = frame.toNioByteBuffer();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _sender.send(buf);
- }
- });
+ _sender.send(buf);
+ _sender.flush();
+
if (PROTOCOL_DEBUG)
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -595,10 +545,6 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection.bytesSent(_writtenBytes);
- if (wait)
- {
- _sender.flush();
- }
}
/**
@@ -723,7 +669,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
- _poolReference.releaseExecutorService();
+
}
/** @return the number of bytes read from this protocol session */
@@ -841,8 +787,13 @@ public class AMQProtocolHandler implements ProtocolEngine
public void setNetworkConnection(NetworkConnection network)
{
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
_network = network;
- _sender = network.getSender();
+ _sender = sender;
}
/** @param delay delay in seconds (not ms) */
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
deleted file mode 100644
index bbd0a7b144..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
- * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
- * too often.
- *
- */
-public class ProtocolBufferMonitorFilter extends IoFilterAdapter
-{
- private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
-
- public static final long DEFAULT_FREQUENCY = 5000;
-
- public static final int DEFAULT_THRESHOLD = 3000;
-
- private int _bufferedMessages = 0;
-
- private int _threshold;
-
- private long _lastMessageOutputTime;
-
- private long _outputFrequencyInMillis;
-
- public ProtocolBufferMonitorFilter()
- {
- _threshold = DEFAULT_THRESHOLD;
- _outputFrequencyInMillis = DEFAULT_FREQUENCY;
- }
-
- public ProtocolBufferMonitorFilter(int threshold, long frequency)
- {
- _threshold = threshold;
- _outputFrequencyInMillis = frequency;
- }
-
- public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages++;
- if (_bufferedMessages > _threshold)
- {
- long now = System.currentTimeMillis();
- if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
- {
- _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
- + _bufferedMessages);
- _lastMessageOutputTime = now;
- }
- }
-
- nextFilter.messageReceived(session, message);
- }
-
- public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages--;
- nextFilter.messageSent(session, message);
- }
-
- public int getBufferedMessages()
- {
- return _bufferedMessages;
- }
-
- public int getThreshold()
- {
- return _threshold;
- }
-
- public void setThreshold(int threshold)
- {
- _threshold = threshold;
- }
-
- public long getOutputFrequencyInMillis()
- {
- return _outputFrequencyInMillis;
- }
-
- public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
- {
- _outputFrequencyInMillis = outputFrequencyInMillis;
- }
-
- public long getLastMessageOutputTime()
- {
- return _lastMessageOutputTime;
- }
-}
diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java b/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java
deleted file mode 100644
index f0938a4bc0..0000000000
--- a/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.DefaultCloseFuture;
-import org.apache.mina.common.support.DefaultWriteFuture;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-import java.net.SocketAddress;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-public class MockIoSession implements IoSession
-{
- private AMQProtocolSession _protocolSession;
-
- /**
- * Stores the last response written
- */
- private Object _lastWrittenObject;
-
- private boolean _closing;
- private IoFilterChain _filterChain;
-
- public MockIoSession()
- {
- _filterChain = new AbstractIoFilterChain(this)
- {
- protected void doWrite(IoSession ioSession, IoFilter.WriteRequest writeRequest) throws Exception
- {
-
- }
-
- protected void doClose(IoSession ioSession) throws Exception
- {
-
- }
- };
- }
-
- public Object getLastWrittenObject()
- {
- return _lastWrittenObject;
- }
-
- public IoService getService()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
- public IoHandler getHandler()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoFilterChain getFilterChain()
- {
- return _filterChain;
- }
-
- public WriteFuture write(Object message)
- {
- WriteFuture wf = new DefaultWriteFuture(null);
- _lastWrittenObject = message;
- return wf;
- }
-
- public CloseFuture close()
- {
- _closing = true;
- CloseFuture cf = new DefaultCloseFuture(null);
- cf.setClosed();
- return cf;
- }
-
- public Object getAttachment()
- {
- return _protocolSession;
- }
-
- public Object setAttachment(Object attachment)
- {
- Object current = _protocolSession;
- _protocolSession = (AMQProtocolSession) attachment;
- return current;
- }
-
- public Object getAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key, Object value)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object removeAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean containsAttribute(String key)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Set getAttributeKeys()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransportType getTransportType()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isConnected()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isClosing()
- {
- return _closing;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getWriteTimeout()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWrittenBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadMessages()
- {
- return 0L;
- }
-
- public long getWrittenMessages()
- {
- return 0L;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIoTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastReadTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastWriteTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-}