summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-13 16:07:01 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-13 16:07:01 +0000
commit9ebd8a5bd4dd60e5983cfa998fea1c78fdb87401 (patch)
tree6cbde7bc931ccfc726a27cd5b1573c43dea5fc8e /qpid/java/common/src
parentea476363bc00e42de3b1e39f2c6d4a2ed6cb677f (diff)
downloadqpid-python-9ebd8a5bd4dd60e5983cfa998fea1c78fdb87401.tar.gz
QPID-1339:
- Removed the Channel class in order to simplify the state management surrounding Sessions and Connections. - Consolidated the ChannelDelegate into the ConnectionDelegate. - Modified MethodDelegate to invoke a generic handle method as the default action for each dispatched method. - Modified the code generator to produce a separate ConnectionInvoker and SessionInvoker. - Modified the invoker template to use package level visibility for all controls rather than public visibility. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704147 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java42
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java176
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java54
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java111
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java51
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java19
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java44
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java72
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java5
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java12
16 files changed, 304 insertions, 385 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
index 79bb286d76..8163210f14 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -33,48 +33,28 @@ import org.apache.qpid.transport.network.mina.MinaHandler;
* @author Rafael H. Schloming
*/
-class ToyClient extends SessionDelegate
+class ToyClient implements SessionListener
{
+ public void opened(Session ssn) {}
- @Override public void messageReject(Session ssn, MessageReject reject)
+ public void exception(Session ssn, SessionException exc)
{
- for (Range range : reject.getTransfers())
- {
- for (long l = range.getLower(); l <= range.getUpper(); l++)
- {
- System.out.println("message rejected: " +
- ssn.getCommand((int) l));
- }
- }
+ exc.printStackTrace();
}
- @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("msg: " + xfr);
}
+ public void closed(Session ssn) {}
+
public static final void main(String[] args)
{
- Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ClientDelegate(null, "guest", "guest")
- {
- public SessionDelegate getSessionDelegate()
- {
- return new ToyClient();
- }
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
- });
- conn.send(new ProtocolHeader
- (1, 0, 10));
-
- Channel ch = conn.getChannel(0);
- Session ssn = new Session("my-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ Connection conn = new Connection();
+ conn.connect("0.0.0.0", 5672, null, "guest", "guest");
+ Session ssn = conn.createSession();
+ ssn.setSessionListener(new ToyClient());
ssn.queueDeclare("asdf", null, null);
ssn.sync();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
index e6dedc536f..4e97855a6f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
/**
* Binary
@@ -51,6 +55,13 @@ public final class Binary
this(bytes, 0, bytes.length);
}
+ public final byte[] getBytes()
+ {
+ byte[] result = new byte[size];
+ System.arraycopy(bytes, offset, result, 0, size);
+ return result;
+ }
+
public final byte[] array()
{
return bytes;
@@ -126,4 +137,9 @@ public final class Binary
return true;
}
+ public String toString()
+ {
+ return str(ByteBuffer.wrap(bytes, offset, size));
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
deleted file mode 100644
index d973739ed6..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import org.apache.qpid.transport.network.Frame;
-import org.apache.qpid.transport.util.Logger;
-
-import java.nio.ByteBuffer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.qpid.transport.network.Frame.*;
-import static org.apache.qpid.transport.util.Functions.*;
-
-
-/**
- * Channel
- *
- * @author Rafael H. Schloming
- */
-
-public class Channel extends Invoker
- implements Receiver<ProtocolEvent>, ProtocolDelegate<Void>
-{
-
- private static final Logger log = Logger.get(Channel.class);
-
- final private Connection connection;
- final private int channel;
- final private MethodDelegate<Channel> delegate;
- final private SessionDelegate sessionDelegate;
- // session may be null
- private Session session;
-
- public Channel(Connection connection, int channel, SessionDelegate delegate)
- {
- this.connection = connection;
- this.channel = channel;
- this.delegate = new ChannelDelegate();
- this.sessionDelegate = delegate;
- }
-
- public Connection getConnection()
- {
- return connection;
- }
-
- public void received(ProtocolEvent event)
- {
- event.delegate(null, this);
- }
-
- public void init(Void v, ProtocolHeader hdr)
- {
- connection.getConnectionDelegate().init(this, hdr);
- }
-
- public void control(Void v, Method method)
- {
- switch (method.getEncodedTrack())
- {
- case L1:
- method.dispatch(this, connection.getConnectionDelegate());
- break;
- case L2:
- method.dispatch(this, delegate);
- break;
- case L3:
- method.delegate(session, sessionDelegate);
- break;
- default:
- throw new IllegalStateException
- ("unknown track: " + method.getEncodedTrack());
- }
- }
-
- public void command(Void v, Method method)
- {
- method.delegate(session, sessionDelegate);
- }
-
- public void error(Void v, ProtocolError error)
- {
- throw new RuntimeException(error.getMessage());
- }
-
- public void exception(Throwable t)
- {
- session.exception(t);
- }
-
- public void closed()
- {
- log.debug("channel closed: %s", this);
- if (session != null)
- {
- session.closed();
- }
- connection.removeChannel(channel);
- }
-
- public int getEncodedChannel() {
- return channel;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- void setSession(Session session)
- {
- this.session = session;
- }
-
- void closeCode(ConnectionClose close)
- {
- if (session != null)
- {
- session.closeCode(close);
- }
- }
-
- private void emit(ProtocolEvent event)
- {
- event.setChannel(channel);
- connection.send(event);
- }
-
- public void method(Method m)
- {
- emit(m);
-
- if (!m.isBatch())
- {
- connection.flush();
- }
- }
-
- protected void invoke(Method m)
- {
- method(m);
- }
-
- protected <T> Future<T> invoke(Method m, Class<T> cls)
- {
- throw new UnsupportedOperationException();
- }
-
- public String toString()
- {
- return String.format("%s:%s", connection, channel);
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
deleted file mode 100644
index 8475fbf174..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.util.UUID;
-
-
-/**
- * ChannelDelegate
- *
- * @author Rafael H. Schloming
- */
-
-class ChannelDelegate extends MethodDelegate<Channel>
-{
-
- public @Override void sessionAttach(Channel channel, SessionAttach atch)
- {
- Session ssn = new Session(atch.getName());
- ssn.attach(channel);
- ssn.sessionAttached(ssn.getName());
- }
-
- public @Override void sessionDetached(Channel channel, SessionDetached closed)
- {
- channel.closed();
- }
-
- public @Override void sessionDetach(Channel channel, SessionDetach dtc)
- {
- channel.getSession().closed();
- channel.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
- channel.closed();
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index d782170aa5..2604f6970c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -62,23 +62,20 @@ public class ClientDelegate extends ConnectionDelegate
this.password = password;
}
- public void init(Channel ch, ProtocolHeader hdr)
+ public void init(Connection conn, ProtocolHeader hdr)
{
if (!(hdr.getMajor() == 0 && hdr.getMinor() == 10))
{
- Connection conn = ch.getConnection();
conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()));
}
-
}
- @Override public void connectionStart(Channel ch, ConnectionStart start)
+ @Override public void connectionStart(Connection conn, ConnectionStart start)
{
- Connection conn = ch.getConnection();
List<Object> mechanisms = start.getMechanisms();
if (mechanisms == null || mechanisms.isEmpty())
{
- ch.connectionStartOk
+ conn.connectionStartOk
(Collections.EMPTY_MAP, null, null, conn.getLocale());
return;
}
@@ -97,7 +94,7 @@ public class ClientDelegate extends ConnectionDelegate
byte[] response = sc.hasInitialResponse() ?
sc.evaluateChallenge(new byte[0]) : null;
- ch.connectionStartOk
+ conn.connectionStartOk
(Collections.EMPTY_MAP, sc.getMechanismName(), response,
conn.getLocale());
}
@@ -107,14 +104,13 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- @Override public void connectionSecure(Channel ch, ConnectionSecure secure)
+ @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
{
- Connection conn = ch.getConnection();
SaslClient sc = conn.getSaslClient();
try
{
byte[] response = sc.evaluateChallenge(secure.getChallenge());
- ch.connectionSecureOk(response);
+ conn.connectionSecureOk(response);
}
catch (SaslException e)
{
@@ -122,20 +118,19 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- @Override public void connectionTune(Channel ch, ConnectionTune tune)
+ @Override public void connectionTune(Connection conn, ConnectionTune tune)
{
- Connection conn = ch.getConnection();
conn.setChannelMax(tune.getChannelMax());
- ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
- ch.connectionOpen(vhost, null, Option.INSIST);
+ conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ conn.connectionOpen(vhost, null, Option.INSIST);
}
- @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok)
+ @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
- ch.getConnection().setState(OPEN);
+ conn.setState(OPEN);
}
- @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir)
+ @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 2c927e21fd..71027e3256 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -24,6 +24,7 @@ import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
import java.util.ArrayList;
import java.util.HashMap;
@@ -48,7 +49,7 @@ import static org.apache.qpid.transport.Connection.State.*;
* short instead of Short
*/
-public class Connection
+public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
@@ -69,7 +70,8 @@ public class Connection
private ConnectionDelegate delegate;
private Sender<ProtocolEvent> sender;
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
+ final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
private State state = NEW;
private Object lock = new Object();
@@ -200,14 +202,45 @@ public class Connection
return createSession(0);
}
- public Session createSession(long expiryInSeconds)
+ public Session createSession(long timeout)
{
- Channel ch = getChannel();
- Session ssn = new Session(UUID.randomUUID().toString().getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
- ssn.sessionRequestTimeout(expiryInSeconds);
- return ssn;
+ return createSession(UUID.randomUUID().toString(), timeout);
+ }
+
+ public Session createSession(String name)
+ {
+ return createSession(name, 0);
+ }
+
+ public Session createSession(String name, long timeout)
+ {
+ return createSession(Strings.toUTF8(name), timeout);
+ }
+
+ public Session createSession(byte[] name, long timeout)
+ {
+ return createSession(new Binary(name), timeout);
+ }
+
+ public Session createSession(Binary name, long timeout)
+ {
+ synchronized (lock)
+ {
+ Session ssn = new Session(this, name);
+ sessions.put(name, ssn);
+ map(ssn);
+ ssn.sessionAttach(name.getBytes());
+ ssn.sessionRequestTimeout(timeout);
+ return ssn;
+ }
+ }
+
+ void removeSession(Session ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.remove(ssn.getName());
+ }
}
public void setConnectionId(int id)
@@ -228,8 +261,7 @@ public class Connection
public void received(ProtocolEvent event)
{
log.debug("RECV: [%s] %s", this, event);
- Channel channel = getChannel(event.getChannel());
- channel.received(event);
+ event.delegate(this, delegate);
}
public void send(ProtocolEvent event)
@@ -249,6 +281,22 @@ public class Connection
sender.flush();
}
+ protected void invoke(Method method)
+ {
+ method.setChannel(0);
+ send(method);
+ if (!method.isBatch())
+ {
+ flush();
+ }
+ }
+
+ public void dispatch(Method method)
+ {
+ Session ssn = getSession(method.getChannel());
+ ssn.received(method);
+ }
+
public int getChannelMax()
{
return channelMax;
@@ -259,7 +307,7 @@ public class Connection
channelMax = max;
}
- public Channel getChannel()
+ private int map(Session ssn)
{
synchronized (lock)
{
@@ -267,7 +315,8 @@ public class Connection
{
if (!channels.containsKey(i))
{
- return getChannel(i);
+ map(ssn, i);
+ return i;
}
}
@@ -275,25 +324,28 @@ public class Connection
}
}
- public Channel getChannel(int number)
+ void map(Session ssn, int channel)
{
synchronized (lock)
{
- Channel channel = channels.get(number);
- if (channel == null)
- {
- channel = new Channel(this, number, delegate.getSessionDelegate());
- channels.put(number, channel);
- }
- return channel;
+ channels.put(channel, ssn);
+ ssn.setChannel(channel);
+ }
+ }
+
+ void unmap(Session ssn)
+ {
+ synchronized (lock)
+ {
+ channels.remove(ssn.getChannel());
}
}
- void removeChannel(int number)
+ Session getSession(int channel)
{
synchronized (lock)
{
- channels.remove(number);
+ return channels.get(channel);
}
}
@@ -324,9 +376,9 @@ public class Connection
{
synchronized (lock)
{
- for (Channel ch : channels.values())
+ for (Session ssn : channels.values())
{
- ch.closeCode(close);
+ ssn.closeCode(close);
}
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
@@ -347,10 +399,10 @@ public class Connection
synchronized (lock)
{
- List<Channel> values = new ArrayList<Channel>(channels.values());
- for (Channel ch : values)
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
{
- ch.closed();
+ ssn.closed();
}
sender = null;
@@ -367,9 +419,8 @@ public class Connection
switch (state)
{
case OPEN:
- Channel ch = getChannel(0);
state = CLOSING;
- ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+ connectionClose(ConnectionCloseCode.NORMAL, null);
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state == CLOSING && error == null)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 9056a3120b..06f80be6dd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -37,31 +37,64 @@ import static org.apache.qpid.transport.Connection.State.*;
*
* the connectionClose is kind of different for both sides
*/
-public abstract class ConnectionDelegate extends MethodDelegate<Channel>
+public abstract class ConnectionDelegate
+ extends MethodDelegate<Connection>
+ implements ProtocolDelegate<Connection>
{
private static final Logger log = Logger.get(ConnectionDelegate.class);
- public SessionDelegate getSessionDelegate()
+ public void control(Connection conn, Method method)
{
- return new SessionDelegate();
+ method.dispatch(conn, this);
}
- public abstract void init(Channel ch, ProtocolHeader hdr);
+ public void command(Connection conn, Method method)
+ {
+ method.dispatch(conn, this);
+ }
+
+ public void error(Connection conn, ProtocolError error)
+ {
+ conn.exception(new ConnectionException(error.getMessage()));
+ }
- @Override public void connectionClose(Channel ch, ConnectionClose close)
+ public void handle(Connection conn, Method method)
{
- Connection conn = ch.getConnection();
- ch.connectionCloseOk();
+ conn.dispatch(method);
+ }
+
+ @Override public void connectionClose(Connection conn, ConnectionClose close)
+ {
+ conn.connectionCloseOk();
conn.getSender().close();
conn.closeCode(close);
conn.setState(CLOSE_RCVD);
}
- @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
+ @Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok)
{
- Connection conn = ch.getConnection();
conn.getSender().close();
}
+ @Override public void sessionAttached(Connection conn, SessionAttached atc)
+ {
+
+ }
+
+ @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+ {
+ Session ssn = conn.getSession(dtc.getChannel());
+ conn.unmap(ssn);
+ ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ ssn.closed();
+ }
+
+ @Override public void sessionDetached(Connection conn, SessionDetached dtc)
+ {
+ Session ssn = conn.getSession(dtc.getChannel());
+ conn.unmap(ssn);
+ ssn.closed();
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index 89b59c2512..dcf05d9f72 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -32,22 +32,33 @@ import org.apache.qpid.transport.network.io.IoAcceptor;
*
*/
-public class Echo extends SessionDelegate
+public class Echo implements SessionListener
{
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
ssn.invoke(xfr);
ssn.processed(xfr);
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public static final void main(String[] args) throws IOException
{
ConnectionDelegate delegate = new ServerDelegate()
{
- public SessionDelegate getSessionDelegate()
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- return new Echo();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(new Echo());
+ return ssn;
}
};
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index c27419cadc..be1ea54c93 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -55,24 +55,22 @@ public class ServerDelegate extends ConnectionDelegate
private SaslServer saslServer;
- public void init(Channel ch, ProtocolHeader hdr)
+ public void init(Connection conn, ProtocolHeader hdr)
{
- Connection conn = ch.getConnection();
conn.send(new ProtocolHeader(1, 0, 10));
List<Object> utf8 = new ArrayList<Object>();
utf8.add("utf8");
- ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
}
- @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+ @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
- Connection conn = ch.getConnection();
conn.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
if (mechanism == null || mechanism.length() == 0)
{
- ch.connectionTune
+ conn.connectionTune
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
@@ -85,13 +83,13 @@ public class ServerDelegate extends ConnectionDelegate
(mechanism, "AMQP", "localhost", null, null);
if (ss == null)
{
- ch.connectionClose
+ conn.connectionClose
(ConnectionCloseCode.CONNECTION_FORCED,
"null SASL mechanism: " + mechanism);
return;
}
conn.setSaslServer(ss);
- secure(ch, ok.getResponse());
+ secure(conn, ok.getResponse());
}
catch (SaslException e)
{
@@ -99,9 +97,8 @@ public class ServerDelegate extends ConnectionDelegate
}
}
- private void secure(Channel ch, byte[] response)
+ private void secure(Connection conn, byte[] response)
{
- Connection conn = ch.getConnection();
SaslServer ss = conn.getSaslServer();
try
{
@@ -109,14 +106,14 @@ public class ServerDelegate extends ConnectionDelegate
if (ss.isComplete())
{
ss.dispose();
- ch.connectionTune
+ conn.connectionTune
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
}
else
{
- ch.connectionSecure(challenge);
+ conn.connectionSecure(challenge);
}
}
catch (SaslException e)
@@ -125,21 +122,32 @@ public class ServerDelegate extends ConnectionDelegate
}
}
- @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+ @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
{
- secure(ch, ok.getResponse());
+ secure(conn, ok.getResponse());
}
- @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+ @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
}
- @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ @Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
- Connection conn = ch.getConnection();
- ch.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.connectionOpenOk(Collections.EMPTY_LIST);
conn.setState(OPEN);
}
+ public Session getSession(Connection conn, SessionAttach atc)
+ {
+ return new Session(conn, new Binary(atc.getName()));
+ }
+
+ @Override public void sessionAttach(Connection conn, SessionAttach atc)
+ {
+ Session ssn = getSession(conn, atc);
+ conn.map(ssn, atc.getChannel());
+ ssn.sessionAttached(atc.getName());
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index df4313e15b..125f000543 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -44,7 +44,7 @@ import static org.apache.qpid.util.Strings.*;
* @author Rafael H. Schloming
*/
-public class Session extends Invoker
+public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
@@ -84,14 +84,14 @@ public class Session extends Invoker
}
}
- private byte[] name;
+ private Connection connection;
+ private Binary name;
+ private int channel;
+ private SessionDelegate delegate = new SessionDelegate();
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
- // channel may be null
- Channel channel;
-
// incoming command count
int commandsIn = 0;
// completed incoming commands
@@ -108,16 +108,27 @@ public class Session extends Invoker
private AtomicBoolean closed = new AtomicBoolean(false);
- public Session(byte[] name)
+ Session(Connection connection, Binary name)
{
+ this.connection = connection;
this.name = name;
}
- public byte[] getName()
+ public Binary getName()
{
return name;
}
+ int getChannel()
+ {
+ return channel;
+ }
+
+ void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
public void setSessionListener(SessionListener listener)
{
if (listener == null)
@@ -212,8 +223,12 @@ public class Session extends Invoker
{
maxProcessed = max(maxProcessed, upper);
}
- flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
- syncPoint = maxProcessed;
+ boolean synced = ge(maxProcessed, syncPoint);
+ flush = lt(old, syncPoint) && synced;
+ if (synced)
+ {
+ syncPoint = maxProcessed;
+ }
}
if (flush)
{
@@ -266,12 +281,6 @@ public class Session extends Invoker
}
}
- public void attach(Channel channel)
- {
- this.channel = channel;
- channel.setSession(this);
- }
-
public Method getCommand(int id)
{
synchronized (commands)
@@ -304,6 +313,22 @@ public class Session extends Invoker
}
}
+ void received(Method m)
+ {
+ m.delegate(this, delegate);
+ }
+
+ private void send(Method m)
+ {
+ m.setChannel(channel);
+ connection.send(m);
+
+ if (!m.isBatch())
+ {
+ connection.flush();
+ }
+ }
+
public void invoke(Method m)
{
if (closed.get())
@@ -342,7 +367,7 @@ public class Session extends Invoker
m.setSync(true);
}
needSync = !m.isSync();
- channel.method(m);
+ send(m);
if (autoSync)
{
sync();
@@ -358,7 +383,7 @@ public class Session extends Invoker
}
else
{
- channel.method(m);
+ send(m);
}
}
@@ -564,7 +589,7 @@ public class Session extends Invoker
public void close()
{
sessionRequestTimeout(0);
- sessionDetach(name);
+ sessionDetach(name.getBytes());
synchronized (commands)
{
long start = System.currentTimeMillis();
@@ -576,12 +601,19 @@ public class Session extends Invoker
commands.wait(timeout - elapsed);
elapsed = System.currentTimeMillis() - start;
}
+
+ if (!closed.get())
+ {
+ throw new SessionException("close() timed out");
+ }
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
+
+ connection.removeSession(this);
}
public void exception(Throwable t)
@@ -606,13 +638,11 @@ public class Session extends Invoker
}
}
}
- channel.setSession(null);
- channel = null;
}
public String toString()
{
- return String.format("ssn:%s", str(name));
+ return String.format("ssn:%s", name);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
index e4ce7329a9..64f9039484 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -38,7 +38,7 @@ public class SessionClosedException extends SessionException
public SessionClosedException(Throwable cause)
{
- super(null, cause);
+ super("session closed", null, cause);
}
@Override public void rethrow()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 3e6fa9d5d9..f6a1735b68 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
/**
@@ -33,6 +33,8 @@ public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
+ private static final Logger log = Logger.get(SessionDelegate.class);
+
public void init(Session ssn, ProtocolHeader hdr) { }
public void control(Session ssn, Method method) {
@@ -50,15 +52,12 @@ public class SessionDelegate
public void error(Session ssn, ProtocolError error) { }
- @Override public void executionResult(Session ssn, ExecutionResult result)
+ public void handle(Session ssn, Method method)
{
- ssn.result(result.getCommandId(), result.getValue());
+ log.warn("UNHANDLED: [%s] %s", ssn, method);
}
- @Override public void executionException(Session ssn, ExecutionException exc)
- {
- ssn.setException(exc);
- }
+ @Override public void sessionTimeout(Session ssn, SessionTimeout t) {}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
{
@@ -122,6 +121,16 @@ public class SessionDelegate
ssn.syncPoint();
}
+ @Override public void executionResult(Session ssn, ExecutionResult result)
+ {
+ ssn.result(result.getCommandId(), result.getValue());
+ }
+
+ @Override public void executionException(Session ssn, ExecutionException exc)
+ {
+ ssn.setException(exc);
+ }
+
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
ssn.getSessionListener().message(ssn, xfr);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
index e90ad8caf6..c4fc9558a1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
@@ -32,15 +32,20 @@ public class SessionException extends TransportException
private ExecutionException exception;
- public SessionException(ExecutionException exception, Throwable cause)
+ public SessionException(String message, ExecutionException exception, Throwable cause)
{
- super(String.valueOf(exception), cause);
+ super(message, cause);
this.exception = exception;
}
public SessionException(ExecutionException exception)
{
- this(exception, null);
+ this(String.valueOf(exception), exception, null);
+ }
+
+ public SessionException(String message)
+ {
+ this(message, null, null);
}
public ExecutionException getException()
@@ -50,7 +55,7 @@ public class SessionException extends TransportException
@Override public void rethrow()
{
- throw new SessionException(exception, this);
+ throw new SessionException(getMessage(), exception, this);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
index 617867cae6..622993effb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -31,7 +31,7 @@ import org.apache.qpid.transport.network.io.IoAcceptor;
*
*/
-public class Sink extends SessionDelegate
+public class Sink implements SessionListener
{
private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
@@ -85,7 +85,9 @@ public class Sink extends SessionDelegate
return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024));
}
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
count++;
bytes += xfr.getBody().remaining();
@@ -101,14 +103,22 @@ public class Sink extends SessionDelegate
ssn.processed(xfr);
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public static final void main(String[] args) throws IOException
{
ConnectionDelegate delegate = new ServerDelegate()
{
-
- public SessionDelegate getSessionDelegate()
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- return new Sink();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(new Sink());
+ return ssn;
}
};
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index f70a13ec3c..f25b16d71a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -181,6 +181,11 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
{
if (!closed.getAndSet(true))
{
+ synchronized (notFull)
+ {
+ notFull.notify();
+ }
+
synchronized (notEmpty)
{
notEmpty.notify();
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index e61ffb501b..03fae56250 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -51,10 +51,10 @@ public class ConnectionTest extends TestCase
port = AvailablePortFinder.getNextAvailable(12000);
ConnectionDelegate server = new ServerDelegate() {
- @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ @Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
- super.connectionOpen(ch, open);
- ch.getConnection().close();
+ super.connectionOpen(conn, open);
+ conn.close();
}
};
@@ -94,13 +94,9 @@ public class ConnectionTest extends TestCase
fail("never got notified of connection close");
}
- Channel ch = conn.getChannel(0);
- Session ssn = new Session("test".getBytes());
- ssn.attach(ch);
-
try
{
- ssn.sessionAttach(ssn.getName());
+ conn.connectionCloseOk();
fail("writing to a closed socket succeeded");
}
catch (TransportException e)