From 2c5100e6829529ea0df4463c5d914d613e45c1c8 Mon Sep 17 00:00:00 2001
From: "Rafael H. Schloming" This transfer streams a complete message using a single method.
- * It uses pull-semantics instead of doing a push. Data is pulled from a Message object using read()
- * and pushed using messageTransfer() and headers() followed by data() and endData().
- *
This method should only be used by large messages
- * There are two convenience Message classes to do this.
- *
- *
- * You can also implement a {@link org.apache.qpid.nclient.util.FileMessage}
- * {@link org.apache.qpid.nclient.util.StreamingMessage}
- * Message interface to wrap any
- * data stream.
- *
Note that only the data between the buffer's current position and the
- * buffer limit is added.
- * It is therefore recommended to flip the buffer before adding it to the message,
+ * This command transfers a message between two peers.
*
- * @param buf Data to be added.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void data(ByteBuffer buf);
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, byte[] body, Option ... options);
/**
- * Add a string to the content of the message being sent.
+ * This command transfers a message between two peers.
*
- * @param str String to be added.
- */
- public void data(String str);
-
- /**
- * Signals the end of data for the message.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void endData();
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, String body, Option ... options);
//------------------------------------------------------
// Messaging methods
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index ffde5336f9..089eb1bb17 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -11,8 +11,11 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.api.Message;
import org.apache.qpid.nclient.ClosedListener;
import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
@@ -85,24 +88,29 @@ public class ClientSession extends org.apache.qpid.transport.Session implements
public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
{
- // The javadoc clearly says that this method is suitable for small messages
- // therefore reading the content in one shot.
- ByteBuffer data = msg.readData();
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() );
- if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
+ DeliveryProperties dp = msg.getDeliveryProperties();
+ MessageProperties mp = msg.getMessageProperties();
+ Header header;
+ if (msg.getHeader() == null || dp.isDirty() || mp.isDirty())
{
- msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
- msg.getDeliveryProperties().setDirty(false);
- msg.getMessageProperties().setDirty(false);
+ header = new Header(dp, mp);
+ msg.setHeader(header);
+ dp.setDirty(false);
+ mp.setDirty(false);
}
else
{
- super.header(msg.getHeader());
+ header = msg.getHeader();
}
- data( data );
- endData();
+ // The javadoc clearly says that this method is suitable for small messages
+ // therefore reading the content in one shot.
+ ByteBuffer body = msg.readData();
+ int size = body.remaining();
+ super.messageTransfer
+ (destination, MessageAcceptMode.get(acceptMode),
+ MessageAcquireMode.get(acquireMode), header, body);
+ _currentDataSizeNotSynced += size;
+ _currentDataSizeNotFlushed += size;
}
public void sync()
@@ -111,65 +119,6 @@ public class ClientSession extends org.apache.qpid.transport.Session implements
_currentDataSizeNotSynced = 0;
}
- /* -------------------------
- * Data methods
- * ------------------------*/
-
- public void data(ByteBuffer buf)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
- _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
- super.data(buf);
- }
-
- public void data(String str)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
- super.data(str);
- }
-
- public void data(byte[] bytes)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
- super.data(bytes);
- }
-
- public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
- {
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- boolean b = true;
- int count = 0;
- while(b)
- {
- try
- {
- System.out.println("count : " + count++);
- data(msg.readData());
- }
- catch(EOFException e)
- {
- b = false;
- }
- }
- endData();
- }
-
- public void endData()
- {
- super.endData();
- /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
- {
- sync();
- }
- if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
- {
- executionFlush();
- _currentDataSizeNotFlushed = 0;
- }*/
- }
-
public RangeSet getRejectedMessages()
{
return _rejectedMessages;
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
index b57fd0a7ed..adcd49c26d 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
@@ -7,7 +7,6 @@ import org.apache.qpid.ErrorCode;
import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageReject;
import org.apache.qpid.transport.MessageTransfer;
@@ -18,46 +17,27 @@ import org.apache.qpid.transport.SessionDelegate;
public class ClientSessionDelegate extends SessionDelegate
-{
- private MessageTransfer _currentTransfer;
- private MessagePartListener _currentMessageListener;
-
- @Override public void sessionDetached(Session ssn, SessionDetached dtc)
- {
- ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null));
- }
-
+{
+
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Data data)
+ @Override public void messageTransfer(Session session, MessageTransfer xfr)
{
- _currentMessageListener.data(data.getData());
- if (data.isLast())
+ MessagePartListener listener = ((ClientSession)session).getMessageListeners()
+ .get(xfr.getDestination());
+ listener.messageTransfer(xfr.getId());
+ listener.messageHeader(xfr.getHeader());
+ ByteBuffer body = xfr.getBody();
+ if (body == null)
{
- _currentMessageListener.messageReceived();
+ body = ByteBuffer.allocate(0);
}
+ listener.data(body);
+ listener.messageReceived();
}
- @Override public void header(Session ssn, Header header)
- {
- _currentMessageListener.messageHeader(header);
- if( header.hasNoPayload())
- {
- _currentMessageListener.data(ByteBuffer.allocate(0));
- _currentMessageListener.messageReceived();
- }
- }
-
-
- @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
- {
- _currentTransfer = currentTransfer;
- _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination());
- _currentMessageListener.messageTransfer(currentTransfer.getId());
- }
-
- @Override public void messageReject(Session session, MessageReject struct)
+ @Override public void messageReject(Session session, MessageReject struct)
{
for (Range range : struct.getTransfers())
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
index 96e1d2c772..88b5dc6392 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
@@ -9,10 +9,12 @@ import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageProperties;
+import java.nio.ByteBuffer;
import java.util.UUID;
public class DemoClient
@@ -56,17 +58,15 @@ public class DemoClient
ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
// queue
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.data("this is the data");
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("this is the data".getBytes()));
//reject
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stocks")),
+ ByteBuffer.wrap("this should be rejected".getBytes()));
ssn.sync();
// topic subs
@@ -84,11 +84,10 @@ public class DemoClient
ssn.sync();
// topic
- ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("Topic message");
- ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.endData();
+ ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("Topic message".getBytes()));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
deleted file mode 100644
index 36c0a4b3be..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.qpid.nclient.impl;
-
-import java.io.FileInputStream;
-
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.FileMessage;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-
-import java.util.UUID;
-
-public class LargeMsgDemoClient
-{
- public static MessagePartListenerAdapter createAdapter()
- {
- return new MessagePartListenerAdapter(new MessageListener()
- {
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
- }
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
- ssn.queueDeclare("queue1", null, null);
- ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
- ssn.sync();
-
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
-
- try
- {
- FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
- 1024,
- new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
-
- // queue
- ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
- ssn.sync();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
index 513c1a95de..9ea9297e14 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
@@ -1,5 +1,6 @@
package org.apache.qpid.nclient.interop;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -13,6 +14,7 @@ import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
@@ -77,18 +79,15 @@ public class BasicInteropTest implements ClosedListener
public void testSendMessage(){
System.out.println("------- Sending a message --------");
- session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-
Map