summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-09-13 21:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2007-09-13 21:42:57 +0000
commite10d11937bccc3cdbdd867266501c3e16d28e933 (patch)
treeee31690915cbb880ba553708ed11b9b607b23a0b /java/client
parent0a1b3430450f274aee273a9f792a2d43f771b85f (diff)
downloadqpid-python-e10d11937bccc3cdbdd867266501c3e16d28e933.tar.gz
* moved most of the classes in the org.apache.qpidity package to
org.apache.qpidity.transport * factored out the network specific pieces into org.apache.qpidity.transport * moved the mina specific code to org.apache.qpidity.transport.network.mina * replaced the handler chain with Sender/Receiver chains that can deal with close request/closed notifications * moved from an anonymous struct[] to a real Header class * removed an excess copy from message data transmit git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575474 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java53
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/DtxSession.java18
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java33
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java50
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java2
26 files changed, 125 insertions, 125 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index aa6756d116..aaa724fd93 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,8 +27,8 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ec27fdbb71..3bd5856df5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -28,7 +28,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
import javax.jms.JMSException;
import java.io.IOException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 8ecb5ffd78..f3fa79eb51 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -26,7 +26,7 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
import org.apache.qpidity.client.util.ByteBufferMessage;
-import org.apache.qpidity.ReplyTo;
+import org.apache.qpidity.transport.ReplyTo;
import javax.jms.Message;
import javax.jms.JMSException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index b115086d71..f5603b1695 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -27,9 +27,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 5c1ee713fc..6198f0504e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -27,7 +27,7 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
public interface MessageFactory
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index b60fc26fc0..13a2202e6f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -30,9 +30,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 8d78f9f7fd..970ba5a66a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -25,8 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index fc89f2d368..3bc684a6ca 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -6,16 +6,18 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpidity.BrokerDetails;
-import org.apache.qpidity.Channel;
-import org.apache.qpidity.Connection;
-import org.apache.qpidity.ConnectionClose;
-import org.apache.qpidity.ConnectionDelegate;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.client.impl.ClientSession;
import org.apache.qpidity.client.impl.ClientSessionDelegate;
+import org.apache.qpidity.transport.Channel;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.url.QpidURL;
@@ -25,47 +27,48 @@ public class Client implements org.apache.qpidity.client.Connection
private Connection _conn;
private ExceptionListener _exceptionListner;
private final Lock _lock = new ReentrantLock();
-
+
/**
- *
+ *
* @return returns a new connection to the broker.
*/
public static org.apache.qpidity.client.Connection createConnection()
{
return new Client();
}
-
+
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
Condition negotiationComplete = _lock.newCondition();
_lock.lock();
-
+
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
- {
+ {
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
}
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+
+ @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
_exceptionListner.onException(
- new QpidException("Server closed the connection: Reason " +
+ new QpidException("Server closed the connection: Reason " +
connectionClose.getReplyText(),
ErrorCode.get(connectionClose.getReplyCode()),
null));
}
};
-
+
connectionDelegate.setCondition(_lock,negotiationComplete);
connectionDelegate.setUsername(username);
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
-
+
_conn = MinaHandler.connect(host, port,connectionDelegate);
-
- _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
-
+
+ // XXX: hardcoded version numbers
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+
try
{
negotiationComplete.await();
@@ -79,7 +82,7 @@ public class Client implements org.apache.qpidity.client.Connection
_lock.unlock();
}
}
-
+
/*
* Until the dust settles with the URL disucssion
* I am not going to implement this.
@@ -94,9 +97,9 @@ public class Client implements org.apache.qpidity.client.Connection
details.getUserName(),
details.getPassword());
}
-
+
public void close() throws QpidException
- {
+ {
Channel ch = _conn.getChannel(0);
ch.connectionClose(0, "client is closing", 0, 0);
//need to close the connection underneath as well
@@ -104,7 +107,7 @@ public class Client implements org.apache.qpidity.client.Connection
public Session createSession(long expiryInSeconds)
{
- Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
@@ -116,10 +119,10 @@ public class Client implements org.apache.qpidity.client.Connection
// TODO Auto-generated method stub
return null;
}
-
+
public void setExceptionListener(ExceptionListener exceptionListner)
{
- _exceptionListner = exceptionListner;
+ _exceptionListner = exceptionListner;
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
index 21532f7d46..bf6433af6a 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
@@ -18,15 +18,15 @@
*/
package org.apache.qpidity.client;
-import org.apache.qpidity.DtxCoordinationCommitResult;
-import org.apache.qpidity.DtxCoordinationGetTimeoutResult;
-import org.apache.qpidity.DtxCoordinationPrepareResult;
-import org.apache.qpidity.DtxCoordinationRecoverResult;
-import org.apache.qpidity.DtxCoordinationRollbackResult;
-import org.apache.qpidity.DtxDemarcationEndResult;
-import org.apache.qpidity.DtxDemarcationStartResult;
-import org.apache.qpidity.Future;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.DtxCoordinationCommitResult;
+import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
+import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
+import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
+import org.apache.qpidity.transport.DtxDemarcationEndResult;
+import org.apache.qpidity.transport.DtxDemarcationStartResult;
+import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.Option;
/**
* This session�s resources are control under the scope of a distributed transaction.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index 4ccef6df55..f0c2c8ca9c 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -19,7 +19,7 @@ package org.apache.qpidity.client;
import java.nio.ByteBuffer;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Header;
/**
* Assembles message parts.
@@ -47,7 +47,7 @@ public interface MessagePartListener
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void messageHeaders(Struct... headers);
+ public void messageHeader(Header header);
/**
* Add the following byte array to the content of the message being received
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index d8be937e46..2340d27882 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -22,9 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.api.Message;
/**
@@ -186,7 +186,7 @@ public interface Session
* @see org.apache.qpidity.DeliveryProperties
* @see org.apache.qpidity.MessageProperties
*/
- public void headers(Struct... headers);
+ public void header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index ea225976f2..2e53bdfcad 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -5,10 +5,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
@@ -16,7 +16,7 @@ import org.apache.qpidity.client.MessagePartListener;
/**
* Implements a Qpid Sesion.
*/
-public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session
+public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.client.Session
{
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ExceptionListener _exceptionListner;
@@ -46,7 +46,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
super.data(msg.readData());
super.endData();
}
@@ -54,7 +54,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
boolean b = true;
int count = 0;
while(b)
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index c4565d4544..1e003b23e6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -3,18 +3,21 @@ package org.apache.qpidity.client.impl;
import java.nio.ByteBuffer;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.Frame;
-import org.apache.qpidity.MessageAcquired;
-import org.apache.qpidity.MessageReject;
-import org.apache.qpidity.MessageTransfer;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.Session;
-import org.apache.qpidity.SessionClosed;
-import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.Struct;
+
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.MessageAcquired;
+import org.apache.qpidity.transport.MessageReject;
+import org.apache.qpidity.transport.MessageTransfer;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.Session;
+import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.Struct;
+
public class ClientSessionDelegate extends SessionDelegate
{
@@ -29,22 +32,22 @@ public class ClientSessionDelegate extends SessionDelegate
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- for (ByteBuffer b : frame)
+ for (ByteBuffer b : data.getFragments())
{
_currentMessageListener.data(b);
}
- if (frame.isLastSegment() && frame.isLastFrame())
+ if (data.isLast())
{
_currentMessageListener.messageReceived();
}
}
- @Override public void headers(Session ssn, Struct... headers)
+ @Override public void header(Session ssn, Header header)
{
- _currentMessageListener.messageHeaders(headers);
+ _currentMessageListener.messageHeader(header);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
index 1f8e9610d1..c15ac8f6e5 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
@@ -1,7 +1,5 @@
package org.apache.qpidity.client.impl;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -10,6 +8,8 @@ import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class DemoClient
{
@@ -53,14 +53,14 @@ public class DemoClient
// queue
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+ ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
ssn.data("this is the data");
ssn.endData();
//reject
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
ssn.data("this should be rejected");
- ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
ssn.endData();
ssn.sync();
@@ -81,7 +81,7 @@ public class DemoClient
// topic
ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
ssn.data("Topic message");
- ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
ssn.endData();
ssn.sync();
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
index acf5c283b2..4e90a82fff 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
@@ -2,8 +2,6 @@ package org.apache.qpidity.client.impl;
import java.io.FileInputStream;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -13,6 +11,8 @@ import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.FileMessage;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class LargeMsgDemoClient
{
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index 218c6cd018..bf0889a913 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -5,8 +5,8 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
index 73e71ed84d..34fc057724 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -7,8 +7,8 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index e4f19ea6c3..c40a85863d 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -3,16 +3,16 @@ package org.apache.qpidity.client.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.client.MessagePartListener;
-/**
+/**
* This is a simple message assembler.
- * Will call onMessage method of the adaptee
+ * Will call onMessage method of the adaptee
* when all message data is read.
- *
+ *
* This is a good convinience utility for handling
* small messages
*/
@@ -20,17 +20,17 @@ public class MessagePartListenerAdapter implements MessagePartListener
{
MessageListener _adaptee;
ByteBufferMessage _currentMsg;
-
+
public MessagePartListenerAdapter(MessageListener listener)
{
- _adaptee = listener;
+ _adaptee = listener;
}
-
+
public void messageTransfer(long transferId)
{
_currentMsg = new ByteBufferMessage(transferId);
}
-
+
public void data(ByteBuffer src)
{
try
@@ -40,28 +40,20 @@ public class MessagePartListenerAdapter implements MessagePartListener
catch(IOException e)
{
// A chance for IO exception
- // doesn't occur as we are using
+ // doesn't occur as we are using
// a ByteBuffer
}
}
- public void messageHeaders(Struct... headers)
- {
- for(Struct struct: headers)
- {
- if(struct instanceof DeliveryProperties)
- {
- _currentMsg.setDeliveryProperties((DeliveryProperties)struct);
- }
- else if (struct instanceof MessageProperties)
- {
- _currentMsg.setMessageProperties((MessageProperties)struct);
- }
- }
- }
-
- public void messageReceived()
- {
+ public void messageHeader(Header header)
+ {
+ _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
+ _currentMsg.setMessageProperties(header.get(MessageProperties.class));
+ }
+
+ public void messageReceived()
+ {
_adaptee.onMessage(_currentMsg);
- }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
index 2527856798..dfb65214d2 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
@@ -2,8 +2,8 @@ package org.apache.qpidity.client.util;
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public abstract class ReadOnlyMessage implements Message
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
index 23089c7931..37e726d6c6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
@@ -5,8 +5,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public class StreamingMessage extends ReadOnlyMessage implements Message
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index a3c03ca6d0..8b6e694d25 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -26,9 +26,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
-import org.apache.qpidity.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
@@ -36,6 +34,8 @@ import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
/**
* Implementation of JMS message consumer
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
index 9ed74e1cd0..ec837bc4e6 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
@@ -18,7 +18,7 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import org.apache.qpidity.exchange.ExchangeDefaults;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 675a51625a..40bf7f15a8 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
index 22feb29598..d23b08e37a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
@@ -18,8 +18,8 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import javax.jms.Topic;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
index b84b55966e..dab7585deb 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
@@ -21,8 +21,10 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpidity.*;
+import org.apache.qpidity.QpidException;
import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index c11a7d8c3b..a7e4d02651 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -29,8 +29,8 @@ import java.io.IOException;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ReplyTo;
import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.transport.ReplyTo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;