summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-05 19:33:11 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-05 19:33:11 +0000
commit2c5100e6829529ea0df4463c5d914d613e45c1c8 (patch)
treec27e316d892edf5ac42348825a0ba2079f7f80a7 /java/client/src
parentb5f8cf1bd9b5652e2691d6bc5b9b1c3228f53d68 (diff)
downloadqpid-python-2c5100e6829529ea0df4463c5d914d613e45c1c8.tar.gz
Profiling driven changes:
- made AMQShortString cache the toString() value - added static initializer to IoTransport to disable use of pooled byte buffers - modified IoSender to permit buffering - removed OutputHandler and eliminated intermediate Frame generation between Disassembler and Sender<ByteBuffer> (IoSender) - made Disassembler take advantage of IoSender's buffering - removed Header and Data as distinct protocol events, added Header and Body members to MessageTransfer - modified Assembler and Disassembler to decode/encode Header and Data directly to/from MessageTransfer - modified Disassembler to only write data if encoding of headers is successful - added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding that is also fast for 7-bit ascii - modified JMSTextMessage to use the Strings.toUTF8 - modified QpidBench to only generate 7-bit ascii when using TextMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Session.java85
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java93
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java76
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java13
7 files changed, 76 insertions, 278 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 4a43a7bba8..b29e39a52e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.util.Strings;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
@@ -111,20 +112,17 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
try
{
if (text != null)
- {
- _data = ByteBuffer.allocate(text.length());
- _data.limit(text.length()) ;
- //_data.sweep();
- _data.setAutoExpand(true);
+ {
final String encoding = getContentHeaderProperties().getEncodingAsString();
- if (encoding == null)
+ if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
{
- _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+ _data = ByteBuffer.wrap(Strings.toUTF8(text));
}
else
{
- _data.put(text.getBytes(encoding));
+ _data = ByteBuffer.wrap(text.getBytes(encoding));
}
+ _data.position(_data.limit());
_changedData=true;
}
_decodedValue = text;
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
index e4daaa094e..0d84394c7c 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
@@ -109,42 +109,6 @@ public interface Session
/**
- * <p>This transfer streams a complete message using a single method.
- * It uses pull-semantics instead of doing a push.</p>
- * <p>Data is pulled from a Message object using read()
- * and pushed using messageTransfer() and headers() followed by data() and endData().
- * <br><b><i>This method should only be used by large messages</b></i><br>
- * There are two convenience Message classes to do this.
- * <ul>
- * <li> <code>{@link org.apache.qpid.nclient.util.FileMessage}</code>
- * <li> <code>{@link org.apache.qpid.nclient.util.StreamingMessage}</code>
- * </ul>
- * You can also implement a <code>Message</code> interface to wrap any
- * data stream.
- * </p>
- *
- * @param destination The exchange the message is being sent to.
- * @param msg The Message to be sent.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
- * must be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
- * is acquired when the transfer starts.
- * </ul>
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
- */
- public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException;
-
- /**
* This command transfers a message between two peers.
*
* @param destination Specifies the destination to which the message is to be transferred.
@@ -154,46 +118,31 @@ public interface Session
* @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Option ... options);
-
- /**
- * Make a set of headers to be sent together with a message
- *
- * @param headers headers to be added
- * @see org.apache.qpid.transport.DeliveryProperties
- * @see org.apache.qpid.transport.MessageProperties
- * @return The added headers.
- */
- public Header header(Struct... headers);
-
- /**
- * Add a byte array to the content of the message being sent.
- *
- * @param data Data to be added.
- */
- public void data(byte[] data);
+ Header header, ByteBuffer body, Option ... options);
/**
- * A Add a ByteBuffer to the content of the message being sent.
- * <p> Note that only the data between the buffer's current position and the
- * buffer limit is added.
- * It is therefore recommended to flip the buffer before adding it to the message,
+ * This command transfers a message between two peers.
*
- * @param buf Data to be added.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void data(ByteBuffer buf);
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, byte[] body, Option ... options);
/**
- * Add a string to the content of the message being sent.
+ * This command transfers a message between two peers.
*
- * @param str String to be added.
- */
- public void data(String str);
-
- /**
- * Signals the end of data for the message.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void endData();
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, String body, Option ... options);
//------------------------------------------------------
// Messaging methods
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index ffde5336f9..089eb1bb17 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -11,8 +11,11 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.api.Message;
import org.apache.qpid.nclient.ClosedListener;
import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
@@ -85,24 +88,29 @@ public class ClientSession extends org.apache.qpid.transport.Session implements
public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
{
- // The javadoc clearly says that this method is suitable for small messages
- // therefore reading the content in one shot.
- ByteBuffer data = msg.readData();
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() );
- if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
+ DeliveryProperties dp = msg.getDeliveryProperties();
+ MessageProperties mp = msg.getMessageProperties();
+ Header header;
+ if (msg.getHeader() == null || dp.isDirty() || mp.isDirty())
{
- msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
- msg.getDeliveryProperties().setDirty(false);
- msg.getMessageProperties().setDirty(false);
+ header = new Header(dp, mp);
+ msg.setHeader(header);
+ dp.setDirty(false);
+ mp.setDirty(false);
}
else
{
- super.header(msg.getHeader());
+ header = msg.getHeader();
}
- data( data );
- endData();
+ // The javadoc clearly says that this method is suitable for small messages
+ // therefore reading the content in one shot.
+ ByteBuffer body = msg.readData();
+ int size = body.remaining();
+ super.messageTransfer
+ (destination, MessageAcceptMode.get(acceptMode),
+ MessageAcquireMode.get(acquireMode), header, body);
+ _currentDataSizeNotSynced += size;
+ _currentDataSizeNotFlushed += size;
}
public void sync()
@@ -111,65 +119,6 @@ public class ClientSession extends org.apache.qpid.transport.Session implements
_currentDataSizeNotSynced = 0;
}
- /* -------------------------
- * Data methods
- * ------------------------*/
-
- public void data(ByteBuffer buf)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
- _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
- super.data(buf);
- }
-
- public void data(String str)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
- super.data(str);
- }
-
- public void data(byte[] bytes)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
- super.data(bytes);
- }
-
- public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
- {
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- boolean b = true;
- int count = 0;
- while(b)
- {
- try
- {
- System.out.println("count : " + count++);
- data(msg.readData());
- }
- catch(EOFException e)
- {
- b = false;
- }
- }
- endData();
- }
-
- public void endData()
- {
- super.endData();
- /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
- {
- sync();
- }
- if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
- {
- executionFlush();
- _currentDataSizeNotFlushed = 0;
- }*/
- }
-
public RangeSet getRejectedMessages()
{
return _rejectedMessages;
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
index b57fd0a7ed..adcd49c26d 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
@@ -7,7 +7,6 @@ import org.apache.qpid.ErrorCode;
import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageReject;
import org.apache.qpid.transport.MessageTransfer;
@@ -18,46 +17,27 @@ import org.apache.qpid.transport.SessionDelegate;
public class ClientSessionDelegate extends SessionDelegate
-{
- private MessageTransfer _currentTransfer;
- private MessagePartListener _currentMessageListener;
-
- @Override public void sessionDetached(Session ssn, SessionDetached dtc)
- {
- ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null));
- }
-
+{
+
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Data data)
+ @Override public void messageTransfer(Session session, MessageTransfer xfr)
{
- _currentMessageListener.data(data.getData());
- if (data.isLast())
+ MessagePartListener listener = ((ClientSession)session).getMessageListeners()
+ .get(xfr.getDestination());
+ listener.messageTransfer(xfr.getId());
+ listener.messageHeader(xfr.getHeader());
+ ByteBuffer body = xfr.getBody();
+ if (body == null)
{
- _currentMessageListener.messageReceived();
+ body = ByteBuffer.allocate(0);
}
+ listener.data(body);
+ listener.messageReceived();
}
- @Override public void header(Session ssn, Header header)
- {
- _currentMessageListener.messageHeader(header);
- if( header.hasNoPayload())
- {
- _currentMessageListener.data(ByteBuffer.allocate(0));
- _currentMessageListener.messageReceived();
- }
- }
-
-
- @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
- {
- _currentTransfer = currentTransfer;
- _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination());
- _currentMessageListener.messageTransfer(currentTransfer.getId());
- }
-
- @Override public void messageReject(Session session, MessageReject struct)
+ @Override public void messageReject(Session session, MessageReject struct)
{
for (Range range : struct.getTransfers())
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
index 96e1d2c772..88b5dc6392 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
@@ -9,10 +9,12 @@ import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageProperties;
+import java.nio.ByteBuffer;
import java.util.UUID;
public class DemoClient
@@ -56,17 +58,15 @@ public class DemoClient
ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
// queue
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.data("this is the data");
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("this is the data".getBytes()));
//reject
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stocks")),
+ ByteBuffer.wrap("this should be rejected".getBytes()));
ssn.sync();
// topic subs
@@ -84,11 +84,10 @@ public class DemoClient
ssn.sync();
// topic
- ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("Topic message");
- ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.endData();
+ ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("Topic message".getBytes()));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
deleted file mode 100644
index 36c0a4b3be..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.qpid.nclient.impl;
-
-import java.io.FileInputStream;
-
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.FileMessage;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-
-import java.util.UUID;
-
-public class LargeMsgDemoClient
-{
- public static MessagePartListenerAdapter createAdapter()
- {
- return new MessagePartListenerAdapter(new MessageListener()
- {
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
- }
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
- ssn.queueDeclare("queue1", null, null);
- ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
- ssn.sync();
-
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
-
- try
- {
- FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
- 1024,
- new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
-
- // queue
- ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
- ssn.sync();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
index 513c1a95de..9ea9297e14 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
@@ -1,5 +1,6 @@
package org.apache.qpid.nclient.interop;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -13,6 +14,7 @@ import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
@@ -77,18 +79,15 @@ public class BasicInteropTest implements ClosedListener
public void testSendMessage(){
System.out.println("------- Sending a message --------");
- session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-
Map<String,Object> props = new HashMap<String,Object>();
props.put("name", "rajith");
props.put("age", 10);
props.put("spf", 8.5);
- session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props));
-
- //session.header(new DeliveryProperties().setRoutingKey("testKey"));
+ session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("testKey"),
+ new MessageProperties().setApplicationHeaders(props)),
+ ByteBuffer.wrap("TestMessage".getBytes()));
- session.data("TestMessage");
- session.endData();
session.sync();
System.out.println("------- Message sent --------");
}