diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
| commit | 825327492ededcf62f7307a96eb29f5e7df88351 (patch) | |
| tree | 05ce4cad575ce2c0379c5440f5ec197629c97c16 /java/common/src/main | |
| parent | 55dae2c49ba8c283583f3688784f2b763e772020 (diff) | |
| download | qpid-python-825327492ededcf62f7307a96eb29f5e7df88351.tar.gz | |
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover
testing.
- Modified QpidTestCase to substitute port variables into broker
start/stop commands.
- Modified test profiles to use the new port variables.
- Modified QpidTestCase to permit multiple exclude files.
- Modified test profiles to make use of a common exclude list:
ExcludeList
- Added ConnectionTest.testResumeEmptyReplayBuffer.
- Made default exception handling for Connection and Session log the
exception.
- Added SenderExcetion to specifically signal problems with
transmitting connection data.
- Modified Session to catch and deal with connection send failures
for sessions with positive expiry.
- Modified FailoverBaseCase to work for non VM brokers.
- Made FailoverTest fail if failover times out.
- Modified JMS implementation to make use of the recently added low
level session resume.
- Unexcluded failover tests from 0-10 test profiles.
- Excluded MultipleJCAProviderRegistrationTest due to its testing
strategy resulting in spurious failure when running as part of the
larger test suite.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
4 files changed, 138 insertions, 21 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7a66c2c238..cf9b9145a9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -62,7 +62,7 @@ public class Connection extends ConnectionInvoker public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exception) { - throw exception; + log.error(exception, "connection exception"); } public void closed(Connection conn) {} } @@ -155,7 +155,12 @@ public class Connection extends ConnectionInvoker return saslClient; } - public void connect(String host, int port, String vhost, String username, String password,boolean ssl) + public void connect(String host, int port, String vhost, String username, String password) + { + connect(host, port, vhost, username, password, false); + } + + public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { synchronized (lock) { @@ -163,7 +168,7 @@ public class Connection extends ConnectionInvoker delegate = new ClientDelegate(vhost, username, password); - IoTransport.connect(host, port, ConnectionBinding.get(this),ssl); + IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -371,12 +376,11 @@ public class Connection extends ConnectionInvoker case CLOSING: error = e; lock.notifyAll(); - break; - default: - listener.exception(this, e); - break; + return; } } + + listener.exception(this, e); } public void exception(Throwable t) @@ -402,13 +406,13 @@ public class Connection extends ConnectionInvoker public void closed() { - log.debug("connection closed: %s", this); - if (state == OPEN) { exception(new ConnectionException("connection aborted")); } + log.debug("connection closed: %s", this); + synchronized (lock) { List<Session> values = new ArrayList<Session>(channels.values()); diff --git a/java/common/src/main/java/org/apache/qpid/transport/SenderException.java b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java new file mode 100644 index 0000000000..a96079dc27 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SenderException + * + */ + +public class SenderException extends TransportException +{ + + public SenderException(String message, Throwable cause) + { + super(message, cause); + } + + public SenderException(String message) + { + super(message); + } + + public SenderException(Throwable cause) + { + super(cause); + } + + public void rethrow() + { + throw new SenderException(getMessage(), this); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index bab4bb35ee..8877b7b683 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -65,7 +65,7 @@ public class Session extends SessionInvoker public void exception(Session ssn, SessionException exc) { - throw exc; + log.error(exc, "session exception"); } public void closed(Session ssn) {} @@ -195,6 +195,9 @@ public class Session extends SessionInvoker send(m); } } + + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); } } @@ -219,6 +222,7 @@ public class Session extends SessionInvoker this.commandsIn = id; if (!incomingInit) { + incomingInit = true; maxProcessed = commandsIn - 1; syncPoint = maxProcessed; } @@ -242,6 +246,11 @@ public class Session extends SessionInvoker final void identify(Method cmd) { + if (!incomingInit) + { + throw new IllegalStateException(); + } + int id = nextCommandId(); cmd.setId(id); @@ -417,8 +426,8 @@ public class Session extends SessionInvoker default: throw new SessionException (String.format - ("timed out waiting for session to become open %s", - state)); + ("timed out waiting for session to become open " + + "(state=%s)", state)); } int next = commandsOut++; @@ -429,7 +438,26 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && isFull(next)) { - sessionFlush(COMPLETED); + if (state == OPEN) + { + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will + // happen again on resume + log.error(e, "error sending flush (full replay buffer)"); + } + else + { + e.rethrow(); + } + } + } w.await(); } } @@ -452,7 +480,23 @@ public class Session extends SessionInvoker m.setSync(true); } needSync = !m.isSync(); - send(m); + try + { + send(m); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending command"); + } + else + { + e.rethrow(); + } + } if (autoSync) { sync(); @@ -462,7 +506,23 @@ public class Session extends SessionInvoker // wraparound if ((next % 65536) == 0) { - sessionFlush(COMPLETED); + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending flush (periodic)"); + } + else + { + e.rethrow(); + } + } } } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 73ff039be5..36ea14856a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -92,7 +93,7 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> { if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } final int size = buffer.length; @@ -125,12 +126,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } if (head - tail >= size) { - throw new TransportException(String.format("write timed out: %s, %s", head, tail)); + throw new SenderException(String.format("write timed out: %s, %s", head, tail)); } } continue; @@ -192,19 +193,19 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> join(timeout); if (isAlive()) { - throw new TransportException("join timed out"); + throw new SenderException("join timed out"); } } transport.getReceiver().close(false); } catch (InterruptedException e) { - throw new TransportException(e); + throw new SenderException(e); } if (reportException && exception != null) { - throw new TransportException(exception); + throw new SenderException(exception); } } } |
