diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
| commit | afcf8099695253651c73910a243fb29aa520b008 (patch) | |
| tree | e514bc51797181c567500a8ddbfc20ea9b89b908 /qpid/java/common/src | |
| parent | f315ac548e346ded9ed1d081db4118e703c362b4 (diff) | |
| download | qpid-python-afcf8099695253651c73910a243fb29aa520b008.tar.gz | |
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
17 files changed, 288 insertions, 52 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 05fd2bb480..374644b4f2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 1ff39ca790..647d531476 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import java.util.Date; +import java.util.Map; +import java.math.BigDecimal; + /** * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides @@ -113,4 +117,63 @@ public class AMQTypedValue return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); } + + public static AMQTypedValue toTypedValue(Object val) + { + if(val == null) + { + return AMQType.VOID.asTypedValue(null); + } + + Class klass = val.getClass(); + if(klass == String.class) + { + return AMQType.ASCII_STRING.asTypedValue(val); + } + else if(klass == Character.class) + { + return AMQType.ASCII_CHARACTER.asTypedValue(val); + } + else if(klass == Integer.class) + { + return AMQType.INT.asTypedValue(val); + } + else if(klass == Long.class) + { + return AMQType.LONG.asTypedValue(val); + } + else if(klass == Float.class) + { + return AMQType.FLOAT.asTypedValue(val); + } + else if(klass == Double.class) + { + return AMQType.DOUBLE.asTypedValue(val); + } + else if(klass == Date.class) + { + return AMQType.TIMESTAMP.asTypedValue(val); + } + else if(klass == Byte.class) + { + return AMQType.BYTE.asTypedValue(val); + } + else if(klass == Boolean.class) + { + return AMQType.BOOLEAN.asTypedValue(val); + } + else if(klass == byte[].class) + { + return AMQType.BINARY.asTypedValue(val); + } + else if(klass == BigDecimal.class) + { + return AMQType.DECIMAL.asTypedValue(val); + } + else if(val instanceof Map) + { + return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val)); + } + return null; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index ed01c91804..9b2f9b3969 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -828,6 +828,7 @@ public class FieldTable recalculateEncodedSize(); } + public static interface FieldTableElementProcessor { public boolean processElement(String propertyName, AMQTypedValue value); @@ -904,10 +905,13 @@ public class FieldTable } } + public Object get(String key) + { + return get(new AMQShortString(key)); + } public Object get(AMQShortString key) { - return getObject(key); } @@ -1184,4 +1188,24 @@ public class FieldTable return _properties.equals(f._properties); } + + public static FieldTable convertToFieldTable(Map<String, Object> map) + { + if (map != null) + { + FieldTable table = new FieldTable(); + for(Map.Entry<String,Object> entry : map.entrySet()) + { + table.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + + return table; + } + else + { + return null; + } + } + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index 0a1cedc4e6..7544d9b7e7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -23,10 +23,14 @@ package org.apache.qpid.framing.abstraction; import org.apache.qpid.framing.AMQBody; +import java.nio.ByteBuffer; + public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter { AMQBody convertToBody(ContentChunk contentBody); ContentChunk convertToContentChunk(AMQBody body); void configure(); + + AMQBody convertToBody(ByteBuffer buf); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index a5c5e5f22d..1c4a29b106 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -72,6 +72,11 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot } + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) { final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index 1b0be2b9cc..c87820b9b2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -80,6 +80,11 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; } + + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 5bfc189b02..31953ea6ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -28,38 +28,34 @@ import org.apache.qpid.transport.Receiver; /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received - * decodes it and then process the result. - */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> -{ - // Sets the network driver providing data for this ProtocolEngine + * decodes it and then process the result. + */ +public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> +{ + // Sets the network driver providing data for this ProtocolEngine void setNetworkDriver (NetworkDriver driver); - - // Returns the remote address of the NetworkDriver + + // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); - // Returns the local address of the NetworkDriver + // Returns the local address of the NetworkDriver SocketAddress getLocalAddress(); - - // Returns number of bytes written + + // Returns number of bytes written long getWrittenBytes(); - - // Returns number of bytes read + + // Returns number of bytes read long getReadBytes(); - - // Called by the NetworkDriver when the socket has been closed for reading + + // Called by the NetworkDriver when the socket has been closed for reading void closed(); - - // Called when the NetworkEngine has not written data for the specified period of time (will trigger a - // heartbeat) + + // Called when the NetworkEngine has not written data for the specified period of time (will trigger a + // heartbeat) void writerIdle(); - - // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) + + // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); - - /** - * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and - * passes the data onto the NetworkDriver for sending - */ - void writeFrame(AMQDataBlock frame); -}
\ No newline at end of file + + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1cdd1da72b..3403b591f3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -55,7 +55,7 @@ public class Connection extends ConnectionInvoker private static final Logger log = Logger.get(Connection.class); - enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } class DefaultConnectionListener implements ConnectionListener { @@ -84,7 +84,8 @@ public class Connection extends ConnectionInvoker private SaslServer saslServer; private SaslClient saslClient; private long idleTimeout = 0; - + private String _authorizationID; + // want to make this final private int _connectionId; @@ -118,7 +119,7 @@ public class Connection extends ConnectionInvoker sender.setIdleTimeout(idleTimeout); } - void setState(State state) + protected void setState(State state) { synchronized (lock) { @@ -315,7 +316,14 @@ public class Connection extends ConnectionInvoker public void dispatch(Method method) { Session ssn = getSession(method.getChannel()); - ssn.received(method); + if(ssn != null) + { + ssn.received(method); + } + else + { + // TODO + } } public int getChannelMax() @@ -525,7 +533,17 @@ public class Connection extends ConnectionInvoker { return idleTimeout; } - + + public void setAuthorizationID(String authorizationID) + { + _authorizationID = authorizationID; + } + + public String getAuthorizationID() + { + return _authorizationID; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 611c742fb1..3c80180d0b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -35,6 +35,7 @@ import static org.apache.qpid.transport.util.Functions.*; public abstract class Method extends Struct implements ProtocolEvent { + public static final Method create(int type) { // XXX: should generate separate factories for separate @@ -43,12 +44,18 @@ public abstract class Method extends Struct implements ProtocolEvent } // XXX: command subclass? + public static interface CompletionListener + { + public void onComplete(Method method); + } + private int id; private int channel; private boolean idSet = false; private boolean sync = false; private boolean batch = false; private boolean unreliable = false; + private CompletionListener completionListener; public final int getId() { @@ -61,6 +68,11 @@ public abstract class Method extends Struct implements ProtocolEvent this.idSet = true; } + boolean idSet() + { + return idSet; + } + public final int getChannel() { return channel; @@ -76,7 +88,7 @@ public abstract class Method extends Struct implements ProtocolEvent return sync; } - final void setSync(boolean value) + public final void setSync(boolean value) { this.sync = value; } @@ -152,6 +164,26 @@ public abstract class Method extends Struct implements ProtocolEvent } } + + public void setCompletionListener(CompletionListener completionListener) + { + this.completionListener = completionListener; + } + + public void complete() + { + if(completionListener!= null) + { + completionListener.onComplete(this); + completionListener = null; + } + } + + public boolean hasCompletionListener() + { + return completionListener != null; + } + public String toString() { StringBuilder str = new StringBuilder(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index 9b2744ee8b..3850dc162b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -52,6 +52,11 @@ public final class RangeSet implements Iterable<Range> return ranges.getFirst(); } + public Range getLast() + { + return ranges.getLast(); + } + public boolean includes(Range range) { for (Range r : this) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 2833565afc..453921ea2b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -52,13 +52,28 @@ public class ServerDelegate extends ConnectionDelegate { private SaslServer saslServer; + private List<Object> _locales; + private List<Object> _mechanisms; + private Map<String, Object> _clientProperties; + + + public ServerDelegate() + { + this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + } + + protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales) + { + _clientProperties = clientProperties; + _mechanisms = mechanisms; + _locales = locales; + } public void init(Connection conn, ProtocolHeader hdr) { conn.send(new ProtocolHeader(1, 0, 10)); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); + + conn.connectionStart(_clientProperties, _mechanisms, _locales); } @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) @@ -77,8 +92,8 @@ public class ServerDelegate extends ConnectionDelegate try { - SaslServer ss = Sasl.createSaslServer - (mechanism, "AMQP", "localhost", null, null); + + SaslServer ss = createSaslServer(mechanism); if (ss == null) { conn.connectionClose @@ -95,6 +110,14 @@ public class ServerDelegate extends ConnectionDelegate } } + protected SaslServer createSaslServer(String mechanism) + throws SaslException + { + SaslServer ss = Sasl.createSaslServer + (mechanism, "AMQP", "localhost", null, null); + return ss; + } + private void secure(Connection conn, byte[] response) { SaslServer ss = conn.getSaslServer(); @@ -108,6 +131,7 @@ public class ServerDelegate extends ConnectionDelegate (Integer.MAX_VALUE, org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, Integer.MAX_VALUE); + conn.setAuthorizationID(ss.getAuthorizationID()); } else { @@ -133,9 +157,16 @@ public class ServerDelegate extends ConnectionDelegate @Override public void connectionOpen(Connection conn, ConnectionOpen open) { conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.setState(OPEN); } + protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc) + { + return new Session(conn, delegate, new Binary(atc.getName()), 0); + } + + public Session getSession(Connection conn, SessionAttach atc) { return new Session(conn, new Binary(atc.getName()), 0); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 3dca4fc44e..818bb19c08 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -81,7 +81,7 @@ public class Session extends SessionInvoker private Binary name; private long expiry; private int channel; - private SessionDelegate delegate = new SessionDelegate(); + private SessionDelegate delegate; private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; @@ -111,9 +111,15 @@ public class Session extends SessionInvoker private Thread resumer = null; - Session(Connection connection, Binary name, long expiry) + protected Session(Connection connection, Binary name, long expiry) + { + this(connection, new SessionDelegate(), name, expiry); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this.connection = connection; + this.delegate = delegate; this.name = name; this.expiry = expiry; initReceiver(); @@ -440,7 +446,7 @@ public class Session extends SessionInvoker } } - boolean complete(int lower, int upper) + protected boolean complete(int lower, int upper) { //avoid autoboxing if(log.isDebugEnabled()) @@ -457,8 +463,9 @@ public class Session extends SessionInvoker if (m != null) { commandBytes -= m.getBodySize(); + m.complete(); + commands[idx] = null; } - commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -486,13 +493,28 @@ public class Session extends SessionInvoker } } - final private boolean isFull(int id) + protected boolean isFull(int id) + { + return isCommandsFull(id) || isBytesFull(); + } + + protected boolean isBytesFull() { - return id - maxComplete >= commands.length || commandBytes >= byteLimit; + return commandBytes >= byteLimit; + } + + protected boolean isCommandsFull(int id) + { + return id - maxComplete >= commands.length; } public void invoke(Method m) { + invoke(m,(Runnable)null); + } + + public void invoke(Method m, Runnable postIdSettingAction) + { if (m.getEncodedTrack() == Frame.L4) { if (m.hasPayload()) @@ -553,8 +575,13 @@ public class Session extends SessionInvoker "(state=%s)", state)); } - int next = commandsOut++; + int next; + next = commandsOut++; m.setId(next); + if(postIdSettingAction != null) + { + postIdSettingAction.run(); + } if (isFull(next)) { @@ -607,7 +634,7 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - if (expiry > 0 && !m.isUnreliable()) + if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); @@ -617,6 +644,7 @@ public class Session extends SessionInvoker m.setSync(true); } needSync = !m.isSync(); + try { send(m); @@ -641,7 +669,7 @@ public class Session extends SessionInvoker // flush every 64K commands to avoid ambiguity on // wraparound - if ((next % 65536) == 0) + if (shouldIssueFlush(next)) { try { @@ -669,6 +697,11 @@ public class Session extends SessionInvoker } } + protected boolean shouldIssueFlush(int next) + { + return (next % 65536) == 0; + } + public void sync() { sync(timeout); @@ -910,6 +943,14 @@ public class Session extends SessionInvoker } } } + if(state == CLOSED) + { + delegate.closed(this); + } + else + { + delegate.detached(this); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index c8d0855607..6146f029b2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -184,4 +184,11 @@ public class SessionDelegate } } + public void closed(Session session) + { + } + + public void detached(Session session) + { + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 33d552b91e..357caa26e1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -186,8 +186,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate case COMMAND: int commandType = dec.readUint16(); // read in the session header, right now we don't use it - dec.readUint16(); + int hdr = dec.readUint16(); command = Method.create(commandType); + command.setSync((0x0001 & hdr) != 0); command.read(dec); if (command.hasPayload()) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 408c95e075..2132fc2c03 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -39,7 +39,7 @@ import static org.apache.qpid.transport.network.InputHandler.State.*; * @author Rafael H. Schloming */ -public final class InputHandler implements Receiver<ByteBuffer> +public class InputHandler implements Receiver<ByteBuffer> { public enum State diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 6144edb947..ea48e48721 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -137,6 +137,7 @@ final class IoReceiver implements Runnable } catch (Throwable t) { + t.printStackTrace(); if (!(shutdownBroken && t instanceof SocketException && t.getMessage().equalsIgnoreCase("socket closed") && diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index b0d1c46572..3838bf76be 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -229,7 +229,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver if (_socketConnector instanceof SocketConnector) { ((SocketConnector) _socketConnector).setWorkerTimeout(0); - } + } ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); future.join(); @@ -279,7 +279,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void send(ByteBuffer msg) { - _lastWriteFuture = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); + org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); + minaBuf.put(msg); + minaBuf.flip(); + _lastWriteFuture = _ioSession.write(minaBuf); } public void setIdleTimeout(long l) |
