diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/common/src/main | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
22 files changed, 458 insertions, 394 deletions
diff --git a/java/common/src/main/java/common.bnd b/java/common/src/main/java/common.bnd index b34f8deacc..84350fdc75 100755 --- a/java/common/src/main/java/common.bnd +++ b/java/common/src/main/java/common.bnd @@ -17,7 +17,7 @@ # under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 5268ce9bc2..7aa280ce02 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -87,6 +87,8 @@ public class ClientProperties public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; + public static final String USE_LEGACY_STREAM_MESSAGE_FORMAT = "qpid.use_legacy_stream_message"; + public static final String AMQP_VERSION = "qpid.amqp.version"; public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id"; @@ -190,6 +192,19 @@ public class ClientProperties */ public static final long DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = 5000L; + /** + * System property to control whether the client will declare queues during + * consumer creation when using BindingURLs. + */ + public static final String QPID_DECLARE_QUEUES_PROP_NAME = "qpid.declare_queues"; + + /** + * System property to control whether the client will declare exchanges during + * producer/consumer creation when using BindingURLs. + */ + public static final String QPID_DECLARE_EXCHANGES_PROP_NAME = "qpid.declare_exchanges"; + public static final String VERIFY_QUEUE_ON_SEND = "qpid.verify_queue_on_send"; + private ClientProperties() { diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index fdc71e31f9..1381390640 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -110,7 +110,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { return new LinkedHashMap<AMQShortString, AMQShortString>() { - + @Override protected boolean removeEldestEntry(Map.Entry<AMQShortString, AMQShortString> eldest) { return size() > LOCAL_INTERN_CACHE_SIZE; @@ -845,22 +845,15 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return internString; } - - public static void main(String args[]) + public static String toString(AMQShortString amqShortString) { - AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k"); - AMQShortString s2 = s.substring(2, 7); - - AMQShortStringTokenizer t = s2.tokenize((byte) '.'); - while(t.hasMoreTokens()) - { - System.err.println(t.nextToken()); - } + return amqShortString == null ? null : amqShortString.asString(); } - public static String toString(AMQShortString amqShortString) + public static void clearLocalCache() { - return amqShortString == null ? null : amqShortString.asString(); + _localInternMap.remove(); + _localStringMap.remove(); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 57f2c638a2..b9ed1b2154 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -855,6 +855,7 @@ public class FieldTable public void addAll(FieldTable fieldTable) { initMapIfNecessary(); + fieldTable.initMapIfNecessary(); if (fieldTable._properties != null) { _encodedForm = null; diff --git a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index 15c144b0eb..59a1b6c5b0 100644 --- a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -49,7 +49,11 @@ public class ConnectionStartProperties public static final String SESSION_FLOW = "qpid.session_flow"; - public static int getPID() + public static int _pid; + + public static final String _platformInfo; + + static { RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); String processName = rtb.getName(); @@ -57,23 +61,20 @@ public class ConnectionStartProperties { try { - return Integer.parseInt(processName.substring(0,processName.indexOf('@'))); + _pid = Integer.parseInt(processName.substring(0,processName.indexOf('@'))); } catch(Exception e) { LOGGER.warn("Unable to get the PID due to error",e); - return -1; + _pid = -1; } } else { LOGGER.warn("Unable to get the PID due to unsupported format : " + processName); - return -1; + _pid = -1; } - } - public static String getPlatformInfo() - { StringBuilder fullSystemInfo = new StringBuilder(System.getProperty("java.runtime.name")); fullSystemInfo.append(", "); fullSystemInfo.append(System.getProperty("java.runtime.version")); @@ -88,6 +89,16 @@ public class ConnectionStartProperties fullSystemInfo.append(", "); fullSystemInfo.append(System.getProperty("sun.os.patch.level")); - return fullSystemInfo.toString(); + _platformInfo = fullSystemInfo.toString(); + } + + public static int getPID() + { + return _pid; + } + + public static String getPlatformInfo() + { + return _platformInfo; } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 7ca588946b..6774d0a45a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -23,6 +23,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -31,7 +32,7 @@ import java.nio.ByteBuffer; * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> +public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -56,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> void readerIdle(); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e87851cf7d..cdca726148 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -21,12 +21,7 @@ package org.apache.qpid.transport; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.*; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; @@ -73,6 +68,8 @@ public class Connection extends ConnectionInvoker //Usable channels are numbered 0 to <ChannelMax> - 1 public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; + private long _lastSendTime; + private long _lastReadTime; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -89,15 +86,15 @@ public class Connection extends ConnectionInvoker public static interface SessionFactory { - Session newSession(Connection conn, Binary name, long expiry); + Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay); } private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) + public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay) { - return new Session(conn, name, expiry); + return new Session(conn, name, expiry, isNoReplay); } } @@ -232,9 +229,10 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver, null); - _remoteAddress = network.getRemoteAddress(); - _localAddress = network.getLocalAddress(); + NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); + + setRemoteAddress(network.getRemoteAddress()); + setLocalAddress(network.getLocalAddress()); final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender()); if(secureSender instanceof ConnectionListener) @@ -298,7 +296,12 @@ public class Connection extends ConnectionInvoker public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), expiry); + return createSession(expiry, false); + } + + public Session createSession(long expiry, boolean isNoReplay) + { + return createSession(UUID.randomUUID().toString(), expiry, isNoReplay); } public Session createSession(String name) @@ -311,6 +314,11 @@ public class Connection extends ConnectionInvoker return createSession(Strings.toUTF8(name), expiry); } + public Session createSession(String name, long expiry,boolean isNoReplay) + { + return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay); + } + public Session createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); @@ -318,6 +326,11 @@ public class Connection extends ConnectionInvoker public Session createSession(Binary name, long expiry) { + return createSession(name, expiry, false); + } + + public Session createSession(Binary name, long expiry, boolean isNoReplay) + { synchronized (lock) { Waiter w = new Waiter(lock, timeout); @@ -331,7 +344,7 @@ public class Connection extends ConnectionInvoker throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - Session ssn = _sessionFactory.newSession(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay); registerSession(ssn); map(ssn); ssn.attach(); @@ -369,6 +382,7 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { + _lastReadTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("RECV: [%s] %s", this, event); @@ -378,6 +392,7 @@ public class Connection extends ConnectionInvoker public void send(ProtocolEvent event) { + _lastSendTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("SEND: [%s] %s", this, event); @@ -728,6 +743,17 @@ public class Connection extends ConnectionInvoker return _localAddress; } + protected void setRemoteAddress(SocketAddress remoteAddress) + { + _remoteAddress = remoteAddress; + } + + protected void setLocalAddress(SocketAddress localAddress) + { + _localAddress = localAddress; + } + + private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode) { SessionDetached sessionDetached = new SessionDetached(); @@ -735,4 +761,38 @@ public class Connection extends ConnectionInvoker sessionDetached.setCode(sessionDetachCode); invoke(sessionDetached); } + + + protected void doHeartBeat() + { + connectionHeartbeat(); + } + + private class ConnectionActivity implements TransportActivity + { + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastSendTime; + } + + @Override + public void writerIdle() + { + connectionHeartbeat(); + } + + @Override + public void readerIdle() + { + // TODO + + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java index 20d6f98fa6..12f8d801dc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java +++ b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java @@ -38,14 +38,6 @@ public interface NetworkTransportConfiguration // The amount of memory in bytes to allocate to the outgoing buffer Integer getSendBufferSize(); - Integer getPort(); - - String getHost(); - - String getTransport(); - - Integer getConnectorProcessors(); - InetSocketAddress getAddress(); boolean needClientAuth(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index e9a7d51456..1e0d5b9698 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -71,7 +71,8 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { - tuneAuthorizedConnection(conn); + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, + "No Sasl mechanism was specified"); return; } @@ -82,7 +83,7 @@ public class ServerDelegate extends ConnectionDelegate if (ss == null) { conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, - "null SASL mechanism: " + mechanism); + "No SaslServer could be created for mechanism: " + mechanism); return; } conn.setSaslServer(ss); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 95c3e4669f..8b29d6e424 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -25,7 +25,6 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; - import static org.apache.qpid.transport.Option.COMPLETED; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.TIMELY_REPLY; @@ -132,19 +131,31 @@ public class Session extends SessionInvoker private final Object stateLock = new Object(); private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + private boolean _isNoReplay = false; protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); } + protected Session(Connection connection, Binary name, long expiry, boolean noReplay) + { + this(connection, new SessionDelegate(), name, expiry, noReplay); + } + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { + this(connection, delegate, name, expiry,false); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay) + { this.connection = connection; this.delegate = delegate; this.name = name; this.expiry = expiry; this.closing = false; + this._isNoReplay = noReplay; initReceiver(); } @@ -282,6 +293,7 @@ public class Session extends SessionInvoker void resume() { _failoverRequired.set(false); + synchronized (commandsLock) { attach(); @@ -414,7 +426,7 @@ public class Session extends SessionInvoker if(log.isDebugEnabled()) { - log.debug("ID: [%s] %s", this.channel, id); + log.debug("identify: ch=%s, commandId=%s", this.channel, id); } if ((id & 0xff) == 0) @@ -443,7 +455,7 @@ public class Session extends SessionInvoker { if(log.isDebugEnabled()) { - log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed); + log.debug("%s ch=%s processed([%d,%d]) %s %s", this, channel, lower, upper, syncPoint, maxProcessed); } boolean flush; @@ -451,7 +463,7 @@ public class Session extends SessionInvoker { if(log.isDebugEnabled()) { - log.debug("%s", processed); + log.debug("%s processed: %s", this, processed); } if (ge(upper, commandsIn)) @@ -740,7 +752,7 @@ public class Session extends SessionInvoker sessionCommandPoint(0, 0); } - boolean replayTransfer = !closing && !transacted && + boolean replayTransfer = !_isNoReplay && !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index 4d4274278f..8437ef1a94 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext; public interface IncomingNetworkTransport extends NetworkTransport { - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext); + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext); }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 12c42d6643..050d194c47 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -50,4 +50,8 @@ public interface NetworkConnection void setPeerPrincipal(Principal principal); Principal getPeerPrincipal(); + + int getMaxReadIdle(); + + int getMaxWriteIdle(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 854d76430b..45231aa05d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -23,12 +23,13 @@ package org.apache.qpid.transport.network; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; -import javax.net.ssl.SSLContext; import java.nio.ByteBuffer; public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext); + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity); }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java b/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java new file mode 100644 index 0000000000..210b014a57 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface Ticker +{ + int getTimeToNextTick(long currentTime); + + int tick(long currentTime); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java b/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java new file mode 100644 index 0000000000..2ee336d9b2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface TransportActivity +{ + long getLastReadTime(); + + long getLastWriteTime(); + + void writerIdle(); + + void readerIdle(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java new file mode 100644 index 0000000000..54a2a360bb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportActivity; + +class IdleTimeoutTicker implements Ticker +{ + private final TransportActivity _transport; + private final int _defaultTimeout; + private NetworkConnection _connection; + + public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout) + { + _transport = transport; + _defaultTimeout = defaultTimeout; + } + + @Override + public int getTimeToNextTick(long currentTime) + { + long nextTime = -1; + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + + if(maxReadIdle > 0) + { + nextTime = _transport.getLastReadTime() + maxReadIdle; + } + + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + + if(maxWriteIdle > 0) + { + long writeTime = _transport.getLastWriteTime() + maxWriteIdle; + if(nextTime == -1l || writeTime < nextTime) + { + nextTime = writeTime; + } + } + return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime); + } + + @Override + public int tick(long currentTime) + { + // writer Idle + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) + { + _transport.writerIdle(); + } + // reader Idle + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) + { + + _transport.readerIdle(); + } + return getTimeToNextTick(currentTime); + } + + public void setConnection(NetworkConnection connection) + { + _connection = connection; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 2658296c5f..f5c09ac2cc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.security.Principal; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection private final IoSender _ioSender; private final IoReceiver _ioReceiver; private Principal _principal; + private int _maxReadIdle; + private int _maxWriteIdle; public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, - int sendBufferSize, int receiveBufferSize, long timeout) + int sendBufferSize, int receiveBufferSize, long timeout) + { + this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null); + } + + public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; _timeout = timeout; _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioReceiver.setTicker(ticker); _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); @@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection public void setMaxWriteIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxWriteIdle = sec; } public void setMaxReadIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxReadIdle = sec; } @Override @@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection { return _principal; } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 9b6f0a0b1b..c8027e143e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.*; + import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport @@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private IoNetworkConnection _connection; private AcceptingThread _acceptor; - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext) + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); int receiveBufferSize = settings.getReadBufferSize(); @@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT); + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); _connection.start(); } catch(Exception e) @@ -128,9 +131,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet return _connection; } - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext) + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) { - try { _acceptor = new AcceptingThread(config, factory, sslContext); @@ -141,8 +145,6 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { throw new TransportException("Unable to start server socket", e); } - - } private class AcceptingThread extends Thread @@ -152,15 +154,16 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private ProtocolEngineFactory _factory; private SSLContext _sslContext; private ServerSocket _serverSocket; + private int _timeout; private AcceptingThread(NetworkTransportConfiguration config, ProtocolEngineFactory factory, - SSLContext sslContext) - throws IOException + SSLContext sslContext) throws IOException { _config = config; _factory = factory; _sslContext = sslContext; + _timeout = TIMEOUT; InetSocketAddress address = config.getAddress(); @@ -172,15 +175,19 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); _serverSocket = socketFactory.createServerSocket(); - ((SSLServerSocket)_serverSocket).setNeedClientAuth(config.needClientAuth()); - ((SSLServerSocket)_serverSocket).setWantClientAuth(config.wantClientAuth()); + if(config.needClientAuth()) + { + ((SSLServerSocket)_serverSocket).setNeedClientAuth(true); + } + else if(config.wantClientAuth()) + { + ((SSLServerSocket)_serverSocket).setWantClientAuth(true); + } } _serverSocket.setReuseAddress(true); _serverSocket.bind(address); - - } @@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(_timeout); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -224,10 +232,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet socket.setSendBufferSize(sendBufferSize); socket.setReceiveBufferSize(receiveBufferSize); - ProtocolEngine engine = _factory.newProtocolEngine(); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT); + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, + ticker); + ticker.setConnection(connection); if(_sslContext != null) { @@ -248,14 +258,14 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } catch(RuntimeException e) { - LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); closeSocketIfNecessary(socket); } catch(IOException e) { if(!_closed) { - LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); closeSocketIfNecessary(socket); try { @@ -275,7 +285,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { if(LOGGER.isDebugEnabled()) { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort()); + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress()); } } } @@ -294,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } } } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 7e63071c16..06a43e21c6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLSocket; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; private static final boolean shutdownBroken; + + private Ticker _ticker; static { String osName = System.getProperty("os.name"); @@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable { final int threshold = bufferSize / 2; - // I set the read buffer size simillar to SO_RCVBUF + // I set the read buffer size similar to SO_RCVBUF // Haven't tested with a lower value to see if it's better or worse byte[] buffer = new byte[bufferSize]; try @@ -144,27 +148,71 @@ final class IoReceiver implements Runnable, Closeable InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + long currentTime; + while(read != -1) { - if (read > 0) + try + { + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + try + { + if(!socket.isClosed()) + { + socket.setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) + currentTime = System.currentTimeMillis(); + if(_ticker != null) { - offset = 0; - buffer = new byte[bufferSize]; + final int tick = _ticker.tick(currentTime); + if(!socket.isClosed()) + { + try + { + socket.setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } } } } } catch (Throwable t) { - if (!(shutdownBroken && - t instanceof SocketException && - t.getMessage().equalsIgnoreCase("socket closed") && - closed.get())) + if (shouldReport(t)) { receiver.exception(t); } @@ -183,4 +231,30 @@ final class IoReceiver implements Runnable, Closeable } } + private boolean shouldReport(Throwable t) + { + boolean brokenClose = closed.get() && + shutdownBroken && + t instanceof SocketException && + "socket closed".equalsIgnoreCase(t.getMessage()); + + boolean sslSocketClosed = closed.get() && + socket instanceof SSLSocket && + t instanceof SocketException && + "Socket is closed".equalsIgnoreCase(t.getMessage()); + + return !brokenClose && !sslSocketClosed; + } + + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 0e6c865a16..61585443b1 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -36,6 +36,9 @@ public interface BindingURL public static final String OPTION_SUBSCRIPTION = "subscription"; public static final String OPTION_ROUTING_KEY = "routingkey"; public static final String OPTION_BINDING_KEY = "bindingkey"; + public static final String OPTION_EXCHANGE_AUTODELETE = "exchangeautodelete"; + public static final String OPTION_EXCHANGE_DURABLE = "exchangedurable"; + public static final String OPTION_EXCHANGE_INTERNAL = "exchangeinternal"; /** * This option is only applicable for 0-8/0-9/0-9-1 protocols connection diff --git a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index 2d3e321812..7362099070 100644 --- a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -220,6 +220,19 @@ public class FileUtils public static void copyCheckedEx(File src, File dst) throws IOException { InputStream in = new FileInputStream(src); + copy(in, dst); + } + + /** + * Copies the specified InputStream to the specified destination file. If the destination file does not exist, + * it is created. + * + * @param in The InputStream + * @param dst The destination file name. + * @throws IOException + */ + public static void copy(InputStream in, File dst) throws IOException + { try { if (!dst.exists()) diff --git a/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java b/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java deleted file mode 100644 index 971dd3fe2a..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java +++ /dev/null @@ -1,300 +0,0 @@ -/*********************************************************************** - * Copyright (c) 2000-2006 The Apache Software Foundation. * - * All rights reserved. * - * ------------------------------------------------------------------- * - * Licensed under the Apache License, Version 2.0 (the "License"); you * - * may not use this file except in compliance with the License. You * - * may obtain a copy of the License at: * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, software * - * distributed under the License is distributed on an "AS IS" BASIS, * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * - * implied. See the License for the specific language governing * - * permissions and limitations under the License. * - ***********************************************************************/ - -package org.apache.qpid.util; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; - -public class NetMatcher -{ - private ArrayList networks; - - public void initInetNetworks(final Collection nets) - { - networks = new ArrayList(); - for (Iterator iter = nets.iterator(); iter.hasNext(); ) - { - try - { - InetNetwork net = InetNetwork.getFromString((String) iter.next()); - if (!networks.contains(net)) - { - networks.add(net); - } - } - catch (java.net.UnknownHostException uhe) - { - log("Cannot resolve address: " + uhe.getMessage()); - } - } - networks.trimToSize(); - } - - public void initInetNetworks(final String[] nets) - { - networks = new ArrayList(); - for (int i = 0; i < nets.length; i++) - { - try - { - InetNetwork net = InetNetwork.getFromString(nets[i]); - if (!networks.contains(net)) - { - networks.add(net); - } - } - catch (java.net.UnknownHostException uhe) - { - log("Cannot resolve address: " + uhe.getMessage()); - } - } - networks.trimToSize(); - } - - public boolean matchInetNetwork(final String hostIP) - { - InetAddress ip = null; - - try - { - ip = InetAddress.getByName(hostIP); - } - catch (java.net.UnknownHostException uhe) - { - log("Cannot resolve address for " + hostIP + ": " + uhe.getMessage()); - } - - boolean sameNet = false; - - if (ip != null) - { - for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) - { - InetNetwork network = (InetNetwork) iter.next(); - sameNet = network.contains(ip); - } - } - return sameNet; - } - - public boolean matchInetNetwork(final InetAddress ip) - { - boolean sameNet = false; - - for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) - { - InetNetwork network = (InetNetwork) iter.next(); - sameNet = network.contains(ip); - } - return sameNet; - } - - public NetMatcher() - { - } - - public NetMatcher(final String[] nets) - { - initInetNetworks(nets); - } - - public NetMatcher(final Collection nets) - { - initInetNetworks(nets); - } - - public String toString() { - return networks.toString(); - } - - protected void log(String s) { } -} - -class InetNetwork -{ - /* - * Implements network masking, and is compatible with RFC 1518 and - * RFC 1519, which describe CIDR: Classless Inter-Domain Routing. - */ - - private InetAddress network; - private InetAddress netmask; - - public InetNetwork(InetAddress ip, InetAddress netmask) - { - network = maskIP(ip, netmask); - this.netmask = netmask; - } - - public boolean contains(final String name) throws java.net.UnknownHostException - { - return network.equals(maskIP(InetAddress.getByName(name), netmask)); - } - - public boolean contains(final InetAddress ip) - { - return network.equals(maskIP(ip, netmask)); - } - - public String toString() - { - return network.getHostAddress() + "/" + netmask.getHostAddress(); - } - - public int hashCode() - { - return maskIP(network, netmask).hashCode(); - } - - public boolean equals(Object obj) - { - return (obj != null) && (obj instanceof InetNetwork) && - ((((InetNetwork)obj).network.equals(network)) && (((InetNetwork)obj).netmask.equals(netmask))); - } - - public static InetNetwork getFromString(String netspec) throws java.net.UnknownHostException - { - if (netspec.endsWith("*")) - { - netspec = normalizeFromAsterisk(netspec); - } - else - { - int iSlash = netspec.indexOf('/'); - if (iSlash == -1) - { - netspec += "/255.255.255.255"; - } - else if (netspec.indexOf('.', iSlash) == -1) - { - netspec = normalizeFromCIDR(netspec); - } - } - - return new InetNetwork(InetAddress.getByName(netspec.substring(0, netspec.indexOf('/'))), - InetAddress.getByName(netspec.substring(netspec.indexOf('/') + 1))); - } - - public static InetAddress maskIP(final byte[] ip, final byte[] mask) - { - try - { - return getByAddress(new byte[] - { - (byte) (mask[0] & ip[0]), - (byte) (mask[1] & ip[1]), - (byte) (mask[2] & ip[2]), - (byte) (mask[3] & ip[3]) - }); - } - catch(Exception _) {} - { - return null; - } - } - - public static InetAddress maskIP(final InetAddress ip, final InetAddress mask) - { - return maskIP(ip.getAddress(), mask.getAddress()); - } - - /* - * This converts from an uncommon "wildcard" CIDR format - * to "address + mask" format: - * - * * => 000.000.000.0/000.000.000.0 - * xxx.* => xxx.000.000.0/255.000.000.0 - * xxx.xxx.* => xxx.xxx.000.0/255.255.000.0 - * xxx.xxx.xxx.* => xxx.xxx.xxx.0/255.255.255.0 - */ - static private String normalizeFromAsterisk(final String netspec) - { - String[] masks = { "0.0.0.0/0.0.0.0", "0.0.0/255.0.0.0", "0.0/255.255.0.0", "0/255.255.255.0" }; - char[] srcb = netspec.toCharArray(); - int octets = 0; - for (int i = 1; i < netspec.length(); i++) - { - if (srcb[i] == '.') - { - octets++; - } - } - return (octets == 0) ? masks[0] : netspec.substring(0, netspec.length() -1 ).concat(masks[octets]); - } - - /* - * RFC 1518, 1519 - Classless Inter-Domain Routing (CIDR) - * This converts from "prefix + prefix-length" format to - * "address + mask" format, e.g. from xxx.xxx.xxx.xxx/yy - * to xxx.xxx.xxx.xxx/yyy.yyy.yyy.yyy. - */ - static private String normalizeFromCIDR(final String netspec) - { - final int bits = 32 - Integer.parseInt(netspec.substring(netspec.indexOf('/')+1)); - final int mask = (bits == 32) ? 0 : 0xFFFFFFFF - ((1 << bits)-1); - - return netspec.substring(0, netspec.indexOf('/') + 1) + - Integer.toString(mask >> 24 & 0xFF, 10) + "." + - Integer.toString(mask >> 16 & 0xFF, 10) + "." + - Integer.toString(mask >> 8 & 0xFF, 10) + "." + - Integer.toString(mask >> 0 & 0xFF, 10); - } - - private static java.lang.reflect.Method getByAddress = null; - - static { - try { - Class inetAddressClass = Class.forName("java.net.InetAddress"); - Class[] parameterTypes = { byte[].class }; - getByAddress = inetAddressClass.getMethod("getByAddress", parameterTypes); - } catch (Exception e) { - getByAddress = null; - } - } - - private static InetAddress getByAddress(byte[] ip) throws java.net.UnknownHostException - { - InetAddress addr = null; - if (getByAddress != null) - { - try - { - addr = (InetAddress) getByAddress.invoke(null, new Object[] { ip }); - } - catch (IllegalAccessException e) - { - } - catch (java.lang.reflect.InvocationTargetException e) - { - } - } - - if (addr == null) { - addr = InetAddress.getByName - ( - Integer.toString(ip[0] & 0xFF, 10) + "." + - Integer.toString(ip[1] & 0xFF, 10) + "." + - Integer.toString(ip[2] & 0xFF, 10) + "." + - Integer.toString(ip[3] & 0xFF, 10) - ); - } - return addr; - } -} |