diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-13 16:07:01 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-13 16:07:01 +0000 |
| commit | 9ebd8a5bd4dd60e5983cfa998fea1c78fdb87401 (patch) | |
| tree | 6cbde7bc931ccfc726a27cd5b1573c43dea5fc8e /qpid/java/common/src | |
| parent | ea476363bc00e42de3b1e39f2c6d4a2ed6cb677f (diff) | |
| download | qpid-python-9ebd8a5bd4dd60e5983cfa998fea1c78fdb87401.tar.gz | |
QPID-1339:
- Removed the Channel class in order to simplify the state management
surrounding Sessions and Connections.
- Consolidated the ChannelDelegate into the ConnectionDelegate.
- Modified MethodDelegate to invoke a generic handle method as the
default action for each dispatched method.
- Modified the code generator to produce a separate ConnectionInvoker
and SessionInvoker.
- Modified the invoker template to use package level visibility for
all controls rather than public visibility.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704147 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
16 files changed, 304 insertions, 385 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java index 79bb286d76..8163210f14 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -33,48 +33,28 @@ import org.apache.qpid.transport.network.mina.MinaHandler; * @author Rafael H. Schloming */ -class ToyClient extends SessionDelegate +class ToyClient implements SessionListener { + public void opened(Session ssn) {} - @Override public void messageReject(Session ssn, MessageReject reject) + public void exception(Session ssn, SessionException exc) { - for (Range range : reject.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - ssn.getCommand((int) l)); - } - } + exc.printStackTrace(); } - @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + public void message(Session ssn, MessageTransfer xfr) { System.out.println("msg: " + xfr); } + public void closed(Session ssn) {} + public static final void main(String[] args) { - Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ClientDelegate(null, "guest", "guest") - { - public SessionDelegate getSessionDelegate() - { - return new ToyClient(); - } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} - }); - conn.send(new ProtocolHeader - (1, 0, 10)); - - Channel ch = conn.getChannel(0); - Session ssn = new Session("my-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + Connection conn = new Connection(); + conn.connect("0.0.0.0", 5672, null, "guest", "guest"); + Session ssn = conn.createSession(); + ssn.setSessionListener(new ToyClient()); ssn.queueDeclare("asdf", null, null); ssn.sync(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java index e6dedc536f..4e97855a6f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.transport; +import java.nio.ByteBuffer; + +import static org.apache.qpid.transport.util.Functions.*; + /** * Binary @@ -51,6 +55,13 @@ public final class Binary this(bytes, 0, bytes.length); } + public final byte[] getBytes() + { + byte[] result = new byte[size]; + System.arraycopy(bytes, offset, result, 0, size); + return result; + } + public final byte[] array() { return bytes; @@ -126,4 +137,9 @@ public final class Binary return true; } + public String toString() + { + return str(ByteBuffer.wrap(bytes, offset, size)); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java deleted file mode 100644 index d973739ed6..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import org.apache.qpid.transport.network.Frame; -import org.apache.qpid.transport.util.Logger; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.qpid.transport.network.Frame.*; -import static org.apache.qpid.transport.util.Functions.*; - - -/** - * Channel - * - * @author Rafael H. Schloming - */ - -public class Channel extends Invoker - implements Receiver<ProtocolEvent>, ProtocolDelegate<Void> -{ - - private static final Logger log = Logger.get(Channel.class); - - final private Connection connection; - final private int channel; - final private MethodDelegate<Channel> delegate; - final private SessionDelegate sessionDelegate; - // session may be null - private Session session; - - public Channel(Connection connection, int channel, SessionDelegate delegate) - { - this.connection = connection; - this.channel = channel; - this.delegate = new ChannelDelegate(); - this.sessionDelegate = delegate; - } - - public Connection getConnection() - { - return connection; - } - - public void received(ProtocolEvent event) - { - event.delegate(null, this); - } - - public void init(Void v, ProtocolHeader hdr) - { - connection.getConnectionDelegate().init(this, hdr); - } - - public void control(Void v, Method method) - { - switch (method.getEncodedTrack()) - { - case L1: - method.dispatch(this, connection.getConnectionDelegate()); - break; - case L2: - method.dispatch(this, delegate); - break; - case L3: - method.delegate(session, sessionDelegate); - break; - default: - throw new IllegalStateException - ("unknown track: " + method.getEncodedTrack()); - } - } - - public void command(Void v, Method method) - { - method.delegate(session, sessionDelegate); - } - - public void error(Void v, ProtocolError error) - { - throw new RuntimeException(error.getMessage()); - } - - public void exception(Throwable t) - { - session.exception(t); - } - - public void closed() - { - log.debug("channel closed: %s", this); - if (session != null) - { - session.closed(); - } - connection.removeChannel(channel); - } - - public int getEncodedChannel() { - return channel; - } - - public Session getSession() - { - return session; - } - - void setSession(Session session) - { - this.session = session; - } - - void closeCode(ConnectionClose close) - { - if (session != null) - { - session.closeCode(close); - } - } - - private void emit(ProtocolEvent event) - { - event.setChannel(channel); - connection.send(event); - } - - public void method(Method m) - { - emit(m); - - if (!m.isBatch()) - { - connection.flush(); - } - } - - protected void invoke(Method m) - { - method(m); - } - - protected <T> Future<T> invoke(Method m, Class<T> cls) - { - throw new UnsupportedOperationException(); - } - - public String toString() - { - return String.format("%s:%s", connection, channel); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java deleted file mode 100644 index 8475fbf174..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.util.UUID; - - -/** - * ChannelDelegate - * - * @author Rafael H. Schloming - */ - -class ChannelDelegate extends MethodDelegate<Channel> -{ - - public @Override void sessionAttach(Channel channel, SessionAttach atch) - { - Session ssn = new Session(atch.getName()); - ssn.attach(channel); - ssn.sessionAttached(ssn.getName()); - } - - public @Override void sessionDetached(Channel channel, SessionDetached closed) - { - channel.closed(); - } - - public @Override void sessionDetach(Channel channel, SessionDetach dtc) - { - channel.getSession().closed(); - channel.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL); - channel.closed(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index d782170aa5..2604f6970c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -62,23 +62,20 @@ public class ClientDelegate extends ConnectionDelegate this.password = password; } - public void init(Channel ch, ProtocolHeader hdr) + public void init(Connection conn, ProtocolHeader hdr) { if (!(hdr.getMajor() == 0 && hdr.getMinor() == 10)) { - Connection conn = ch.getConnection(); conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor())); } - } - @Override public void connectionStart(Channel ch, ConnectionStart start) + @Override public void connectionStart(Connection conn, ConnectionStart start) { - Connection conn = ch.getConnection(); List<Object> mechanisms = start.getMechanisms(); if (mechanisms == null || mechanisms.isEmpty()) { - ch.connectionStartOk + conn.connectionStartOk (Collections.EMPTY_MAP, null, null, conn.getLocale()); return; } @@ -97,7 +94,7 @@ public class ClientDelegate extends ConnectionDelegate byte[] response = sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null; - ch.connectionStartOk + conn.connectionStartOk (Collections.EMPTY_MAP, sc.getMechanismName(), response, conn.getLocale()); } @@ -107,14 +104,13 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionSecure(Channel ch, ConnectionSecure secure) + @Override public void connectionSecure(Connection conn, ConnectionSecure secure) { - Connection conn = ch.getConnection(); SaslClient sc = conn.getSaslClient(); try { byte[] response = sc.evaluateChallenge(secure.getChallenge()); - ch.connectionSecureOk(response); + conn.connectionSecureOk(response); } catch (SaslException e) { @@ -122,20 +118,19 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionTune(Channel ch, ConnectionTune tune) + @Override public void connectionTune(Connection conn, ConnectionTune tune) { - Connection conn = ch.getConnection(); conn.setChannelMax(tune.getChannelMax()); - ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); - ch.connectionOpen(vhost, null, Option.INSIST); + conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + conn.connectionOpen(vhost, null, Option.INSIST); } - @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok) + @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) { - ch.getConnection().setState(OPEN); + conn.setState(OPEN); } - @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir) + @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) { throw new UnsupportedOperationException(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 2c927e21fd..71027e3256 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -24,6 +24,7 @@ import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; +import org.apache.qpid.util.Strings; import java.util.ArrayList; import java.util.HashMap; @@ -48,7 +49,7 @@ import static org.apache.qpid.transport.Connection.State.*; * short instead of Short */ -public class Connection +public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { @@ -69,7 +70,8 @@ public class Connection private ConnectionDelegate delegate; private Sender<ProtocolEvent> sender; - final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + final private Map<Binary,Session> sessions = new HashMap<Binary,Session>(); + final private Map<Integer,Session> channels = new HashMap<Integer,Session>(); private State state = NEW; private Object lock = new Object(); @@ -200,14 +202,45 @@ public class Connection return createSession(0); } - public Session createSession(long expiryInSeconds) + public Session createSession(long timeout) { - Channel ch = getChannel(); - Session ssn = new Session(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; + return createSession(UUID.randomUUID().toString(), timeout); + } + + public Session createSession(String name) + { + return createSession(name, 0); + } + + public Session createSession(String name, long timeout) + { + return createSession(Strings.toUTF8(name), timeout); + } + + public Session createSession(byte[] name, long timeout) + { + return createSession(new Binary(name), timeout); + } + + public Session createSession(Binary name, long timeout) + { + synchronized (lock) + { + Session ssn = new Session(this, name); + sessions.put(name, ssn); + map(ssn); + ssn.sessionAttach(name.getBytes()); + ssn.sessionRequestTimeout(timeout); + return ssn; + } + } + + void removeSession(Session ssn) + { + synchronized (lock) + { + sessions.remove(ssn.getName()); + } } public void setConnectionId(int id) @@ -228,8 +261,7 @@ public class Connection public void received(ProtocolEvent event) { log.debug("RECV: [%s] %s", this, event); - Channel channel = getChannel(event.getChannel()); - channel.received(event); + event.delegate(this, delegate); } public void send(ProtocolEvent event) @@ -249,6 +281,22 @@ public class Connection sender.flush(); } + protected void invoke(Method method) + { + method.setChannel(0); + send(method); + if (!method.isBatch()) + { + flush(); + } + } + + public void dispatch(Method method) + { + Session ssn = getSession(method.getChannel()); + ssn.received(method); + } + public int getChannelMax() { return channelMax; @@ -259,7 +307,7 @@ public class Connection channelMax = max; } - public Channel getChannel() + private int map(Session ssn) { synchronized (lock) { @@ -267,7 +315,8 @@ public class Connection { if (!channels.containsKey(i)) { - return getChannel(i); + map(ssn, i); + return i; } } @@ -275,25 +324,28 @@ public class Connection } } - public Channel getChannel(int number) + void map(Session ssn, int channel) { synchronized (lock) { - Channel channel = channels.get(number); - if (channel == null) - { - channel = new Channel(this, number, delegate.getSessionDelegate()); - channels.put(number, channel); - } - return channel; + channels.put(channel, ssn); + ssn.setChannel(channel); + } + } + + void unmap(Session ssn) + { + synchronized (lock) + { + channels.remove(ssn.getChannel()); } } - void removeChannel(int number) + Session getSession(int channel) { synchronized (lock) { - channels.remove(number); + return channels.get(channel); } } @@ -324,9 +376,9 @@ public class Connection { synchronized (lock) { - for (Channel ch : channels.values()) + for (Session ssn : channels.values()) { - ch.closeCode(close); + ssn.closeCode(close); } ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) @@ -347,10 +399,10 @@ public class Connection synchronized (lock) { - List<Channel> values = new ArrayList<Channel>(channels.values()); - for (Channel ch : values) + List<Session> values = new ArrayList<Session>(channels.values()); + for (Session ssn : values) { - ch.closed(); + ssn.closed(); } sender = null; @@ -367,9 +419,8 @@ public class Connection switch (state) { case OPEN: - Channel ch = getChannel(0); state = CLOSING; - ch.connectionClose(ConnectionCloseCode.NORMAL, null); + connectionClose(ConnectionCloseCode.NORMAL, null); Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state == CLOSING && error == null) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 9056a3120b..06f80be6dd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -37,31 +37,64 @@ import static org.apache.qpid.transport.Connection.State.*; * * the connectionClose is kind of different for both sides */ -public abstract class ConnectionDelegate extends MethodDelegate<Channel> +public abstract class ConnectionDelegate + extends MethodDelegate<Connection> + implements ProtocolDelegate<Connection> { private static final Logger log = Logger.get(ConnectionDelegate.class); - public SessionDelegate getSessionDelegate() + public void control(Connection conn, Method method) { - return new SessionDelegate(); + method.dispatch(conn, this); } - public abstract void init(Channel ch, ProtocolHeader hdr); + public void command(Connection conn, Method method) + { + method.dispatch(conn, this); + } + + public void error(Connection conn, ProtocolError error) + { + conn.exception(new ConnectionException(error.getMessage())); + } - @Override public void connectionClose(Channel ch, ConnectionClose close) + public void handle(Connection conn, Method method) { - Connection conn = ch.getConnection(); - ch.connectionCloseOk(); + conn.dispatch(method); + } + + @Override public void connectionClose(Connection conn, ConnectionClose close) + { + conn.connectionCloseOk(); conn.getSender().close(); conn.closeCode(close); conn.setState(CLOSE_RCVD); } - @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok) + @Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok) { - Connection conn = ch.getConnection(); conn.getSender().close(); } + @Override public void sessionAttached(Connection conn, SessionAttached atc) + { + + } + + @Override public void sessionDetach(Connection conn, SessionDetach dtc) + { + Session ssn = conn.getSession(dtc.getChannel()); + conn.unmap(ssn); + ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL); + ssn.closed(); + } + + @Override public void sessionDetached(Connection conn, SessionDetached dtc) + { + Session ssn = conn.getSession(dtc.getChannel()); + conn.unmap(ssn); + ssn.closed(); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java index 89b59c2512..dcf05d9f72 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -32,22 +32,33 @@ import org.apache.qpid.transport.network.io.IoAcceptor; * */ -public class Echo extends SessionDelegate +public class Echo implements SessionListener { - public void messageTransfer(Session ssn, MessageTransfer xfr) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { ssn.invoke(xfr); ssn.processed(xfr); } + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + public static final void main(String[] args) throws IOException { ConnectionDelegate delegate = new ServerDelegate() { - public SessionDelegate getSessionDelegate() + @Override public Session getSession(Connection conn, SessionAttach atc) { - return new Echo(); + Session ssn = super.getSession(conn, atc); + ssn.setSessionListener(new Echo()); + return ssn; } }; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index c27419cadc..be1ea54c93 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -55,24 +55,22 @@ public class ServerDelegate extends ConnectionDelegate private SaslServer saslServer; - public void init(Channel ch, ProtocolHeader hdr) + public void init(Connection conn, ProtocolHeader hdr) { - Connection conn = ch.getConnection(); conn.send(new ProtocolHeader(1, 0, 10)); List<Object> utf8 = new ArrayList<Object>(); utf8.add("utf8"); - ch.connectionStart(null, Collections.EMPTY_LIST, utf8); + conn.connectionStart(null, Collections.EMPTY_LIST, utf8); } - @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok) + @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) { - Connection conn = ch.getConnection(); conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); if (mechanism == null || mechanism.length() == 0) { - ch.connectionTune + conn.connectionTune (Integer.MAX_VALUE, org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, Integer.MAX_VALUE); @@ -85,13 +83,13 @@ public class ServerDelegate extends ConnectionDelegate (mechanism, "AMQP", "localhost", null, null); if (ss == null) { - ch.connectionClose + conn.connectionClose (ConnectionCloseCode.CONNECTION_FORCED, "null SASL mechanism: " + mechanism); return; } conn.setSaslServer(ss); - secure(ch, ok.getResponse()); + secure(conn, ok.getResponse()); } catch (SaslException e) { @@ -99,9 +97,8 @@ public class ServerDelegate extends ConnectionDelegate } } - private void secure(Channel ch, byte[] response) + private void secure(Connection conn, byte[] response) { - Connection conn = ch.getConnection(); SaslServer ss = conn.getSaslServer(); try { @@ -109,14 +106,14 @@ public class ServerDelegate extends ConnectionDelegate if (ss.isComplete()) { ss.dispose(); - ch.connectionTune + conn.connectionTune (Integer.MAX_VALUE, org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, Integer.MAX_VALUE); } else { - ch.connectionSecure(challenge); + conn.connectionSecure(challenge); } } catch (SaslException e) @@ -125,21 +122,32 @@ public class ServerDelegate extends ConnectionDelegate } } - @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok) + @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) { - secure(ch, ok.getResponse()); + secure(conn, ok.getResponse()); } - @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok) + @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) { } - @Override public void connectionOpen(Channel ch, ConnectionOpen open) + @Override public void connectionOpen(Connection conn, ConnectionOpen open) { - Connection conn = ch.getConnection(); - ch.connectionOpenOk(Collections.EMPTY_LIST); + conn.connectionOpenOk(Collections.EMPTY_LIST); conn.setState(OPEN); } + public Session getSession(Connection conn, SessionAttach atc) + { + return new Session(conn, new Binary(atc.getName())); + } + + @Override public void sessionAttach(Connection conn, SessionAttach atc) + { + Session ssn = getSession(conn, atc); + conn.map(ssn, atc.getChannel()); + ssn.sessionAttached(atc.getName()); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index df4313e15b..125f000543 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -44,7 +44,7 @@ import static org.apache.qpid.util.Strings.*; * @author Rafael H. Schloming */ -public class Session extends Invoker +public class Session extends SessionInvoker { private static final Logger log = Logger.get(Session.class); @@ -84,14 +84,14 @@ public class Session extends Invoker } } - private byte[] name; + private Connection connection; + private Binary name; + private int channel; + private SessionDelegate delegate = new SessionDelegate(); private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; - // channel may be null - Channel channel; - // incoming command count int commandsIn = 0; // completed incoming commands @@ -108,16 +108,27 @@ public class Session extends Invoker private AtomicBoolean closed = new AtomicBoolean(false); - public Session(byte[] name) + Session(Connection connection, Binary name) { + this.connection = connection; this.name = name; } - public byte[] getName() + public Binary getName() { return name; } + int getChannel() + { + return channel; + } + + void setChannel(int channel) + { + this.channel = channel; + } + public void setSessionListener(SessionListener listener) { if (listener == null) @@ -212,8 +223,12 @@ public class Session extends Invoker { maxProcessed = max(maxProcessed, upper); } - flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint); - syncPoint = maxProcessed; + boolean synced = ge(maxProcessed, syncPoint); + flush = lt(old, syncPoint) && synced; + if (synced) + { + syncPoint = maxProcessed; + } } if (flush) { @@ -266,12 +281,6 @@ public class Session extends Invoker } } - public void attach(Channel channel) - { - this.channel = channel; - channel.setSession(this); - } - public Method getCommand(int id) { synchronized (commands) @@ -304,6 +313,22 @@ public class Session extends Invoker } } + void received(Method m) + { + m.delegate(this, delegate); + } + + private void send(Method m) + { + m.setChannel(channel); + connection.send(m); + + if (!m.isBatch()) + { + connection.flush(); + } + } + public void invoke(Method m) { if (closed.get()) @@ -342,7 +367,7 @@ public class Session extends Invoker m.setSync(true); } needSync = !m.isSync(); - channel.method(m); + send(m); if (autoSync) { sync(); @@ -358,7 +383,7 @@ public class Session extends Invoker } else { - channel.method(m); + send(m); } } @@ -564,7 +589,7 @@ public class Session extends Invoker public void close() { sessionRequestTimeout(0); - sessionDetach(name); + sessionDetach(name.getBytes()); synchronized (commands) { long start = System.currentTimeMillis(); @@ -576,12 +601,19 @@ public class Session extends Invoker commands.wait(timeout - elapsed); elapsed = System.currentTimeMillis() - start; } + + if (!closed.get()) + { + throw new SessionException("close() timed out"); + } } catch (InterruptedException e) { throw new RuntimeException(e); } } + + connection.removeSession(this); } public void exception(Throwable t) @@ -606,13 +638,11 @@ public class Session extends Invoker } } } - channel.setSession(null); - channel = null; } public String toString() { - return String.format("ssn:%s", str(name)); + return String.format("ssn:%s", name); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java index e4ce7329a9..64f9039484 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -38,7 +38,7 @@ public class SessionClosedException extends SessionException public SessionClosedException(Throwable cause) { - super(null, cause); + super("session closed", null, cause); } @Override public void rethrow() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 3e6fa9d5d9..f6a1735b68 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.transport.util.Logger; /** @@ -33,6 +33,8 @@ public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { + private static final Logger log = Logger.get(SessionDelegate.class); + public void init(Session ssn, ProtocolHeader hdr) { } public void control(Session ssn, Method method) { @@ -50,15 +52,12 @@ public class SessionDelegate public void error(Session ssn, ProtocolError error) { } - @Override public void executionResult(Session ssn, ExecutionResult result) + public void handle(Session ssn, Method method) { - ssn.result(result.getCommandId(), result.getValue()); + log.warn("UNHANDLED: [%s] %s", ssn, method); } - @Override public void executionException(Session ssn, ExecutionException exc) - { - ssn.setException(exc); - } + @Override public void sessionTimeout(Session ssn, SessionTimeout t) {} @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { @@ -122,6 +121,16 @@ public class SessionDelegate ssn.syncPoint(); } + @Override public void executionResult(Session ssn, ExecutionResult result) + { + ssn.result(result.getCommandId(), result.getValue()); + } + + @Override public void executionException(Session ssn, ExecutionException exc) + { + ssn.setException(exc); + } + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { ssn.getSessionListener().message(ssn, xfr); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java index e90ad8caf6..c4fc9558a1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java @@ -32,15 +32,20 @@ public class SessionException extends TransportException private ExecutionException exception; - public SessionException(ExecutionException exception, Throwable cause) + public SessionException(String message, ExecutionException exception, Throwable cause) { - super(String.valueOf(exception), cause); + super(message, cause); this.exception = exception; } public SessionException(ExecutionException exception) { - this(exception, null); + this(String.valueOf(exception), exception, null); + } + + public SessionException(String message) + { + this(message, null, null); } public ExecutionException getException() @@ -50,7 +55,7 @@ public class SessionException extends TransportException @Override public void rethrow() { - throw new SessionException(exception, this); + throw new SessionException(getMessage(), exception, this); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java index 617867cae6..622993effb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -31,7 +31,7 @@ import org.apache.qpid.transport.network.io.IoAcceptor; * */ -public class Sink extends SessionDelegate +public class Sink implements SessionListener { private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s"; @@ -85,7 +85,9 @@ public class Sink extends SessionDelegate return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024)); } - public void messageTransfer(Session ssn, MessageTransfer xfr) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { count++; bytes += xfr.getBody().remaining(); @@ -101,14 +103,22 @@ public class Sink extends SessionDelegate ssn.processed(xfr); } + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + public static final void main(String[] args) throws IOException { ConnectionDelegate delegate = new ServerDelegate() { - - public SessionDelegate getSessionDelegate() + @Override public Session getSession(Connection conn, SessionAttach atc) { - return new Sink(); + Session ssn = super.getSession(conn, atc); + ssn.setSessionListener(new Sink()); + return ssn; } }; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index f70a13ec3c..f25b16d71a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -181,6 +181,11 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> { if (!closed.getAndSet(true)) { + synchronized (notFull) + { + notFull.notify(); + } + synchronized (notEmpty) { notEmpty.notify(); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index e61ffb501b..03fae56250 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -51,10 +51,10 @@ public class ConnectionTest extends TestCase port = AvailablePortFinder.getNextAvailable(12000); ConnectionDelegate server = new ServerDelegate() { - @Override public void connectionOpen(Channel ch, ConnectionOpen open) + @Override public void connectionOpen(Connection conn, ConnectionOpen open) { - super.connectionOpen(ch, open); - ch.getConnection().close(); + super.connectionOpen(conn, open); + conn.close(); } }; @@ -94,13 +94,9 @@ public class ConnectionTest extends TestCase fail("never got notified of connection close"); } - Channel ch = conn.getChannel(0); - Session ssn = new Session("test".getBytes()); - ssn.attach(ch); - try { - ssn.sessionAttach(ssn.getName()); + conn.connectionCloseOk(); fail("writing to a closed socket succeeded"); } catch (TransportException e) |
