diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-09-13 21:42:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-09-13 21:42:57 +0000 |
| commit | e10d11937bccc3cdbdd867266501c3e16d28e933 (patch) | |
| tree | ee31690915cbb880ba553708ed11b9b607b23a0b /java/client | |
| parent | 0a1b3430450f274aee273a9f792a2d43f771b85f (diff) | |
| download | qpid-python-e10d11937bccc3cdbdd867266501c3e16d28e933.tar.gz | |
* moved most of the classes in the org.apache.qpidity package to
org.apache.qpidity.transport
* factored out the network specific pieces into
org.apache.qpidity.transport
* moved the mina specific code to
org.apache.qpidity.transport.network.mina
* replaced the handler chain with Sender/Receiver chains that can
deal with close request/closed notifications
* moved from an anonymous struct[] to a real Header class
* removed an excess copy from message data transmit
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575474 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
26 files changed, 125 insertions, 125 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index aa6756d116..aaa724fd93 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,8 +27,8 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ec27fdbb71..3bd5856df5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.qpidity.api.Message; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Struct; import javax.jms.JMSException; import java.io.IOException; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 8ecb5ffd78..f3fa79eb51 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -26,7 +26,7 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpidity.jms.ExceptionHelper; import org.apache.qpidity.client.util.ByteBufferMessage; -import org.apache.qpidity.ReplyTo; +import org.apache.qpidity.transport.ReplyTo; import javax.jms.Message; import javax.jms.JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index b115086d71..f5603b1695 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -27,9 +27,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpidity.Struct; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 5c1ee713fc..6198f0504e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -27,7 +27,7 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Struct; public interface MessageFactory diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index b60fc26fc0..13a2202e6f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -30,9 +30,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpidity.Struct; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 8d78f9f7fd..970ba5a66a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -25,8 +25,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.Struct; /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index fc89f2d368..3bc684a6ca 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -6,16 +6,18 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.qpidity.BrokerDetails; -import org.apache.qpidity.Channel; -import org.apache.qpidity.Connection; -import org.apache.qpidity.ConnectionClose; -import org.apache.qpidity.ConnectionDelegate; import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; -import org.apache.qpidity.SessionDelegate; import org.apache.qpidity.client.impl.ClientSession; import org.apache.qpidity.client.impl.ClientSessionDelegate; +import org.apache.qpidity.transport.Channel; +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionClose; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.SessionDelegate; +import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.url.QpidURL; @@ -25,47 +27,48 @@ public class Client implements org.apache.qpidity.client.Connection private Connection _conn; private ExceptionListener _exceptionListner; private final Lock _lock = new ReentrantLock(); - + /** - * + * * @return returns a new connection to the broker. */ public static org.apache.qpidity.client.Connection createConnection() { return new Client(); } - + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { Condition negotiationComplete = _lock.newCondition(); _lock.lock(); - + ConnectionDelegate connectionDelegate = new ConnectionDelegate() - { + { public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) + + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { _exceptionListner.onException( - new QpidException("Server closed the connection: Reason " + + new QpidException("Server closed the connection: Reason " + connectionClose.getReplyText(), ErrorCode.get(connectionClose.getReplyCode()), null)); } }; - + connectionDelegate.setCondition(_lock,negotiationComplete); connectionDelegate.setUsername(username); connectionDelegate.setPassword(password); connectionDelegate.setVirtualHost(virtualHost); - + _conn = MinaHandler.connect(host, port,connectionDelegate); - - _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); - + + // XXX: hardcoded version numbers + _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + try { negotiationComplete.await(); @@ -79,7 +82,7 @@ public class Client implements org.apache.qpidity.client.Connection _lock.unlock(); } } - + /* * Until the dust settles with the URL disucssion * I am not going to implement this. @@ -94,9 +97,9 @@ public class Client implements org.apache.qpidity.client.Connection details.getUserName(), details.getPassword()); } - + public void close() throws QpidException - { + { Channel ch = _conn.getChannel(0); ch.connectionClose(0, "client is closing", 0, 0); //need to close the connection underneath as well @@ -104,7 +107,7 @@ public class Client implements org.apache.qpidity.client.Connection public Session createSession(long expiryInSeconds) { - Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); ClientSession ssn = new ClientSession(); ssn.attach(ch); ssn.sessionOpen(expiryInSeconds); @@ -116,10 +119,10 @@ public class Client implements org.apache.qpidity.client.Connection // TODO Auto-generated method stub return null; } - + public void setExceptionListener(ExceptionListener exceptionListner) { - _exceptionListner = exceptionListner; + _exceptionListner = exceptionListner; } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java index 21532f7d46..bf6433af6a 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java @@ -18,15 +18,15 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.DtxCoordinationCommitResult; -import org.apache.qpidity.DtxCoordinationGetTimeoutResult; -import org.apache.qpidity.DtxCoordinationPrepareResult; -import org.apache.qpidity.DtxCoordinationRecoverResult; -import org.apache.qpidity.DtxCoordinationRollbackResult; -import org.apache.qpidity.DtxDemarcationEndResult; -import org.apache.qpidity.DtxDemarcationStartResult; -import org.apache.qpidity.Future; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.DtxCoordinationCommitResult; +import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult; +import org.apache.qpidity.transport.DtxCoordinationPrepareResult; +import org.apache.qpidity.transport.DtxCoordinationRecoverResult; +import org.apache.qpidity.transport.DtxCoordinationRollbackResult; +import org.apache.qpidity.transport.DtxDemarcationEndResult; +import org.apache.qpidity.transport.DtxDemarcationStartResult; +import org.apache.qpidity.transport.Future; +import org.apache.qpidity.transport.Option; /** * This session�s resources are control under the scope of a distributed transaction. diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 4ccef6df55..f0c2c8ca9c 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -19,7 +19,7 @@ package org.apache.qpidity.client; import java.nio.ByteBuffer; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Header; /** * Assembles message parts. @@ -47,7 +47,7 @@ public interface MessagePartListener * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> */ - public void messageHeaders(Struct... headers); + public void messageHeader(Header header); /** * Add the following byte array to the content of the message being received diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index d8be937e46..2340d27882 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.api.Message;
/**
@@ -186,7 +186,7 @@ public interface Session * @see org.apache.qpidity.DeliveryProperties
* @see org.apache.qpidity.MessageProperties
*/
- public void headers(Struct... headers);
+ public void header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index ea225976f2..2e53bdfcad 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -5,10 +5,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; @@ -16,7 +16,7 @@ import org.apache.qpidity.client.MessagePartListener; /** * Implements a Qpid Sesion. */ -public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.client.Session { private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ExceptionListener _exceptionListner; @@ -46,7 +46,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa // The javadoc clearly says that this method is suitable for small messages // therefore reading the content in one shot. super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); super.data(msg.readData()); super.endData(); } @@ -54,7 +54,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException { super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); boolean b = true; int count = 0; while(b) diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index c4565d4544..1e003b23e6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -3,18 +3,21 @@ package org.apache.qpidity.client.impl; import java.nio.ByteBuffer; import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.Frame; -import org.apache.qpidity.MessageAcquired; -import org.apache.qpidity.MessageReject; -import org.apache.qpidity.MessageTransfer; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.Session; -import org.apache.qpidity.SessionClosed; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.Struct; + import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.MessageAcquired; +import org.apache.qpidity.transport.MessageReject; +import org.apache.qpidity.transport.MessageTransfer; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.Session; +import org.apache.qpidity.transport.SessionClosed; +import org.apache.qpidity.transport.SessionDelegate; +import org.apache.qpidity.transport.Struct; + public class ClientSessionDelegate extends SessionDelegate { @@ -29,22 +32,22 @@ public class ClientSessionDelegate extends SessionDelegate // -------------------------------------------- // Message methods // -------------------------------------------- - @Override public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - for (ByteBuffer b : frame) + for (ByteBuffer b : data.getFragments()) { _currentMessageListener.data(b); } - if (frame.isLastSegment() && frame.isLastFrame()) + if (data.isLast()) { _currentMessageListener.messageReceived(); } } - @Override public void headers(Session ssn, Struct... headers) + @Override public void header(Session ssn, Header header) { - _currentMessageListener.messageHeaders(headers); + _currentMessageListener.messageHeader(header); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java index 1f8e9610d1..c15ac8f6e5 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java @@ -1,7 +1,5 @@ package org.apache.qpidity.client.impl; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.Client; @@ -10,6 +8,8 @@ import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.MessageListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; public class DemoClient { @@ -53,14 +53,14 @@ public class DemoClient // queue ssn.messageTransfer("amq.direct", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); ssn.data("this is the data"); ssn.endData(); //reject ssn.messageTransfer("amq.direct", (short) 0, (short) 1); ssn.data("this should be rejected"); - ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.header(new DeliveryProperties().setRoutingKey("stocks")); ssn.endData(); ssn.sync(); @@ -81,7 +81,7 @@ public class DemoClient // topic ssn.messageTransfer("amq.topic", (short) 0, (short) 1); ssn.data("Topic message"); - ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); ssn.endData(); ssn.sync(); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java index acf5c283b2..4e90a82fff 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java @@ -2,8 +2,6 @@ package org.apache.qpidity.client.impl; import java.io.FileInputStream; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.Client; @@ -13,6 +11,8 @@ import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.FileMessage; import org.apache.qpidity.client.util.MessageListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; public class LargeMsgDemoClient { diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index 218c6cd018..bf0889a913 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -5,8 +5,8 @@ import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Queue; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; /** diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 73e71ed84d..34fc057724 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -7,8 +7,8 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; /** diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index e4f19ea6c3..c40a85863d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -3,16 +3,16 @@ package org.apache.qpidity.client.util; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.Header; import org.apache.qpidity.client.MessagePartListener; -/** +/** * This is a simple message assembler. - * Will call onMessage method of the adaptee + * Will call onMessage method of the adaptee * when all message data is read. - * + * * This is a good convinience utility for handling * small messages */ @@ -20,17 +20,17 @@ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; ByteBufferMessage _currentMsg; - + public MessagePartListenerAdapter(MessageListener listener) { - _adaptee = listener; + _adaptee = listener; } - + public void messageTransfer(long transferId) { _currentMsg = new ByteBufferMessage(transferId); } - + public void data(ByteBuffer src) { try @@ -40,28 +40,20 @@ public class MessagePartListenerAdapter implements MessagePartListener catch(IOException e) { // A chance for IO exception - // doesn't occur as we are using + // doesn't occur as we are using // a ByteBuffer } } - public void messageHeaders(Struct... headers) - { - for(Struct struct: headers) - { - if(struct instanceof DeliveryProperties) - { - _currentMsg.setDeliveryProperties((DeliveryProperties)struct); - } - else if (struct instanceof MessageProperties) - { - _currentMsg.setMessageProperties((MessageProperties)struct); - } - } - } - - public void messageReceived() - { + public void messageHeader(Header header) + { + _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class)); + _currentMsg.setMessageProperties(header.get(MessageProperties.class)); + } + + public void messageReceived() + { _adaptee.onMessage(_currentMsg); - } + } + } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java index 2527856798..dfb65214d2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -2,8 +2,8 @@ package org.apache.qpidity.client.util; import java.nio.ByteBuffer; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; public abstract class ReadOnlyMessage implements Message diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java index 23089c7931..37e726d6c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -5,8 +5,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; public class StreamingMessage extends ReadOnlyMessage implements Message diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index a3c03ca6d0..8b6e694d25 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -26,9 +26,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; -import org.apache.qpidity.Option; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; @@ -36,6 +34,8 @@ import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.jms.message.MessageFactory; import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.transport.Option; +import org.apache.qpidity.transport.RangeSet; /** * Implementation of JMS message consumer diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index 9ed74e1cd0..ec837bc4e6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -18,7 +18,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.url.BindingURL; import org.apache.qpidity.exchange.ExchangeDefaults; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 675a51625a..40bf7f15a8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; +import org.apache.qpidity.transport.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index 22feb29598..d23b08e37a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -18,8 +18,8 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.url.BindingURL; import javax.jms.Topic; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java index b84b55966e..dab7585deb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -21,8 +21,10 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import org.apache.qpidity.*; +import org.apache.qpidity.QpidException; import org.apache.qpidity.dtx.XidImpl; +import org.apache.qpidity.transport.*; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index c11a7d8c3b..a7e4d02651 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -29,8 +29,8 @@ import java.io.IOException; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ReplyTo; import org.apache.qpidity.client.util.ByteBufferMessage; +import org.apache.qpidity.transport.ReplyTo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; |
