summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
commitafcf8099695253651c73910a243fb29aa520b008 (patch)
treee514bc51797181c567500a8ddbfc20ea9b89b908 /qpid/java/common/src
parentf315ac548e346ded9ed1d081db4118e703c362b4 (diff)
downloadqpid-python-afcf8099695253651c73910a243fb29aa520b008.tar.gz
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java28
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java7
17 files changed, 288 insertions, 52 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 05fd2bb480..374644b4f2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder
{
_logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
}
-
+
out.write(buffer);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index 1ff39ca790..647d531476 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import java.util.Date;
+import java.util.Map;
+import java.math.BigDecimal;
+
/**
* AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
* value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -113,4 +117,63 @@ public class AMQTypedValue
return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
}
+
+ public static AMQTypedValue toTypedValue(Object val)
+ {
+ if(val == null)
+ {
+ return AMQType.VOID.asTypedValue(null);
+ }
+
+ Class klass = val.getClass();
+ if(klass == String.class)
+ {
+ return AMQType.ASCII_STRING.asTypedValue(val);
+ }
+ else if(klass == Character.class)
+ {
+ return AMQType.ASCII_CHARACTER.asTypedValue(val);
+ }
+ else if(klass == Integer.class)
+ {
+ return AMQType.INT.asTypedValue(val);
+ }
+ else if(klass == Long.class)
+ {
+ return AMQType.LONG.asTypedValue(val);
+ }
+ else if(klass == Float.class)
+ {
+ return AMQType.FLOAT.asTypedValue(val);
+ }
+ else if(klass == Double.class)
+ {
+ return AMQType.DOUBLE.asTypedValue(val);
+ }
+ else if(klass == Date.class)
+ {
+ return AMQType.TIMESTAMP.asTypedValue(val);
+ }
+ else if(klass == Byte.class)
+ {
+ return AMQType.BYTE.asTypedValue(val);
+ }
+ else if(klass == Boolean.class)
+ {
+ return AMQType.BOOLEAN.asTypedValue(val);
+ }
+ else if(klass == byte[].class)
+ {
+ return AMQType.BINARY.asTypedValue(val);
+ }
+ else if(klass == BigDecimal.class)
+ {
+ return AMQType.DECIMAL.asTypedValue(val);
+ }
+ else if(val instanceof Map)
+ {
+ return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val));
+ }
+ return null;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index ed01c91804..9b2f9b3969 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -828,6 +828,7 @@ public class FieldTable
recalculateEncodedSize();
}
+
public static interface FieldTableElementProcessor
{
public boolean processElement(String propertyName, AMQTypedValue value);
@@ -904,10 +905,13 @@ public class FieldTable
}
}
+ public Object get(String key)
+ {
+ return get(new AMQShortString(key));
+ }
public Object get(AMQShortString key)
{
-
return getObject(key);
}
@@ -1184,4 +1188,24 @@ public class FieldTable
return _properties.equals(f._properties);
}
+
+ public static FieldTable convertToFieldTable(Map<String, Object> map)
+ {
+ if (map != null)
+ {
+ FieldTable table = new FieldTable();
+ for(Map.Entry<String,Object> entry : map.entrySet())
+ {
+ table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+
+ return table;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
index 0a1cedc4e6..7544d9b7e7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
@@ -23,10 +23,14 @@ package org.apache.qpid.framing.abstraction;
import org.apache.qpid.framing.AMQBody;
+import java.nio.ByteBuffer;
+
public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
{
AMQBody convertToBody(ContentChunk contentBody);
ContentChunk convertToContentChunk(AMQBody body);
void configure();
+
+ AMQBody convertToBody(ByteBuffer buf);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
index a5c5e5f22d..1c4a29b106 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -72,6 +72,11 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
}
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
index 1b0be2b9cc..c87820b9b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
@@ -80,6 +80,11 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot
_basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
}
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 5bfc189b02..31953ea6ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -28,38 +28,34 @@ import org.apache.qpid.transport.Receiver;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
- * decodes it and then process the result.
- */
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
-{
- // Sets the network driver providing data for this ProtocolEngine
+ * decodes it and then process the result.
+ */
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+{
+ // Sets the network driver providing data for this ProtocolEngine
void setNetworkDriver (NetworkDriver driver);
-
- // Returns the remote address of the NetworkDriver
+
+ // Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
- // Returns the local address of the NetworkDriver
+ // Returns the local address of the NetworkDriver
SocketAddress getLocalAddress();
-
- // Returns number of bytes written
+
+ // Returns number of bytes written
long getWrittenBytes();
-
- // Returns number of bytes read
+
+ // Returns number of bytes read
long getReadBytes();
-
- // Called by the NetworkDriver when the socket has been closed for reading
+
+ // Called by the NetworkDriver when the socket has been closed for reading
void closed();
-
- // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
- // heartbeat)
+
+ // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+ // heartbeat)
void writerIdle();
-
- // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+
+ // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
-
- /**
- * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and
- * passes the data onto the NetworkDriver for sending
- */
- void writeFrame(AMQDataBlock frame);
-} \ No newline at end of file
+
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 1cdd1da72b..3403b591f3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -55,7 +55,7 @@ public class Connection extends ConnectionInvoker
private static final Logger log = Logger.get(Connection.class);
- enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
class DefaultConnectionListener implements ConnectionListener
{
@@ -84,7 +84,8 @@ public class Connection extends ConnectionInvoker
private SaslServer saslServer;
private SaslClient saslClient;
private long idleTimeout = 0;
-
+ private String _authorizationID;
+
// want to make this final
private int _connectionId;
@@ -118,7 +119,7 @@ public class Connection extends ConnectionInvoker
sender.setIdleTimeout(idleTimeout);
}
- void setState(State state)
+ protected void setState(State state)
{
synchronized (lock)
{
@@ -315,7 +316,14 @@ public class Connection extends ConnectionInvoker
public void dispatch(Method method)
{
Session ssn = getSession(method.getChannel());
- ssn.received(method);
+ if(ssn != null)
+ {
+ ssn.received(method);
+ }
+ else
+ {
+ // TODO
+ }
}
public int getChannelMax()
@@ -525,7 +533,17 @@ public class Connection extends ConnectionInvoker
{
return idleTimeout;
}
-
+
+ public void setAuthorizationID(String authorizationID)
+ {
+ _authorizationID = authorizationID;
+ }
+
+ public String getAuthorizationID()
+ {
+ return _authorizationID;
+ }
+
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 611c742fb1..3c80180d0b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -35,6 +35,7 @@ import static org.apache.qpid.transport.util.Functions.*;
public abstract class Method extends Struct implements ProtocolEvent
{
+
public static final Method create(int type)
{
// XXX: should generate separate factories for separate
@@ -43,12 +44,18 @@ public abstract class Method extends Struct implements ProtocolEvent
}
// XXX: command subclass?
+ public static interface CompletionListener
+ {
+ public void onComplete(Method method);
+ }
+
private int id;
private int channel;
private boolean idSet = false;
private boolean sync = false;
private boolean batch = false;
private boolean unreliable = false;
+ private CompletionListener completionListener;
public final int getId()
{
@@ -61,6 +68,11 @@ public abstract class Method extends Struct implements ProtocolEvent
this.idSet = true;
}
+ boolean idSet()
+ {
+ return idSet;
+ }
+
public final int getChannel()
{
return channel;
@@ -76,7 +88,7 @@ public abstract class Method extends Struct implements ProtocolEvent
return sync;
}
- final void setSync(boolean value)
+ public final void setSync(boolean value)
{
this.sync = value;
}
@@ -152,6 +164,26 @@ public abstract class Method extends Struct implements ProtocolEvent
}
}
+
+ public void setCompletionListener(CompletionListener completionListener)
+ {
+ this.completionListener = completionListener;
+ }
+
+ public void complete()
+ {
+ if(completionListener!= null)
+ {
+ completionListener.onComplete(this);
+ completionListener = null;
+ }
+ }
+
+ public boolean hasCompletionListener()
+ {
+ return completionListener != null;
+ }
+
public String toString()
{
StringBuilder str = new StringBuilder();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index 9b2744ee8b..3850dc162b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -52,6 +52,11 @@ public final class RangeSet implements Iterable<Range>
return ranges.getFirst();
}
+ public Range getLast()
+ {
+ return ranges.getLast();
+ }
+
public boolean includes(Range range)
{
for (Range r : this)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 2833565afc..453921ea2b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -52,13 +52,28 @@ public class ServerDelegate extends ConnectionDelegate
{
private SaslServer saslServer;
+ private List<Object> _locales;
+ private List<Object> _mechanisms;
+ private Map<String, Object> _clientProperties;
+
+
+ public ServerDelegate()
+ {
+ this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+ }
+
+ protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
+ {
+ _clientProperties = clientProperties;
+ _mechanisms = mechanisms;
+ _locales = locales;
+ }
public void init(Connection conn, ProtocolHeader hdr)
{
conn.send(new ProtocolHeader(1, 0, 10));
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
+
+ conn.connectionStart(_clientProperties, _mechanisms, _locales);
}
@Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
@@ -77,8 +92,8 @@ public class ServerDelegate extends ConnectionDelegate
try
{
- SaslServer ss = Sasl.createSaslServer
- (mechanism, "AMQP", "localhost", null, null);
+
+ SaslServer ss = createSaslServer(mechanism);
if (ss == null)
{
conn.connectionClose
@@ -95,6 +110,14 @@ public class ServerDelegate extends ConnectionDelegate
}
}
+ protected SaslServer createSaslServer(String mechanism)
+ throws SaslException
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ return ss;
+ }
+
private void secure(Connection conn, byte[] response)
{
SaslServer ss = conn.getSaslServer();
@@ -108,6 +131,7 @@ public class ServerDelegate extends ConnectionDelegate
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
+ conn.setAuthorizationID(ss.getAuthorizationID());
}
else
{
@@ -133,9 +157,16 @@ public class ServerDelegate extends ConnectionDelegate
@Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
conn.connectionOpenOk(Collections.EMPTY_LIST);
+
conn.setState(OPEN);
}
+ protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc)
+ {
+ return new Session(conn, delegate, new Binary(atc.getName()), 0);
+ }
+
+
public Session getSession(Connection conn, SessionAttach atc)
{
return new Session(conn, new Binary(atc.getName()), 0);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 3dca4fc44e..818bb19c08 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -81,7 +81,7 @@ public class Session extends SessionInvoker
private Binary name;
private long expiry;
private int channel;
- private SessionDelegate delegate = new SessionDelegate();
+ private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -111,9 +111,15 @@ public class Session extends SessionInvoker
private Thread resumer = null;
- Session(Connection connection, Binary name, long expiry)
+ protected Session(Connection connection, Binary name, long expiry)
+ {
+ this(connection, new SessionDelegate(), name, expiry);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this.connection = connection;
+ this.delegate = delegate;
this.name = name;
this.expiry = expiry;
initReceiver();
@@ -440,7 +446,7 @@ public class Session extends SessionInvoker
}
}
- boolean complete(int lower, int upper)
+ protected boolean complete(int lower, int upper)
{
//avoid autoboxing
if(log.isDebugEnabled())
@@ -457,8 +463,9 @@ public class Session extends SessionInvoker
if (m != null)
{
commandBytes -= m.getBodySize();
+ m.complete();
+ commands[idx] = null;
}
- commands[idx] = null;
}
if (le(lower, maxComplete + 1))
{
@@ -486,13 +493,28 @@ public class Session extends SessionInvoker
}
}
- final private boolean isFull(int id)
+ protected boolean isFull(int id)
+ {
+ return isCommandsFull(id) || isBytesFull();
+ }
+
+ protected boolean isBytesFull()
{
- return id - maxComplete >= commands.length || commandBytes >= byteLimit;
+ return commandBytes >= byteLimit;
+ }
+
+ protected boolean isCommandsFull(int id)
+ {
+ return id - maxComplete >= commands.length;
}
public void invoke(Method m)
{
+ invoke(m,(Runnable)null);
+ }
+
+ public void invoke(Method m, Runnable postIdSettingAction)
+ {
if (m.getEncodedTrack() == Frame.L4)
{
if (m.hasPayload())
@@ -553,8 +575,13 @@ public class Session extends SessionInvoker
"(state=%s)", state));
}
- int next = commandsOut++;
+ int next;
+ next = commandsOut++;
m.setId(next);
+ if(postIdSettingAction != null)
+ {
+ postIdSettingAction.run();
+ }
if (isFull(next))
{
@@ -607,7 +634,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0 && !m.isUnreliable())
+ if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -617,6 +644,7 @@ public class Session extends SessionInvoker
m.setSync(true);
}
needSync = !m.isSync();
+
try
{
send(m);
@@ -641,7 +669,7 @@ public class Session extends SessionInvoker
// flush every 64K commands to avoid ambiguity on
// wraparound
- if ((next % 65536) == 0)
+ if (shouldIssueFlush(next))
{
try
{
@@ -669,6 +697,11 @@ public class Session extends SessionInvoker
}
}
+ protected boolean shouldIssueFlush(int next)
+ {
+ return (next % 65536) == 0;
+ }
+
public void sync()
{
sync(timeout);
@@ -910,6 +943,14 @@ public class Session extends SessionInvoker
}
}
}
+ if(state == CLOSED)
+ {
+ delegate.closed(this);
+ }
+ else
+ {
+ delegate.detached(this);
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index c8d0855607..6146f029b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -184,4 +184,11 @@ public class SessionDelegate
}
}
+ public void closed(Session session)
+ {
+ }
+
+ public void detached(Session session)
+ {
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 33d552b91e..357caa26e1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -186,8 +186,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
case COMMAND:
int commandType = dec.readUint16();
// read in the session header, right now we don't use it
- dec.readUint16();
+ int hdr = dec.readUint16();
command = Method.create(commandType);
+ command.setSync((0x0001 & hdr) != 0);
command.read(dec);
if (command.hasPayload())
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 408c95e075..2132fc2c03 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -39,7 +39,7 @@ import static org.apache.qpid.transport.network.InputHandler.State.*;
* @author Rafael H. Schloming
*/
-public final class InputHandler implements Receiver<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>
{
public enum State
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 6144edb947..ea48e48721 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -137,6 +137,7 @@ final class IoReceiver implements Runnable
}
catch (Throwable t)
{
+ t.printStackTrace();
if (!(shutdownBroken &&
t instanceof SocketException &&
t.getMessage().equalsIgnoreCase("socket closed") &&
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index b0d1c46572..3838bf76be 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -229,7 +229,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
if (_socketConnector instanceof SocketConnector)
{
((SocketConnector) _socketConnector).setWorkerTimeout(0);
- }
+ }
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
@@ -279,7 +279,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void send(ByteBuffer msg)
{
- _lastWriteFuture = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg));
+ org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity());
+ minaBuf.put(msg);
+ minaBuf.flip();
+ _lastWriteFuture = _ioSession.write(minaBuf);
}
public void setIdleTimeout(long l)