From 2c5100e6829529ea0df4463c5d914d613e45c1c8 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 5 Aug 2008 19:33:11 +0000 Subject: Profiling driven changes: - made AMQShortString cache the toString() value - added static initializer to IoTransport to disable use of pooled byte buffers - modified IoSender to permit buffering - removed OutputHandler and eliminated intermediate Frame generation between Disassembler and Sender (IoSender) - made Disassembler take advantage of IoSender's buffering - removed Header and Data as distinct protocol events, added Header and Body members to MessageTransfer - modified Assembler and Disassembler to decode/encode Header and Data directly to/from MessageTransfer - modified Disassembler to only write data if encoding of headers is successful - added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding that is also fast for 7-bit ascii - modified JMSTextMessage to use the Strings.toUTF8 - modified QpidBench to only generate 7-bit ascii when using TextMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682887 13f79535-47bb-0310-9956-ffa450edef68 --- java/common/Composite.tpl | 53 +++++++- java/common/Invoker.tpl | 7 +- java/common/genutil.py | 9 +- .../src/main/java/org/apache/qpid/ToyBroker.java | 129 +++---------------- .../src/main/java/org/apache/qpid/ToyClient.java | 29 ++--- .../src/main/java/org/apache/qpid/ToyExchange.java | 37 +++--- .../org/apache/qpid/framing/AMQShortString.java | 8 +- .../java/org/apache/qpid/transport/Channel.java | 67 +--------- .../main/java/org/apache/qpid/transport/Data.java | 98 -------------- .../main/java/org/apache/qpid/transport/Echo.java | 16 --- .../java/org/apache/qpid/transport/Header.java | 48 +------ .../java/org/apache/qpid/transport/Method.java | 39 ++++++ .../apache/qpid/transport/ProtocolDelegate.java | 4 - .../java/org/apache/qpid/transport/Session.java | 67 ++++------ .../org/apache/qpid/transport/SessionDelegate.java | 4 - .../org/apache/qpid/transport/codec/BBEncoder.java | 17 ++- .../org/apache/qpid/transport/codec/Validator.java | 2 +- .../apache/qpid/transport/network/Assembler.java | 63 +++++---- .../qpid/transport/network/Disassembler.java | 143 ++++++++++++--------- .../qpid/transport/network/OutputHandler.java | 125 ------------------ .../apache/qpid/transport/network/io/IoSender.java | 48 +++---- .../qpid/transport/network/io/IoTransport.java | 12 +- .../qpid/transport/network/mina/MinaHandler.java | 3 +- .../qpid/transport/network/nio/NioHandler.java | 4 +- .../main/java/org/apache/qpid/util/Strings.java | 82 ++++++++++++ 25 files changed, 433 insertions(+), 681 deletions(-) delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/Data.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/Strings.java (limited to 'java/common') diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl index 37e3bf8853..283fa24641 100644 --- a/java/common/Composite.tpl +++ b/java/common/Composite.tpl @@ -1,5 +1,6 @@ package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -9,7 +10,6 @@ import java.util.UUID; import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encodable; import org.apache.qpid.transport.codec.Encoder; -import org.apache.qpid.transport.codec.Validator; import org.apache.qpid.transport.network.Frame; @@ -18,11 +18,13 @@ from genutil import * cls = klass(type)["@name"] +segments = type["segments"] + if type.name in ("control", "command"): base = "Method" size = 0 pack = 2 - if type["segments"]: + if segments: payload = "true" else: payload = "false" @@ -86,6 +88,10 @@ options = get_options(fields) for f in fields: if not f.empty: out(" private $(f.type) $(f.name);\n") + +if segments: + out(" private Header header;\n") + out(" private ByteBuffer body;\n") } ${ @@ -99,6 +105,10 @@ for f in fields: if f.option: continue out(" $(f.set)($(f.name));\n") +if segments: + out(" setHeader(header);\n") + out(" setBody(body);\n") + if options or base == "Method": out(""" for (int i=0; i < _options.length; i++) { @@ -154,7 +164,6 @@ else: } public final $name $(f.set)($(f.type) value) { - $(f.check) ${ if not f.empty: out(" this.$(f.name) = value;") @@ -171,6 +180,44 @@ if pack > 0: return $(f.set)(value); } """) +} + +${ +if segments: + out(""" public final Header getHeader() { + return this.header; + } + + public final void setHeader(Header header) { + this.header = header; + } + + public final $name header(Header header) { + setHeader(header); + return this; + } + + public final ByteBuffer getBody() { + if (this.body == null) + { + return null; + } + else + { + return this.body.slice(); + } + } + + public final void setBody(ByteBuffer body) { + this.body = body; + } + + public final $name body(ByteBuffer body) + { + setBody(body); + return this; + } +""") } public void write(Encoder enc) diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl index 21a17624a6..9158922df7 100644 --- a/java/common/Invoker.tpl +++ b/java/common/Invoker.tpl @@ -1,5 +1,6 @@ package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; @@ -32,9 +33,9 @@ for c in composites: jclass = "" out(""" - public final $jresult $(dromedary(name))($(", ".join(params))) { - $(jreturn)invoke(new $name($(", ".join(args)))$jclass); - } + public final $jresult $(dromedary(name))($(", ".join(params))) { + $(jreturn)invoke(new $name($(", ".join(args)))$jclass); + } """) } diff --git a/java/common/genutil.py b/java/common/genutil.py index 2f1caa41c4..f8f234548c 100644 --- a/java/common/genutil.py +++ b/java/common/genutil.py @@ -170,18 +170,15 @@ class Field: if self.type_node.name == "struct": self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname) self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name) - self.check = "" self.coder = "Struct" elif self.type_node.name == "domain": self.coder = camel(0, self.prim_type["@name"]) self.read = "%s.get(dec.read%s())" % (tname, self.coder) self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name) - self.check = "" else: self.coder = camel(0, self.type_node["@name"]) self.read = "dec.read%s()" % self.coder self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name) - self.check = "Validator.check%s(value);" % self.coder self.type = jtype(self.type_node) self.default = DEFAULTS.get(self.type, "null") self.has = camel(1, "has", self.name) @@ -214,6 +211,9 @@ def get_parameters(type, fields): options = True else: params.append("%s %s" % (f.type, f.name)) + if type["segments"]: + params.append("Header header") + params.append("ByteBuffer body") if options or type.name in ("control", "command"): params.append("Option ... _options") return params @@ -226,6 +226,9 @@ def get_arguments(type, fields): options = True else: args.append(f.name) + if type["segments"]: + args.append("header") + args.append("body") if options or type.name in ("control", "command"): args.append("_options") return args diff --git a/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java index 56286a9b01..83d434b20a 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -45,10 +45,6 @@ class ToyBroker extends SessionDelegate { private ToyExchange exchange; - private MessageTransfer xfr = null; - private DeliveryProperties props = null; - private Header header = null; - private List body = null; private Map consumers = new ConcurrentHashMap(); public ToyBroker(ToyExchange exchange) @@ -103,22 +99,10 @@ class ToyBroker extends SessionDelegate @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - this.xfr = xfr; - body = new ArrayList(); - System.out.println("received transfer " + xfr.getDestination()); - } - - @Override public void header(Session ssn, Header header) - { - if (xfr == null || body == null) - { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, - "no method segment"); - ssn.close(); - return; - } - - props = header.get(DeliveryProperties.class); + String dest = xfr.getDestination(); + System.out.println("received transfer " + dest); + Header header = xfr.getHeader(); + DeliveryProperties props = header.get(DeliveryProperties.class); if (props != null) { System.out.println("received headers routing_key " + props.getRoutingKey()); @@ -130,67 +114,31 @@ class ToyBroker extends SessionDelegate System.out.println(mp.getApplicationHeaders()); } - this.header = header; - } - - @Override public void data(Session ssn, Data data) - { - if (xfr == null || body == null) + if (exchange.route(dest,props.getRoutingKey(),xfr)) { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); - ssn.close(); - return; + System.out.println("queued " + xfr); + dispatchMessages(ssn); } - - body.add(data); - - if (data.isLast()) + else { - String dest = xfr.getDestination(); - Message m = new Message(header, body); - if (exchange.route(dest,props.getRoutingKey(),m)) + if (props == null || !props.getDiscardUnroutable()) { - System.out.println("queued " + m); - dispatchMessages(ssn); + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } - else - { - - reject(ssn); - } - ssn.processed(xfr); - xfr = null; - body = null; - } - } - - private void reject(Session ssn) - { - if (props != null && props.getDiscardUnroutable()) - { - return; - } - else - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); } + ssn.processed(xfr); } - private void transferMessageToPeer(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(m.header); - for (Data d : m.body) - { - ssn.data(d.getData()); - } - ssn.endData(); + ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + m.getHeader(), m.getBody()); } private void dispatchMessages(Session ssn) @@ -204,8 +152,8 @@ class ToyBroker extends SessionDelegate private void checkAndSendMessagesToConsumer(Session ssn,String dest) { Consumer c = consumers.get(dest); - LinkedBlockingQueue queue = exchange.getQueue(c._queueName); - Message m = queue.poll(); + LinkedBlockingQueue queue = exchange.getQueue(c._queueName); + MessageTransfer m = queue.poll(); while (m != null && c._credit>0) { transferMessageToPeer(ssn,dest,m); @@ -214,43 +162,6 @@ class ToyBroker extends SessionDelegate } } - class Message - { - private final Header header; - private final List body; - - public Message(Header header, List body) - { - this.header = header; - this.body = body; - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - - if (header != null) - { - boolean first = true; - for (Struct st : header.getStructs()) - { - if (first) { first = false; } - else { sb.append(" "); } - sb.append(st); - } - } - - for (Data d : body) - { - sb.append(" | "); - sb.append(d); - } - - return sb.toString(); - } - - } - // ugly, but who cares :) // assumes unit is always no of messages, not bytes // assumes it's credit mode and not window diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java index 27a48fb760..cb10859c9f 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -20,6 +20,7 @@ */ package org.apache.qpid; +import java.nio.*; import java.util.*; import org.apache.qpid.transport.*; @@ -47,17 +48,9 @@ class ToyClient extends SessionDelegate } } - @Override public void header(Session ssn, Header header) + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - for (Struct st : header.getStructs()) - { - System.out.println("header: " + st); - } - } - - @Override public void data(Session ssn, Data data) - { - System.out.println("got data: " + data); + System.out.println("msg: " + xfr); } public static final void main(String[] args) @@ -111,16 +104,16 @@ class ToyClient extends SessionDelegate map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(new DeliveryProperties(), - new MessageProperties().setApplicationHeaders(map)); - ssn.data("this is the data"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties(), + new MessageProperties() + .setApplicationHeaders(map)), + "this is the data"); ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.data("this should be rejected"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + null, + "this should be rejected"); ssn.sync(); Future future = ssn.queueQuery("asdf"); diff --git a/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/java/common/src/main/java/org/apache/qpid/ToyExchange.java index 5c3c0ac0fb..c638679596 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpid/ToyExchange.java @@ -9,42 +9,43 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.qpid.ToyBroker.Message; +import org.apache.qpid.transport.MessageTransfer; + public class ToyExchange { final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map>> directEx = new HashMap>>(); - private Map>> topicEx = new HashMap>>(); - private Map> queues = new HashMap>(); + private Map>> directEx = new HashMap>>(); + private Map>> topicEx = new HashMap>>(); + private Map> queues = new HashMap>(); public void createQueue(String name) { - queues.put(name, new LinkedBlockingQueue()); + queues.put(name, new LinkedBlockingQueue()); } - public LinkedBlockingQueue getQueue(String name) + public LinkedBlockingQueue getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - LinkedBlockingQueue queue = queues.get(queueName); + LinkedBlockingQueue queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List> list = directEx.get(binding); + List> list = directEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); directEx.put(binding,list); } @@ -53,21 +54,21 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List> list = topicEx.get(binding); + List> list = topicEx.get(binding); list.add(queue); } else { - List> list = new LinkedList>(); + List> list = new LinkedList>(); list.add(queue); topicEx.put(binding,list); } } } - public boolean route(String dest,String routingKey,Message msg) + public boolean route(String dest, String routingKey, MessageTransfer msg) { - List> queues; + List> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +102,9 @@ public class ToyExchange } } - private List> matchWildCard(String routingKey) + private List> matchWildCard(String routingKey) { - List> selected = new ArrayList>(); + List> selected = new ArrayList>(); for(String key: topicEx.keySet()) { @@ -111,7 +112,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(LinkedBlockingQueue queue : topicEx.get(key)) + for(LinkedBlockingQueue queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +122,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List> selected) + private void storeMessage(MessageTransfer msg,List> selected) { - for(LinkedBlockingQueue queue : selected) + for(LinkedBlockingQueue queue : selected) { queue.offer(msg); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 2a248bf703..22f66ae556 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -418,9 +418,15 @@ public final class AMQShortString implements CharSequence, Comparable void delegate(C context, ProtocolDelegate delegate) - { - delegate.data(context, this); - } - - public String toString() - { - StringBuffer str = new StringBuffer(); - str.append("ch="); - str.append(" "); - str.append("Data("); - str.append(str(data, 64)); - str.append(")"); - return str.toString(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index f8debcf923..87bdae3866 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -40,22 +40,6 @@ public class Echo extends SessionDelegate { this.xfr = xfr; ssn.invoke(xfr); - } - - public void header(Session ssn, Header hdr) - { - ssn.header(hdr); - } - - public void data(Session ssn, Data data) - { - ssn.data(data.getData()); - if (data.isLast()) - { - ssn.endData(); - } - - // XXX: should be able to get command-id from any segment ssn.processed(xfr); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Header.java b/java/common/src/main/java/org/apache/qpid/transport/Header.java index 3b351ee828..9b6ab4951b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Header.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -22,6 +22,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; +import java.util.Arrays; import java.util.List; import java.nio.ByteBuffer; @@ -32,33 +33,25 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -public class Header implements ProtocolEvent { +public class Header { private final List structs; - private ByteBuffer _buf; - private boolean _noPayload; - private int channel; - public Header(List structs, boolean lastframe) + public Header(List structs) { this.structs = structs; - _noPayload= lastframe; } - public List getStructs() + public Header(Struct ... structs) { - return structs; + this(Arrays.asList(structs)); } - public void setBuf(ByteBuffer buf) + public List getStructs() { - _buf = buf; + return structs; } - public ByteBuffer getBuf() - { - return _buf; - } public T get(Class klass) { for (Struct st : structs) @@ -72,36 +65,9 @@ public class Header implements ProtocolEvent { return null; } - public final int getChannel() - { - return channel; - } - - public final void setChannel(int channel) - { - this.channel = channel; - } - - public byte getEncodedTrack() - { - return Frame.L4; - } - - public void delegate(C context, ProtocolDelegate delegate) - { - delegate.header(context, this); - } - - public boolean hasNoPayload() - { - return _noPayload; - } - public String toString() { StringBuffer str = new StringBuffer(); - str.append("ch="); - str.append(channel); str.append(" Header("); boolean first = true; for (Struct s : structs) diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java index 1c80d8c00c..6b99f6d5d3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -22,6 +22,10 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; +import java.nio.ByteBuffer; + +import static org.apache.qpid.transport.util.Functions.*; + /** * Method * @@ -88,6 +92,26 @@ public abstract class Method extends Struct implements ProtocolEvent public abstract boolean hasPayload(); + public Header getHeader() + { + return null; + } + + public void setHeader(Header header) + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer getBody() + { + return null; + } + + public void setBody(ByteBuffer body) + { + throw new UnsupportedOperationException(); + } + public abstract byte getEncodedTrack(); public abstract void dispatch(C context, MethodDelegate delegate); @@ -134,6 +158,21 @@ public abstract class Method extends Struct implements ProtocolEvent str.append(" "); str.append(super.toString()); + Header hdr = getHeader(); + if (hdr != null) + { + for (Struct st : hdr.getStructs()) + { + str.append("\n "); + str.append(st); + } + } + ByteBuffer body = getBody(); + if (body != null) + { + str.append("\n body="); + str.append(str(body, 64)); + } return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java index 9fa28fbe23..a90948fc1d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java @@ -35,10 +35,6 @@ public interface ProtocolDelegate void command(C context, Method command); - void header(C context, Header header); - - void data(C context, Data data); - void error(C context, ProtocolError error); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 8ec13c0ee7..1400bd2e5b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.transport.Option.*; import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.util.Serial.*; +import static org.apache.qpid.util.Strings.*; /** * Session @@ -271,7 +272,7 @@ public class Session extends Invoker } needSync = !m.isSync(); channel.method(m); - if (autoSync && !m.hasPayload()) + if (autoSync) { sync(); } @@ -290,50 +291,6 @@ public class Session extends Invoker } } - public void header(Header header) - { - channel.header(header); - } - - public Header header(List structs) - { - Header res = new Header(structs, false); - header(res); - return res; - } - - public Header header(Struct ... structs) - { - return header(Arrays.asList(structs)); - } - - public void data(ByteBuffer buf) - { - channel.data(buf); - } - - public void data(String str) - { - channel.data(str); - } - - public void data(byte[] bytes) - { - channel.data(bytes); - } - - public void endData() - { - channel.end(); - synchronized (commands) - { - if (autoSync) - { - sync(); - } - } - } - public void sync() { sync(timeout); @@ -501,6 +458,26 @@ public class Session extends Invoker } + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + ByteBuffer.wrap(body), _options); + } + + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + toUTF8(body), _options); + } + public void close() { sessionRequestTimeout(0); diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index dc400d3098..b91763509c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -48,10 +48,6 @@ public abstract class SessionDelegate } } - public void header(Session ssn, Header header) { } - - public void data(Session ssn, Data data) { } - public void error(Session ssn, ProtocolError error) { } @Override public void executionResult(Session ssn, ExecutionResult result) diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index 788b6a55e3..390de881ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -35,24 +35,29 @@ public final class BBEncoder extends AbstractEncoder { private ByteBuffer out; + private int segment; public BBEncoder(int capacity) { out = ByteBuffer.allocate(capacity); out.order(ByteOrder.BIG_ENDIAN); + segment = 0; } public void init() { out.clear(); + segment = 0; } - public ByteBuffer done() + public ByteBuffer segment() { - out.flip(); - ByteBuffer encoded = ByteBuffer.allocate(out.remaining()); - encoded.put(out); - encoded.flip(); - return encoded; + int pos = out.position(); + out.position(segment); + ByteBuffer slice = out.slice(); + slice.limit(pos - segment); + out.position(pos); + segment = pos; + return slice; } private void grow(int size) diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java index ae12d35209..c1d30eacc3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java @@ -154,7 +154,7 @@ public class Validator public static final void checkMap(Map map) { - if (map == null) + if (map == null || map.isEmpty()) { return; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 2c09776c3d..b808156dc6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.Decoder; -import org.apache.qpid.transport.Data; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; @@ -51,6 +50,7 @@ public class Assembler implements Receiver, NetworkDelegate private final Receiver receiver; private final Map> segments; + private final Method[] incomplete; private final ThreadLocal decoder = new ThreadLocal() { public BBDecoder initialValue() @@ -63,6 +63,7 @@ public class Assembler implements Receiver, NetworkDelegate { this.receiver = receiver; segments = new HashMap>(); + incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -97,11 +98,6 @@ public class Assembler implements Receiver, NetworkDelegate receiver.received(event); } - private void emit(Frame frame, ProtocolEvent event) - { - emit(frame.getChannel(), event); - } - public void received(NetworkEvent event) { event.delegate(this); @@ -122,32 +118,18 @@ public class Assembler implements Receiver, NetworkDelegate emit(0, header); } - public void frame(Frame frame) - { - switch (frame.getType()) - { - case BODY: - emit(frame, new Data(frame.getBody(), frame.isFirstFrame(), - frame.isLastFrame())); - break; - default: - assemble(frame); - break; - } - } - public void error(ProtocolError error) { emit(0, error); } - private void assemble(Frame frame) + public void frame(Frame frame) { ByteBuffer segment; if (frame.isFirstFrame() && frame.isLastFrame()) { segment = frame.getBody(); - emit(frame, decode(frame, segment)); + assemble(frame, segment); } else { @@ -179,38 +161,63 @@ public class Assembler implements Receiver, NetworkDelegate segment.put(f.getBody()); } segment.flip(); - emit(frame, decode(frame, segment)); + assemble(frame, segment); } } } - private ProtocolEvent decode(Frame frame, ByteBuffer segment) + private void assemble(Frame frame, ByteBuffer segment) { BBDecoder dec = decoder.get(); dec.init(segment); + int channel = frame.getChannel(); + Method command; + switch (frame.getType()) { case CONTROL: int controlType = dec.readUint16(); Method control = Method.create(controlType); control.read(dec); - return control; + emit(channel, control); + break; case COMMAND: int commandType = dec.readUint16(); // read in the session header, right now we don't use it dec.readUint16(); - Method command = Method.create(commandType); + command = Method.create(commandType); command.read(dec); - return command; + if (command.hasPayload()) + { + incomplete[channel] = command; + } + else + { + emit(channel, command); + } + break; case HEADER: + command = incomplete[channel]; List structs = new ArrayList(); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); } - return new Header(structs, frame.isLastFrame() && frame.isLastSegment()); + command.setHeader(new Header(structs)); + if (frame.isLastSegment()) + { + incomplete[channel] = null; + emit(channel, command); + } + break; + case BODY: + command = incomplete[channel]; + command.setBody(segment); + incomplete[channel] = null; + emit(channel, command); + break; default: throw new IllegalStateException("unknown frame type: " + frame.getType()); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 1ed446af2f..444c7d3f14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network; import org.apache.qpid.transport.codec.BBEncoder; -import org.apache.qpid.transport.Data; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -34,7 +33,8 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import java.nio.ByteBuffer; -import java.util.Iterator; +import java.nio.ByteOrder; +import java.util.List; import static org.apache.qpid.transport.network.Frame.*; @@ -46,12 +46,14 @@ import static java.lang.Math.*; * */ -public class Disassembler implements Sender, - ProtocolDelegate +public final class Disassembler implements Sender, + ProtocolDelegate { - private final Sender sender; + private final Sender sender; private final int maxPayload; + private final ByteBuffer header; + private final Object sendlock = new Object(); private final ThreadLocal encoder = new ThreadLocal() { public BBEncoder initialValue() @@ -60,7 +62,7 @@ public class Disassembler implements Sender, } }; - public Disassembler(Sender sender, int maxFrame) + public Disassembler(Sender sender, int maxFrame) { if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { @@ -69,6 +71,8 @@ public class Disassembler implements Sender, } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; + this.header = ByteBuffer.allocate(HEADER_SIZE); + this.header.order(ByteOrder.BIG_ENDIAN); } @@ -79,60 +83,80 @@ public class Disassembler implements Sender, public void flush() { - sender.flush(); + synchronized (sendlock) + { + sender.flush(); + } } public void close() { - sender.close(); + synchronized (sendlock) + { + sender.close(); + } + } + + private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + synchronized (sendlock) + { + header.put(0, flags); + header.put(1, type); + header.putShort(2, (short) (size + HEADER_SIZE)); + header.put(5, track); + header.putShort(6, (short) channel); + + header.rewind(); + + sender.send(header); + + int limit = buf.limit(); + buf.limit(buf.position() + size); + sender.send(buf); + buf.limit(limit); + } } private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf, boolean first, boolean last) { + byte typeb = (byte) type.getValue(); byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; - if(!buf.hasRemaining()) + int remaining = buf.remaining(); + while (true) { - //empty data - byte nflags = flags; + int size = min(maxPayload, remaining); + remaining -= size; + + byte newflags = flags; if (first) { - nflags |= FIRST_FRAME; + newflags |= FIRST_FRAME; first = false; } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice()); - sender.send(frame); - } - else - { - while (buf.hasRemaining()) + if (last && remaining == 0) { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) - { - newflags |= FIRST_FRAME; - first = false; - } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, track, event.getChannel(), slice); - sender.send(frame); + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; } } } public void init(Void v, ProtocolHeader header) { - sender.send(header); + synchronized (sendlock) + { + sender.send(header.toByteBuffer()); + sender.flush(); + } } public void control(Void v, Method method) @@ -170,48 +194,43 @@ public class Disassembler implements Sender, } } method.write(enc); - ByteBuffer buf = enc.done(); + ByteBuffer methodSeg = enc.segment(); byte flags = FIRST_SEG; - if (!method.hasPayload()) + boolean payload = method.hasPayload(); + if (!payload) { flags |= LAST_SEG; } - fragment(flags, type, method, buf, true, true); - } - - public void header(Void v, Header header) - { - ByteBuffer buf; - if (header.getBuf() == null) + ByteBuffer headerSeg = null; + if (payload) { - BBEncoder enc = encoder.get(); - enc.init(); - for (Struct st : header.getStructs()) + final Header hdr = method.getHeader(); + final List structs = hdr.getStructs(); + final int nstructs = structs.size(); + for (int i = 0; i < nstructs; i++) { - enc.writeStruct32(st); + enc.writeStruct32(structs.get(i)); } - buf = enc.done(); - header.setBuf(buf); + headerSeg = enc.segment(); } - else + + synchronized (sendlock) { - buf = header.getBuf(); - buf.flip(); + fragment(flags, type, method, methodSeg, true, true); + if (payload) + { + fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg, true, true); + fragment(LAST_SEG, SegmentType.BODY, method, method.getBody(), true, true); + } } - fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true); - } - - public void data(Void v, Data data) - { - fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast()); } public void error(Void v, ProtocolError error) { - sender.send(error); + throw new IllegalArgumentException("" + error); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java deleted file mode 100644 index b3f400a6e7..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Sender; - -import static org.apache.qpid.transport.network.Frame.*; - - -/** - * OutputHandler - * - */ - -public class OutputHandler implements Sender, NetworkDelegate -{ - - private Sender sender; - private Object lock = new Object(); - private int bytes = 0; - private List frames = new ArrayList(); - - public OutputHandler(Sender sender) - { - this.sender = sender; - } - - public void send(NetworkEvent event) - { - event.delegate(this); - } - - public void close() - { - synchronized (lock) - { - sender.close(); - } - } - - public void init(ProtocolHeader header) - { - synchronized (lock) - { - sender.send(header.toByteBuffer()); - sender.flush(); - } - } - - public void frame(Frame frame) - { - synchronized (lock) - { - frames.add(frame); - bytes += HEADER_SIZE + frame.getSize(); - - if (bytes > 64*1024) - { - flush(); - } - } - } - - public void flush() - { - synchronized (lock) - { - ByteBuffer buf = ByteBuffer.allocate(bytes); - int nframes = frames.size(); - for (int i = 0; i < nframes; i++) - { - Frame frame = frames.get(i); - buf.put(frame.getFlags()); - buf.put((byte) frame.getType().getValue()); - buf.putShort((short) (frame.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(frame.getTrack()); - buf.putShort((short) frame.getChannel()); - // RESERVED - buf.putInt(0); - buf.put(frame.getBody()); - } - buf.flip(); - - frames.clear(); - bytes = 0; - - sender.send(buf); - sender.flush(); - } - } - - public void error(ProtocolError error) - { - throw new IllegalStateException("XXX"); - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 37910ade0d..7ac5649e99 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -48,8 +47,9 @@ final class IoSender extends Thread implements Sender private final OutputStream out; private final byte[] buffer; - private final AtomicInteger head = new AtomicInteger(START); - private final AtomicInteger tail = new AtomicInteger(START); + private volatile int head = START; + private volatile int tail = START; + private volatile boolean idle = true; private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -96,16 +96,17 @@ final class IoSender extends Thread implements Sender while (remaining > 0) { - final int hd = head.get(); - final int tl = tail.get(); + final int hd = head; + final int tl = tail; if (hd - tl >= size) { + flush(); synchronized (notFull) { long start = System.currentTimeMillis(); long elapsed = 0; - while (head.get() - tail.get() >= size && elapsed < timeout) + while (head - tail >= size && elapsed < timeout) { try { @@ -118,9 +119,9 @@ final class IoSender extends Thread implements Sender elapsed += System.currentTimeMillis() - start; } - if (head.get() - tail.get() >= size) + if (head - tail >= size) { - throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get())); + throw new TransportException(String.format("write timed out: %s, %s", head, tail)); } } continue; @@ -140,21 +141,20 @@ final class IoSender extends Thread implements Sender } buf.get(buffer, hd_idx, length); - head.getAndAdd(length); - if (hd == tail.get()) - { - synchronized (notEmpty) - { - notEmpty.notify(); - } - } + head += length; remaining -= length; } } public void flush() { - // pass + if (idle) + { + synchronized (notEmpty) + { + notEmpty.notify(); + } + } } public void close() @@ -206,8 +206,8 @@ final class IoSender extends Thread implements Sender while (true) { - final int hd = head.get(); - final int tl = tail.get(); + final int hd = head; + final int tl = tail; if (hd == tl) { @@ -216,9 +216,11 @@ final class IoSender extends Thread implements Sender break; } + idle = true; + synchronized (notEmpty) { - while (head.get() == tail.get() && !closed.get()) + while (head == tail && !closed.get()) { try { @@ -231,6 +233,8 @@ final class IoSender extends Thread implements Sender } } + idle = false; + continue; } @@ -258,8 +262,8 @@ final class IoSender extends Thread implements Sender close(false); break; } - tail.getAndAdd(length); - if (head.get() - tl >= size) + tail += length; + if (head - tl >= size) { synchronized (notFull) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 52accb6b97..3b543b3e60 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -33,7 +33,6 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; import org.apache.qpid.transport.util.Logger; /** @@ -48,6 +47,14 @@ import org.apache.qpid.transport.util.Logger; public final class IoTransport { + static + { + org.apache.mina.common.ByteBuffer.setAllocator + (new org.apache.mina.common.SimpleByteBufferAllocator()); + org.apache.mina.common.ByteBuffer.setUseDirectBuffers + (Boolean.getBoolean("amqj.enableDirectBuffers")); + } + private static final Logger log = Logger.get(IoTransport.class); private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; @@ -104,8 +111,7 @@ public final class IoTransport sender = new IoSender(this, 2*writeBufferSize, timeout); Connection conn = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); + (new Disassembler(sender, 64*1024 - 1), delegate); receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize, timeout); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index bcac7c4e16..16a1e20b10 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -44,7 +44,6 @@ import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; import static org.apache.qpid.transport.util.Functions.*; @@ -292,7 +291,7 @@ public class MinaHandler implements IoHandler { // XXX: hardcoded max-frame return new Connection - (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate); + (new Disassembler(sender, MAX_FRAME_SIZE), delegate); } public Receiver receiver(Connection conn) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index f0161efe97..51e41b26f7 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -17,7 +17,6 @@ import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.OutputHandler; public class NioHandler implements Runnable { @@ -68,8 +67,7 @@ public class NioHandler implements Runnable NioSender sender = new NioSender(_ch); Connection con = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); + (new Disassembler(sender, 64*1024 - 1), delegate); con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java new file mode 100644 index 0000000000..4b199bafe6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.UnsupportedEncodingException; + + +/** + * Strings + * + */ + +public final class Strings +{ + + private static final byte[] EMPTY = new byte[0]; + + private static final ThreadLocal charbuf = new ThreadLocal() + { + public char[] initialValue() + { + return new char[4096]; + } + }; + + public static final byte[] toUTF8(String str) + { + if (str == null) + { + return EMPTY; + } + else + { + final int size = str.length(); + char[] chars = charbuf.get(); + if (size > chars.length) + { + chars = new char[Math.max(size, 2*chars.length)]; + charbuf.set(chars); + } + + str.getChars(0, size, chars, 0); + final byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) + { + if (chars[i] > 127) + { + try + { + return str.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + bytes[i] = (byte) chars[i]; + } + return bytes; + } + } + +} -- cgit v1.2.1