diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-08-05 19:33:11 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-05 19:33:11 +0000 |
| commit | 2c5100e6829529ea0df4463c5d914d613e45c1c8 (patch) | |
| tree | c27e316d892edf5ac42348825a0ba2079f7f80a7 /java/client/src | |
| parent | b5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68 (diff) | |
| download | qpid-python-2c5100e6829529ea0df4463c5d914d613e45c1c8.tar.gz | |
Profiling driven changes:
- made AMQShortString cache the toString() value
- added static initializer to IoTransport to disable use of pooled
byte buffers
- modified IoSender to permit buffering
- removed OutputHandler and eliminated intermediate Frame generation
between Disassembler and Sender<ByteBuffer> (IoSender)
- made Disassembler take advantage of IoSender's buffering
- removed Header and Data as distinct protocol events, added Header
and Body members to MessageTransfer
- modified Assembler and Disassembler to decode/encode Header and
Data directly to/from MessageTransfer
- modified Disassembler to only write data if encoding of headers is
successful
- added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding
that is also fast for 7-bit ascii
- modified JMSTextMessage to use the Strings.toUTF8
- modified QpidBench to only generate 7-bit ascii when using
TextMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
7 files changed, 76 insertions, 278 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 4a43a7bba8..b29e39a52e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.util.Strings; public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { @@ -111,20 +112,17 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text try { if (text != null) - { - _data = ByteBuffer.allocate(text.length()); - _data.limit(text.length()) ; - //_data.sweep(); - _data.setAutoExpand(true); + { final String encoding = getContentHeaderProperties().getEncodingAsString(); - if (encoding == null) + if (encoding == null || encoding.equalsIgnoreCase("UTF-8")) { - _data.put(text.getBytes(DEFAULT_CHARSET.name())); + _data = ByteBuffer.wrap(Strings.toUTF8(text)); } else { - _data.put(text.getBytes(encoding)); + _data = ByteBuffer.wrap(text.getBytes(encoding)); } + _data.position(_data.limit()); _changedData=true; } _decodedValue = text; diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index e4daaa094e..0d84394c7c 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -109,42 +109,6 @@ public interface Session /** - * <p>This transfer streams a complete message using a single method. - * It uses pull-semantics instead of doing a push.</p> - * <p>Data is pulled from a Message object using read() - * and pushed using messageTransfer() and headers() followed by data() and endData(). - * <br><b><i>This method should only be used by large messages</b></i><br> - * There are two convenience Message classes to do this. - * <ul> - * <li> <code>{@link org.apache.qpid.nclient.util.FileMessage}</code> - * <li> <code>{@link org.apache.qpid.nclient.util.StreamingMessage}</code> - * </ul> - * You can also implement a <code>Message</code> interface to wrap any - * data stream. - * </p> - * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - * <p/> - * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - * </ul> - * @param acquireMode <ul> - * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message - * is acquired when the transfer starts. - * </ul> - * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException; - - /** * This command transfers a message between two peers. * * @param destination Specifies the destination to which the message is to be transferred. @@ -154,46 +118,31 @@ public interface Session * @param acquireMode Indicates whether or not the transferred message has been acquired. */ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Option ... options); - - /** - * Make a set of headers to be sent together with a message - * - * @param headers headers to be added - * @see org.apache.qpid.transport.DeliveryProperties - * @see org.apache.qpid.transport.MessageProperties - * @return The added headers. - */ - public Header header(Struct... headers); - - /** - * Add a byte array to the content of the message being sent. - * - * @param data Data to be added. - */ - public void data(byte[] data); + Header header, ByteBuffer body, Option ... options); /** - * A Add a ByteBuffer to the content of the message being sent. - * <p> Note that only the data between the buffer's current position and the - * buffer limit is added. - * It is therefore recommended to flip the buffer before adding it to the message, + * This command transfers a message between two peers. * - * @param buf Data to be added. + * @param destination Specifies the destination to which the message is to be transferred. + * @param acceptMode Indicates whether message.accept, session.complete, + * or nothing at all is required to indicate successful transfer of the message. + * + * @param acquireMode Indicates whether or not the transferred message has been acquired. */ - public void data(ByteBuffer buf); + public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, + Header header, byte[] body, Option ... options); /** - * Add a string to the content of the message being sent. + * This command transfers a message between two peers. * - * @param str String to be added. - */ - public void data(String str); - - /** - * Signals the end of data for the message. + * @param destination Specifies the destination to which the message is to be transferred. + * @param acceptMode Indicates whether message.accept, session.complete, + * or nothing at all is required to indicate successful transfer of the message. + * + * @param acquireMode Indicates whether or not the transferred message has been acquired. */ - public void endData(); + public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, + Header header, String body, Option ... options); //------------------------------------------------------ // Messaging methods diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index ffde5336f9..089eb1bb17 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -11,8 +11,11 @@ import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; import org.apache.qpid.nclient.ClosedListener; import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; @@ -85,24 +88,29 @@ public class ClientSession extends org.apache.qpid.transport.Session implements public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException { - // The javadoc clearly says that this method is suitable for small messages - // therefore reading the content in one shot. - ByteBuffer data = msg.readData(); - super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode)); - // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() ); - if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() ) + DeliveryProperties dp = msg.getDeliveryProperties(); + MessageProperties mp = msg.getMessageProperties(); + Header header; + if (msg.getHeader() == null || dp.isDirty() || mp.isDirty()) { - msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) ); - msg.getDeliveryProperties().setDirty(false); - msg.getMessageProperties().setDirty(false); + header = new Header(dp, mp); + msg.setHeader(header); + dp.setDirty(false); + mp.setDirty(false); } else { - super.header(msg.getHeader()); + header = msg.getHeader(); } - data( data ); - endData(); + // The javadoc clearly says that this method is suitable for small messages + // therefore reading the content in one shot. + ByteBuffer body = msg.readData(); + int size = body.remaining(); + super.messageTransfer + (destination, MessageAcceptMode.get(acceptMode), + MessageAcquireMode.get(acquireMode), header, body); + _currentDataSizeNotSynced += size; + _currentDataSizeNotFlushed += size; } public void sync() @@ -111,65 +119,6 @@ public class ClientSession extends org.apache.qpid.transport.Session implements _currentDataSizeNotSynced = 0; } - /* ------------------------- - * Data methods - * ------------------------*/ - - public void data(ByteBuffer buf) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining(); - _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining(); - super.data(buf); - } - - public void data(String str) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length; - super.data(str); - } - - public void data(byte[] bytes) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length; - super.data(bytes); - } - - public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode)); - super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); - boolean b = true; - int count = 0; - while(b) - { - try - { - System.out.println("count : " + count++); - data(msg.readData()); - } - catch(EOFException e) - { - b = false; - } - } - endData(); - } - - public void endData() - { - super.endData(); - /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) - { - sync(); - } - if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH) - { - executionFlush(); - _currentDataSizeNotFlushed = 0; - }*/ - } - public RangeSet getRejectedMessages() { return _rejectedMessages; diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java index b57fd0a7ed..adcd49c26d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java @@ -7,7 +7,6 @@ import org.apache.qpid.ErrorCode; import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpid.QpidException; -import org.apache.qpid.transport.Data; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageReject; import org.apache.qpid.transport.MessageTransfer; @@ -18,46 +17,27 @@ import org.apache.qpid.transport.SessionDelegate; public class ClientSessionDelegate extends SessionDelegate -{ - private MessageTransfer _currentTransfer; - private MessagePartListener _currentMessageListener; - - @Override public void sessionDetached(Session ssn, SessionDetached dtc) - { - ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null)); - } - +{ + // -------------------------------------------- // Message methods // -------------------------------------------- - @Override public void data(Session ssn, Data data) + @Override public void messageTransfer(Session session, MessageTransfer xfr) { - _currentMessageListener.data(data.getData()); - if (data.isLast()) + MessagePartListener listener = ((ClientSession)session).getMessageListeners() + .get(xfr.getDestination()); + listener.messageTransfer(xfr.getId()); + listener.messageHeader(xfr.getHeader()); + ByteBuffer body = xfr.getBody(); + if (body == null) { - _currentMessageListener.messageReceived(); + body = ByteBuffer.allocate(0); } + listener.data(body); + listener.messageReceived(); } - @Override public void header(Session ssn, Header header) - { - _currentMessageListener.messageHeader(header); - if( header.hasNoPayload()) - { - _currentMessageListener.data(ByteBuffer.allocate(0)); - _currentMessageListener.messageReceived(); - } - } - - - @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) - { - _currentTransfer = currentTransfer; - _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination()); - _currentMessageListener.messageTransfer(currentTransfer.getId()); - } - - @Override public void messageReject(Session session, MessageReject struct) + @Override public void messageReject(Session session, MessageReject struct) { for (Range range : struct.getTransfers()) { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java index 96e1d2c772..88b5dc6392 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java @@ -9,10 +9,12 @@ import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageProperties; +import java.nio.ByteBuffer; import java.util.UUID; public class DemoClient @@ -56,17 +58,15 @@ public class DemoClient ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); // queue - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.header(new DeliveryProperties().setRoutingKey("queue1"), - new MessageProperties().setMessageId(UUID.randomUUID())); - ssn.data("this is the data"); - ssn.endData(); + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties().setRoutingKey("queue1"), + new MessageProperties().setMessageId(UUID.randomUUID())), + ByteBuffer.wrap("this is the data".getBytes())); //reject - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.data("this should be rejected"); - ssn.header(new DeliveryProperties().setRoutingKey("stocks")); - ssn.endData(); + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties().setRoutingKey("stocks")), + ByteBuffer.wrap("this should be rejected".getBytes())); ssn.sync(); // topic subs @@ -84,11 +84,10 @@ public class DemoClient ssn.sync(); // topic - ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.data("Topic message"); - ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"), - new MessageProperties().setMessageId(UUID.randomUUID())); - ssn.endData(); + ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties().setRoutingKey("stock.us.ibm"), + new MessageProperties().setMessageId(UUID.randomUUID())), + ByteBuffer.wrap("Topic message".getBytes())); } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java deleted file mode 100644 index 36c0a4b3be..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.apache.qpid.nclient.impl; - -import java.io.FileInputStream; - -import org.apache.qpid.ErrorCode; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.FileMessage; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; - -import java.util.UUID; - -public class LargeMsgDemoClient -{ - public static MessagePartListenerAdapter createAdapter() - { - return new MessagePartListenerAdapter(new MessageListener() - { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); - } - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); - ssn.queueDeclare("queue1", null, null); - ssn.exchangeBind("queue1", "amq.direct", "queue1",null); - ssn.sync(); - - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); - - try - { - FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"), - 1024, - new DeliveryProperties().setRoutingKey("queue1"), - new MessageProperties().setMessageId(UUID.randomUUID())); - - // queue - ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); - ssn.sync(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java index 513c1a95de..9ea9297e14 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java @@ -1,5 +1,6 @@ package org.apache.qpid.nclient.interop; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -13,6 +14,7 @@ import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; @@ -77,18 +79,15 @@ public class BasicInteropTest implements ClosedListener public void testSendMessage(){ System.out.println("------- Sending a message --------"); - session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - Map<String,Object> props = new HashMap<String,Object>(); props.put("name", "rajith"); props.put("age", 10); props.put("spf", 8.5); - session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props)); - - //session.header(new DeliveryProperties().setRoutingKey("testKey")); + session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties().setRoutingKey("testKey"), + new MessageProperties().setApplicationHeaders(props)), + ByteBuffer.wrap("TestMessage".getBytes())); - session.data("TestMessage"); - session.endData(); session.sync(); System.out.println("------- Message sent --------"); } |
