From 10f23aca8dab83f51cb4fce341dc7fef7dac44b9 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 29 Jul 2008 19:03:34 +0000 Subject: QPID-1072: renamed org.apache.qpidity -> org.apache.qpid git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@680803 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/grammar/SelectorParser.jj | 20 +- java/client/src/main/java/client.log4j | 5 - .../qpid/client/AMQConnectionDelegate_0_10.java | 12 +- .../org/apache/qpid/client/AMQSession_0_10.java | 32 +- .../qpid/client/BasicMessageConsumer_0_10.java | 16 +- .../qpid/client/BasicMessageProducer_0_10.java | 16 +- .../org/apache/qpid/client/XAResourceImpl.java | 26 +- .../java/org/apache/qpid/client/XASessionImpl.java | 8 +- .../qpid/client/message/AbstractJMSMessage.java | 6 +- .../client/message/AbstractJMSMessageFactory.java | 6 +- .../apache/qpid/client/message/MessageFactory.java | 2 +- .../client/message/MessageFactoryRegistry.java | 6 +- .../client/message/UnprocessedMessage_0_10.java | 4 +- .../apache/qpid/filter/ArithmeticExpression.java | 268 ++++++++++ .../org/apache/qpid/filter/BinaryExpression.java | 103 ++++ .../org/apache/qpid/filter/BooleanExpression.java | 33 ++ .../apache/qpid/filter/ComparisonExpression.java | 589 ++++++++++++++++++++ .../org/apache/qpid/filter/ConstantExpression.java | 204 +++++++ .../java/org/apache/qpid/filter/Expression.java | 34 ++ .../org/apache/qpid/filter/JMSSelectorFilter.java | 70 +++ .../org/apache/qpid/filter/LogicExpression.java | 108 ++++ .../java/org/apache/qpid/filter/MessageFilter.java | 27 + .../org/apache/qpid/filter/PropertyExpression.java | 303 +++++++++++ .../org/apache/qpid/filter/UnaryExpression.java | 321 +++++++++++ .../java/org/apache/qpid/jms/BrokerDetails.java | 2 +- .../org/apache/qpid/naming/ReadOnlyContext.java | 509 ++++++++++++++++++ .../java/org/apache/qpid/naming/jndi.properties | 40 ++ .../main/java/org/apache/qpid/nclient/Client.java | 294 ++++++++++ .../org/apache/qpid/nclient/ClosedListener.java | 39 ++ .../java/org/apache/qpid/nclient/Connection.java | 86 +++ .../java/org/apache/qpid/nclient/DtxSession.java | 137 +++++ .../java/org/apache/qpid/nclient/JMSTestCase.java | 115 ++++ .../apache/qpid/nclient/MessagePartListener.java | 63 +++ .../main/java/org/apache/qpid/nclient/Session.java | 595 +++++++++++++++++++++ .../apache/qpid/nclient/impl/ClientSession.java | 206 +++++++ .../qpid/nclient/impl/ClientSessionDelegate.java | 75 +++ .../org/apache/qpid/nclient/impl/Constants.java | 78 +++ .../org/apache/qpid/nclient/impl/DemoClient.java | 94 ++++ .../qpid/nclient/impl/LargeMsgDemoClient.java | 76 +++ .../qpid/nclient/interop/BasicInteropTest.java | 156 ++++++ .../qpid/nclient/util/ByteBufferMessage.java | 160 ++++++ .../org/apache/qpid/nclient/util/FileMessage.java | 96 ++++ .../apache/qpid/nclient/util/MessageListener.java | 34 ++ .../nclient/util/MessagePartListenerAdapter.java | 59 ++ .../apache/qpid/nclient/util/ReadOnlyMessage.java | 38 ++ .../apache/qpid/nclient/util/StreamingMessage.java | 68 +++ .../java/org/apache/qpid/njms/ExceptionHelper.java | 60 +++ .../qpidity/filter/ArithmeticExpression.java | 268 ---------- .../apache/qpidity/filter/BinaryExpression.java | 103 ---- .../apache/qpidity/filter/BooleanExpression.java | 33 -- .../qpidity/filter/ComparisonExpression.java | 589 -------------------- .../apache/qpidity/filter/ConstantExpression.java | 204 ------- .../java/org/apache/qpidity/filter/Expression.java | 34 -- .../apache/qpidity/filter/JMSSelectorFilter.java | 70 --- .../org/apache/qpidity/filter/LogicExpression.java | 108 ---- .../org/apache/qpidity/filter/MessageFilter.java | 27 - .../apache/qpidity/filter/PropertyExpression.java | 303 ----------- .../org/apache/qpidity/filter/UnaryExpression.java | 321 ----------- .../org/apache/qpidity/naming/ReadOnlyContext.java | 509 ------------------ .../java/org/apache/qpidity/naming/jndi.properties | 40 -- .../java/org/apache/qpidity/nclient/Client.java | 294 ---------- .../org/apache/qpidity/nclient/ClosedListener.java | 39 -- .../org/apache/qpidity/nclient/Connection.java | 86 --- .../org/apache/qpidity/nclient/DtxSession.java | 137 ----- .../org/apache/qpidity/nclient/JMSTestCase.java | 115 ---- .../qpidity/nclient/MessagePartListener.java | 63 --- .../java/org/apache/qpidity/nclient/Session.java | 595 --------------------- .../apache/qpidity/nclient/impl/ClientSession.java | 206 ------- .../nclient/impl/ClientSessionDelegate.java | 75 --- .../org/apache/qpidity/nclient/impl/Constants.java | 78 --- .../apache/qpidity/nclient/impl/DemoClient.java | 94 ---- .../qpidity/nclient/impl/LargeMsgDemoClient.java | 76 --- .../qpidity/nclient/interop/BasicInteropTest.java | 156 ------ .../qpidity/nclient/util/ByteBufferMessage.java | 160 ------ .../apache/qpidity/nclient/util/FileMessage.java | 96 ---- .../qpidity/nclient/util/MessageListener.java | 34 -- .../nclient/util/MessagePartListenerAdapter.java | 59 -- .../qpidity/nclient/util/ReadOnlyMessage.java | 38 -- .../qpidity/nclient/util/StreamingMessage.java | 68 --- .../org/apache/qpidity/njms/ExceptionHelper.java | 60 --- 80 files changed, 5216 insertions(+), 5221 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/BooleanExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/ConstantExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/Expression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/filter/UnaryExpression.java create mode 100644 java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java create mode 100644 java/client/src/main/java/org/apache/qpid/naming/jndi.properties create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Client.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Connection.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/JMSTestCase.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Session.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/Constants.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/FileMessage.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/MessageListener.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/ReadOnlyMessage.java create mode 100644 java/client/src/main/java/org/apache/qpid/nclient/util/StreamingMessage.java create mode 100644 java/client/src/main/java/org/apache/qpid/njms/ExceptionHelper.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/BinaryExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/BooleanExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/ConstantExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/Expression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/LogicExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/MessageFilter.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/PropertyExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/filter/UnaryExpression.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/naming/jndi.properties delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/Client.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/Connection.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/Session.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java (limited to 'java/client/src') diff --git a/java/client/src/main/grammar/SelectorParser.jj b/java/client/src/main/grammar/SelectorParser.jj index a72da526ae..b45cf1a487 100644 --- a/java/client/src/main/grammar/SelectorParser.jj +++ b/java/client/src/main/grammar/SelectorParser.jj @@ -61,20 +61,20 @@ PARSER_BEGIN(SelectorParser) * */ -package org.apache.qpidity.filter.selector; +package org.apache.qpid.filter.selector; import java.io.StringReader; import java.util.ArrayList; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.filter.ArithmeticExpression; -import org.apache.qpidity.filter.BooleanExpression; -import org.apache.qpidity.filter.ComparisonExpression; -import org.apache.qpidity.filter.ConstantExpression; -import org.apache.qpidity.filter.Expression; -import org.apache.qpidity.filter.LogicExpression; -import org.apache.qpidity.filter.PropertyExpression; -import org.apache.qpidity.filter.UnaryExpression; +import org.apache.qpid.QpidException; +import org.apache.qpid.filter.ArithmeticExpression; +import org.apache.qpid.filter.BooleanExpression; +import org.apache.qpid.filter.ComparisonExpression; +import org.apache.qpid.filter.ConstantExpression; +import org.apache.qpid.filter.Expression; +import org.apache.qpid.filter.LogicExpression; +import org.apache.qpid.filter.PropertyExpression; +import org.apache.qpid.filter.UnaryExpression; /** * JMS Selector Parser generated by JavaCC diff --git a/java/client/src/main/java/client.log4j b/java/client/src/main/java/client.log4j index a5531bb874..19cc946118 100644 --- a/java/client/src/main/java/client.log4j +++ b/java/client/src/main/java/client.log4j @@ -21,15 +21,10 @@ log4j.rootLogger=${root.logging.level} #log4j.logger.org.apache.qpid=${amqj.logging.level}, console #log4j.additivity.org.apache.qpid=false -#log4j.logger.org.apache.qpidity.transport=TRACE, console log4j.logger.org.apache.qpid=ERROR, console log4j.additivity.org.apache.qpid=false -log4j.logger.org.apache.qpidity=ERROR, console -log4j.additivity.org.apache.qpidity=false - -#log4j.logger.org.apache.qpidity.transport=DEBUG, console #log4j.logger.org.apache.qpid.client.message.AbstractBytesTypedMessage=DEBUG, console log4j.appender.console=org.apache.log4j.ConsoleAppender diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index ce10553210..a2df2f3cf2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -12,11 +12,11 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.ClosedListener; -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.transport.ProtocolVersionException; +import org.apache.qpid.nclient.Client; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.transport.ProtocolVersionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed /** * The QpidConeection instance that is mapped with thie JMS connection. */ - org.apache.qpidity.nclient.Connection _qpidConnection; + org.apache.qpid.nclient.Connection _qpidConnection; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 9dff8e3e7e..cf73bc5089 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -28,16 +28,16 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; import org.apache.qpid.util.Serial; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.transport.MessageCreditUnit; -import org.apache.qpidity.transport.MessageFlowMode; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.ExchangeBoundResult; -import org.apache.qpidity.transport.Future; +import org.apache.qpid.nclient.Session; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +71,8 @@ public class AMQSession_0_10 extends AMQSession private Object _currentExceptionLock = new Object(); private QpidException _currentException; - // a ref on the qpidity connection - protected org.apache.qpidity.nclient.Connection _qpidConnection; + // a ref on the qpid connection + protected org.apache.qpid.nclient.Connection _qpidConnection; private RangeSet unacked = new RangeSet(); private int unackedCount = 0; @@ -96,7 +96,7 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. * @param qpidConnection The qpid connection */ - AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -126,7 +126,7 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. * @param qpidConnection The connection */ - AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -594,7 +594,7 @@ public class AMQSession_0_10 extends AMQSession * * @return The associated Qpid Session. */ - protected org.apache.qpidity.nclient.Session getQpidSession() + protected org.apache.qpid.nclient.Session getQpidSession() { return _qpidSession; } @@ -645,7 +645,7 @@ public class AMQSession_0_10 extends AMQSession /** * Lstener for qpid protocol exceptions */ - private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener + private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener { public void onClosed(ErrorCode errorCode, String reason, Throwable t) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9230225bd5..255b38aa10 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -27,12 +27,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.jms.*; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.transport.*; -import org.apache.qpidity.transport.Session; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpid.api.Message; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.Session; +import org.apache.qpid.QpidException; +import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * This is a 0.10 message consumer. */ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer - implements org.apache.qpidity.nclient.util.MessageListener + implements org.apache.qpid.nclient.util.MessageListener { /** @@ -108,7 +108,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer')); + ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('=')); + ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('!')); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression + { + + Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) + { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) + { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) + { + i++; + if (i >= like.length()) + { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') + { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') + { + regexp.append("."); // match one + } + else if (ComparisonExpression.REGEXP_CONTROL_CHARS.contains(new Character(c))) + { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else + { + regexp.append(c); + } + } + + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() + { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) + { + return null; + } + + if (!(rv instanceof String)) + { + return + Boolean.FALSE; + // throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(AbstractJMSMessage message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) + { + if ((escape != null) && (escape.length() != 1)) + { + throw new RuntimeException( + "The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); + } + + int c = -1; + if (escape != null) + { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) + { + return UnaryExpression.createNOT(ComparisonExpression.createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) + { + return ComparisonExpression.doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) + { + return UnaryExpression.createNOT(ComparisonExpression.doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) + { + return UnaryExpression.createNOT(ComparisonExpression.createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) + { + ComparisonExpression.checkEqualOperand(left); + ComparisonExpression.checkEqualOperand(right); + ComparisonExpression.checkEqualOperandCompatability(left, right); + + return ComparisonExpression.doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) + { + return new ComparisonExpression(left, right) + { + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + Object lv = left.evaluate(message); + Object rv = right.evaluate(message); + + // Iff one of the values is null + if ((lv == null) ^ (rv == null)) + { + return Boolean.FALSE; + } + + if ((lv == rv) || lv.equals(rv)) + { + return Boolean.TRUE; + } + + if ((lv instanceof Comparable) && (rv instanceof Comparable)) + { + return compare((Comparable) lv, (Comparable) rv); + } + + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) + { + return answer == 0; + } + + public String getExpressionSymbol() + { + return "="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) + { + ComparisonExpression.checkLessThanOperand(left); + ComparisonExpression.checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer > 0; + } + + public String getExpressionSymbol() + { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) + { + ComparisonExpression.checkLessThanOperand(left); + ComparisonExpression.checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer >= 0; + } + + public String getExpressionSymbol() + { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) + { + ComparisonExpression.checkLessThanOperand(left); + ComparisonExpression.checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer < 0; + } + + public String getExpressionSymbol() + { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) + { + ComparisonExpression.checkLessThanOperand(left); + ComparisonExpression.checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer <= 0; + } + + public String getExpressionSymbol() + { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value instanceof Number) + { + return; + } + + // Else it's boolean or a String.. + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + + if (expr instanceof BooleanExpression) + { + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE litterals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value == null) + { + throw new RuntimeException("'" + expr + "' cannot be compared."); + } + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) + { + if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression)) + { + if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression)) + { + throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'"); + } + } + } + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) + { + super(left, right); + } + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + Comparable lv = (Comparable) left.evaluate(message); + if (lv == null) + { + return null; + } + + Comparable rv = (Comparable) right.evaluate(message); + if (rv == null) + { + return null; + } + + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) + { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) + { + if (lc == Byte.class) + { + if (rc == Short.class) + { + lv = new Short(((Number) lv).shortValue()); + } + else if (rc == Integer.class) + { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Short.class) + { + if (rc == Integer.class) + { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Integer.class) + { + if (rc == Long.class) + { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Long.class) + { + if (rc == Integer.class) + { + rv = new Long(((Number) rv).longValue()); + } + else if (rc == Float.class) + { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Float.class) + { + if (rc == Integer.class) + { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Long.class) + { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Double.class) + { + lv = new Double(((Number) lv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Double.class) + { + if (rc == Integer.class) + { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Long.class) + { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Float.class) + { + rv = new Float(((Number) rv).doubleValue()); + } + else + { + return Boolean.FALSE; + } + } + else + { + return Boolean.FALSE; + } + } + + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(AbstractJMSMessage message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/ConstantExpression.java b/java/client/src/main/java/org/apache/qpid/filter/ConstantExpression.java new file mode 100644 index 0000000000..447de914a4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/ConstantExpression.java @@ -0,0 +1,204 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; + +import java.math.BigDecimal; + +/** + * Represents a constant expression + */ +public class ConstantExpression implements Expression +{ + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression + { + public BooleanConstantExpression(Object value) + { + super(value); + } + + public boolean matches(AbstractJMSMessage message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) + { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) + { + text = text.substring(0, text.length() - 1); + } + + Number value; + try + { + value = new Long(text); + } + catch (NumberFormatException e) + { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) + { + Number value = new Long(Long.parseLong(text.substring(2), 16)); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) + { + Number value = new Long(Long.parseLong(text, 8)); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = new Integer(value.intValue()); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) + { + Number value = new Double(text); + + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) + { + this.value = value; + } + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + return value; + } + + public Object getValue() + { + return value; + } + + /** + * @see Object#toString() + */ + public String toString() + { + if (value == null) + { + return "NULL"; + } + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + + if (value instanceof String) + { + return ConstantExpression.encodeString((String) value); + } + + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see Object#equals(Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) + { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) + { + char c = s.charAt(i); + if (c == '\'') + { + b.append(c); + } + + b.append(c); + } + + b.append('\''); + + return b.toString(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/Expression.java b/java/client/src/main/java/org/apache/qpid/filter/Expression.java new file mode 100644 index 0000000000..e578775a77 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/Expression.java @@ -0,0 +1,34 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; + + +/** + * Represents an expression + */ +public interface Expression +{ + /** + * @param message The message to evaluate + * @return the value of this expression + */ + public Object evaluate(AbstractJMSMessage message) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java new file mode 100644 index 0000000000..dcfb9a9940 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java @@ -0,0 +1,70 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.QpidException; +import org.apache.qpid.filter.selector.SelectorParser; +import org.apache.qpid.client.message.AbstractJMSMessage; + + +public class JMSSelectorFilter implements MessageFilter +{ + /** + * this JMSSelectorFilter's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); + + private String _selector; + private BooleanExpression _matcher; + + public JMSSelectorFilter(String selector) throws QpidException + { + _selector = selector; + if (JMSSelectorFilter._logger.isDebugEnabled()) + { + JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector); + } + _matcher = new SelectorParser().parse(selector); + } + + public boolean matches(AbstractJMSMessage message) + { + try + { + boolean match = _matcher.matches(message); + if (JMSSelectorFilter._logger.isDebugEnabled()) + { + JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System + .identityHashCode(_selector) + "):" + _selector); + } + return match; + } + catch (QpidException e) + { + JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e); + } + return false; + } + + public String getSelector() + { + return _selector; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java new file mode 100644 index 0000000000..d7aabd5a46 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java @@ -0,0 +1,108 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; + + +/** + * A filter performing a comparison of two objects + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression +{ + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new LogicExpression(lvalue, rvalue) + { + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + + Boolean lv = (Boolean) left.evaluate(message); + // Can we do an OR shortcut?? + if ((lv != null) && lv.booleanValue()) + { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) right.evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "OR"; + } + }; + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new LogicExpression(lvalue, rvalue) + { + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + + Boolean lv = (Boolean) left.evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + { + return null; + } + + if (!lv.booleanValue()) + { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) right.evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "AND"; + } + }; + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) + { + super(left, right); + } + + public abstract Object evaluate(AbstractJMSMessage message) throws QpidException; + + public boolean matches(AbstractJMSMessage message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java new file mode 100644 index 0000000000..a775080d81 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java @@ -0,0 +1,27 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; + + +public interface MessageFilter +{ + boolean matches(AbstractJMSMessage message) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java new file mode 100644 index 0000000000..700c6a15ac --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java @@ -0,0 +1,303 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.QpidException; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import javax.jms.JMSException; +import java.util.HashMap; + +/** + * Represents a property expression + */ +public class PropertyExpression implements Expression +{ + // Constants - defined the same as JMS + private static final int NON_PERSISTENT = 1; + private static final int DEFAULT_PRIORITY = 4; + + private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); + + private static final HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + //TODO + return null; + } + }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString replyTo = _properties.getReplyTo(); + + return (replyTo == null) ? null : replyTo.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property", e); + + return null; + } + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString type = _properties.getType(); + + return (type == null) ? null : type.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property", e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + int mode = message.getJMSDeliveryMode(); + if (_logger.isDebugEnabled()) + { + _logger.debug("JMSDeliveryMode is :" + mode); + } + + return mode; + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + } + + return NON_PERSISTENT; + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return (int) _properties.getPriority(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + } + + return DEFAULT_PRIORITY; + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString messageId = _properties.getMessageId(); + + return (messageId == null) ? null : messageId; + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return _properties.getTimestamp(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString correlationId = _properties.getCorrelationId(); + return (correlationId == null) ? null : correlationId.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return _properties.getExpiration(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + return message.getJMSRedelivered(); + } + catch (JMSException e) + { + _logger.warn("Error evaluating property",e); + return null; + } + } + }); + + } + + private final String name; + private final Expression jmsPropertyExpression; + + public PropertyExpression(String name) + { + this.name = name; + jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); + } + + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } + else + { + + CommonContentHeaderProperties _properties = message.getContentHeaderProperties(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Looking up property:" + name); + _logger.debug("Properties are:" + _properties.getHeaders().keySet()); + } + return _properties.getHeaders().getObject(name); + } + } + + public String getName() + { + return name; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + return name.equals(((PropertyExpression) o).name); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/filter/UnaryExpression.java b/java/client/src/main/java/org/apache/qpid/filter/UnaryExpression.java new file mode 100644 index 0000000000..b620b107c4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/filter/UnaryExpression.java @@ -0,0 +1,321 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.filter; + +import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; + +/** + * An expression which performs an operation on two expression values + */ +public abstract class UnaryExpression implements Expression +{ + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + protected Expression right; + + public static Expression createNegate(Expression left) + { + return new UnaryExpression(left) + { + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue instanceof Number) + { + return UnaryExpression.negate((Number) rvalue); + } + + return null; + } + + public String getExpressionSymbol() + { + return "-"; + } + }; + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) + { + + // Use a HashSet if there are many elements. + Collection t; + if (elements.size() == 0) + { + t = null; + } + else if (elements.size() < 5) + { + t = elements; + } + else + { + t = new HashSet(elements); + } + + final Collection inList = t; + + return new BooleanUnaryExpression(right) + { + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue.getClass() != String.class) + { + return null; + } + + if (((inList != null) && inList.contains(rvalue)) ^ not) + { + return Boolean.TRUE; + } + else + { + return Boolean.FALSE; + } + + } + + public String toString() + { + StringBuffer answer = new StringBuffer(); + answer.append(right); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count = 0; + for (Iterator i = inList.iterator(); i.hasNext();) + { + Object o = (Object) i.next(); + if (count != 0) + { + answer.append(", "); + } + + answer.append(o); + count++; + } + + answer.append(" )"); + + return answer.toString(); + } + + public String getExpressionSymbol() + { + if (not) + { + return "NOT IN"; + } + else + { + return "IN"; + } + } + }; + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression + { + public BooleanUnaryExpression(Expression left) + { + super(left); + } + + public boolean matches(AbstractJMSMessage message) throws QpidException + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + ; + + public static BooleanExpression createNOT(BooleanExpression left) + { + return new BooleanUnaryExpression(left) + { + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + Boolean lvalue = (Boolean) right.evaluate(message); + if (lvalue == null) + { + return null; + } + + return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() + { + return "NOT"; + } + }; + } + public static BooleanExpression createBooleanCast(Expression left) + { + return new BooleanUnaryExpression(left) + { + public Object evaluate(AbstractJMSMessage message) throws QpidException + { + Object rvalue = right.evaluate(message); + if (rvalue == null) + { + return null; + } + + if (!rvalue.getClass().equals(Boolean.class)) + { + return Boolean.FALSE; + } + + return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() + { + return right.toString(); + } + + public String getExpressionSymbol() + { + return ""; + } + }; + } + + private static Number negate(Number left) + { + Class clazz = left.getClass(); + if (clazz == Integer.class) + { + return new Integer(-left.intValue()); + } + else if (clazz == Long.class) + { + return new Long(-left.longValue()); + } + else if (clazz == Float.class) + { + return new Float(-left.floatValue()); + } + else if (clazz == Double.class) + { + return new Double(-left.doubleValue()); + } + else if (clazz == BigDecimal.class) + { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal) left; + bd = bd.negate(); + + if (UnaryExpression.BD_LONG_MIN_VALUE.compareTo(bd) == 0) + { + return new Long(Long.MIN_VALUE); + } + + return bd; + } + else + { + throw new RuntimeException("Don't know how to negate: " + left); + } + } + + public UnaryExpression(Expression left) + { + this.right = left; + } + + public Expression getRight() + { + return right; + } + + public void setRight(Expression expression) + { + right = expression; + } + + /** + * @see Object#toString() + */ + public String toString() + { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see Object#equals(Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + public abstract String getExpressionSymbol(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 72ba16086d..0316255b2c 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -48,7 +48,7 @@ public interface BrokerDetails public static final long DEFAULT_CONNECT_TIMEOUT = 30000L; public static final boolean USE_SSL_DEFAULT = false; - // pulled these properties from the new BrokerDetails class in the qpidity package + // pulled these properties from the new BrokerDetails class in the qpid package public static final String PROTOCOL_TCP = "tcp"; public static final String PROTOCOL_TLS = "tls"; diff --git a/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java b/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java new file mode 100644 index 0000000000..59ec4cfba7 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java @@ -0,0 +1,509 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.naming; + +import org.apache.qpid.jndi.NameParserImpl; + +import javax.naming.*; +import javax.naming.spi.NamingManager; +import java.io.Serializable; +import java.util.*; + +/** + * Based on class from ActiveMQ. + * A read-only Context + *

+ * This version assumes it and all its subcontext are read-only and any attempt + * to modify (e.g. through bind) will result in an OperationNotSupportedException. + * Each Context in the tree builds a cache of the entries in all sub-contexts + * to optimise the performance of lookup. + *

+ *

This implementation is intended to optimise the performance of lookup(String) + * to about the level of a HashMap get. It has been observed that the scheme + * resolution phase performed by the JVM takes considerably longer, so for + * optimum performance lookups should be coded like:

+ * + * Context componentContext = (Context)new InitialContext().lookup("java:comp"); + * String envEntry = (String) componentContext.lookup("env/myEntry"); + * String envEntry2 = (String) componentContext.lookup("env/myEntry2"); + * + */ +public class ReadOnlyContext implements Context, Serializable +{ + private static final long serialVersionUID = -5754338187296859149L; + protected static final NameParser nameParser = new NameParserImpl(); + + protected final Hashtable environment; // environment for this context + protected final Map bindings; // bindings at my level + protected final Map treeBindings; // all bindings under me + + private boolean frozen = false; + private String nameInNamespace = ""; + public static final String SEPARATOR = "/"; + + public ReadOnlyContext() + { + environment = new Hashtable(); + bindings = new HashMap(); + treeBindings = new HashMap(); + } + + public ReadOnlyContext(Hashtable env) + { + if (env == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(env); + } + + this.bindings = Collections.EMPTY_MAP; + this.treeBindings = Collections.EMPTY_MAP; + } + + public ReadOnlyContext(Hashtable environment, Map bindings) + { + if (environment == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(environment); + } + + this.bindings = bindings; + treeBindings = new HashMap(); + frozen = true; + } + + public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace) + { + this(environment, bindings); + this.nameInNamespace = nameInNamespace; + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env) + { + this.bindings = clone.bindings; + this.treeBindings = clone.treeBindings; + this.environment = new Hashtable(env); + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace) + { + this(clone, env); + this.nameInNamespace = nameInNamespace; + } + + public void freeze() + { + frozen = true; + } + + boolean isFrozen() + { + return frozen; + } + + /** + * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses. + * It binds every possible lookup into a map in each context. To do this, each context + * strips off one name segment and if necessary creates a new context for it. Then it asks that context + * to bind the remaining name. It returns a map containing all the bindings from the next context, plus + * the context it just created (if it in fact created it). (the names are suitably extended by the segment + * originally lopped off). + * + * @param name + * @param value + * @return + * @throws javax.naming.NamingException + */ + protected Map internalBind(String name, Object value) throws NamingException + { + assert (name != null) && (name.length() > 0); + assert !frozen; + + Map newBindings = new HashMap(); + int pos = name.indexOf('/'); + if (pos == -1) + { + if (treeBindings.put(name, value) != null) + { + throw new NamingException("Something already bound at " + name); + } + + bindings.put(name, value); + newBindings.put(name, value); + } + else + { + String segment = name.substring(0, pos); + assert segment != null; + assert !segment.equals(""); + Object o = treeBindings.get(segment); + if (o == null) + { + o = newContext(); + treeBindings.put(segment, o); + bindings.put(segment, o); + newBindings.put(segment, o); + } + else if (!(o instanceof ReadOnlyContext)) + { + throw new NamingException("Something already bound where a subcontext should go"); + } + + ReadOnlyContext readOnlyContext = (ReadOnlyContext) o; + String remainder = name.substring(pos + 1); + Map subBindings = readOnlyContext.internalBind(remainder, value); + for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();) + { + Map.Entry entry = (Map.Entry) iterator.next(); + String subName = segment + "/" + (String) entry.getKey(); + Object bound = entry.getValue(); + treeBindings.put(subName, bound); + newBindings.put(subName, bound); + } + } + + return newBindings; + } + + protected ReadOnlyContext newContext() + { + return new ReadOnlyContext(); + } + + public Object addToEnvironment(String propName, Object propVal) throws NamingException + { + return environment.put(propName, propVal); + } + + public Hashtable getEnvironment() throws NamingException + { + return (Hashtable) environment.clone(); + } + + public Object removeFromEnvironment(String propName) throws NamingException + { + return environment.remove(propName); + } + + public Object lookup(String name) throws NamingException + { + if (name.length() == 0) + { + return this; + } + + Object result = treeBindings.get(name); + if (result == null) + { + result = bindings.get(name); + } + + if (result == null) + { + int pos = name.indexOf(':'); + if (pos > 0) + { + String scheme = name.substring(0, pos); + Context ctx = NamingManager.getURLContext(scheme, environment); + if (ctx == null) + { + throw new NamingException("scheme " + scheme + " not recognized"); + } + + return ctx.lookup(name); + } + else + { + // Split out the first name of the path + // and look for it in the bindings map. + CompositeName path = new CompositeName(name); + + if (path.size() == 0) + { + return this; + } + else + { + String first = path.get(0); + Object obj = bindings.get(first); + if (obj == null) + { + throw new NameNotFoundException(name); + } + else if ((obj instanceof Context) && (path.size() > 1)) + { + Context subContext = (Context) obj; + obj = subContext.lookup(path.getSuffix(1)); + } + + return obj; + } + } + } + + if (result instanceof LinkRef) + { + LinkRef ref = (LinkRef) result; + result = lookup(ref.getLinkName()); + } + + if (result instanceof Reference) + { + try + { + result = NamingManager.getObjectInstance(result, null, null, this.environment); + } + catch (NamingException e) + { + throw e; + } + catch (Exception e) + { + throw (NamingException) new NamingException("could not look up : " + name).initCause(e); + } + } + + if (result instanceof ReadOnlyContext) + { + String prefix = getNameInNamespace(); + if (prefix.length() > 0) + { + prefix = prefix + SEPARATOR; + } + + result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name); + } + + return result; + } + + public Object lookup(Name name) throws NamingException + { + return lookup(name.toString()); + } + + public Object lookupLink(String name) throws NamingException + { + return lookup(name); + } + + public Name composeName(Name name, Name prefix) throws NamingException + { + Name result = (Name) prefix.clone(); + result.addAll(name); + + return result; + } + + public String composeName(String name, String prefix) throws NamingException + { + CompositeName result = new CompositeName(prefix); + result.addAll(new CompositeName(name)); + + return result.toString(); + } + + public NamingEnumeration list(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ReadOnlyContext.ListEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).list(""); + } + else + { + throw new NotContextException(); + } + } + + public NamingEnumeration listBindings(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ReadOnlyContext.ListBindingEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).listBindings(""); + } + else + { + throw new NotContextException(); + } + } + + public Object lookupLink(Name name) throws NamingException + { + return lookupLink(name.toString()); + } + + public NamingEnumeration list(Name name) throws NamingException + { + return list(name.toString()); + } + + public NamingEnumeration listBindings(Name name) throws NamingException + { + return listBindings(name.toString()); + } + + public void bind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void bind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void close() throws NamingException + { + // ignore + } + + public Context createSubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public Context createSubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public String getNameInNamespace() throws NamingException + { + return nameInNamespace; + } + + public NameParser getNameParser(Name name) throws NamingException + { + return nameParser; + } + + public NameParser getNameParser(String name) throws NamingException + { + return nameParser; + } + + public void rebind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rebind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(Name oldName, Name newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(String oldName, String newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + private abstract class LocalNamingEnumeration implements NamingEnumeration + { + private Iterator i = bindings.entrySet().iterator(); + + public boolean hasMore() throws NamingException + { + return i.hasNext(); + } + + public boolean hasMoreElements() + { + return i.hasNext(); + } + + protected Map.Entry getNext() + { + return (Map.Entry) i.next(); + } + + public void close() throws NamingException + { } + } + + private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + + return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName()); + } + } + + private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + + return new Binding((String) entry.getKey(), entry.getValue()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/naming/jndi.properties b/java/client/src/main/java/org/apache/qpid/naming/jndi.properties new file mode 100644 index 0000000000..830de5f619 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/naming/jndi.properties @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.naming.PropertiesFileInitialConextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +# qpid:username=foo;password=password;client_id=id;virtualhost=path@tpc:localhost:1556 +connectionfactory.local = qpid:tcp:localhost' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java new file mode 100644 index 0000000000..bed3ee02cb --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/Client.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.nclient; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.client.url.URLParser_0_10; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.url.QpidURL; +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.nclient.impl.ClientSession; +import org.apache.qpid.nclient.impl.ClientSessionDelegate; +import org.apache.qpid.transport.Channel; +import org.apache.qpid.transport.ClientDelegate; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.ConnectionCloseOk; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.network.mina.MinaHandler; +import org.apache.qpid.transport.network.nio.NioHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Client implements org.apache.qpid.nclient.Connection +{ + private Connection _conn; + private ClosedListener _closedListner; + private final Lock _lock = new ReentrantLock(); + private static Logger _logger = LoggerFactory.getLogger(Client.class); + private Condition closeOk; + private boolean closed = false; + private long timeout = 60000; + + private ProtocolHeader header = null; + + /** + * + * @return returns a new connection to the broker. + */ + public static org.apache.qpid.nclient.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + + final Condition negotiationComplete = _lock.newCondition(); + closeOk = _lock.newCondition(); + _lock.lock(); + + ClientDelegate connectionDelegate = new ClientDelegate() + { + private boolean receivedClose = false; + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + public void exception(Throwable t) + { + if (_closedListner != null) + { + _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); + } + else + { + throw new RuntimeException("connection closed",t); + } + } + + public void closed() + { + if (_closedListner != null && !this.receivedClose) + { + _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); + } + } + + @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) + { + _lock.lock(); + try + { + closed = true; + this.receivedClose = true; + closeOk.signalAll(); + } + finally + { + _lock.unlock(); + } + } + + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) + { + ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); + if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) + { + throw new RuntimeException + (new QpidException("Server closed the connection: Reason " + + connectionClose.getReplyText(), + errorCode, + null)); + } + else + { + _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); + } + + this.receivedClose = true; + } + @Override public void init(Channel ch, ProtocolHeader hdr) + { + // TODO: once the merge is done we'll need to update this code + // for handling 0.8 protocol version type i.e. major=8 and mino + if (hdr.getMajor() != 0 || hdr.getMinor() != 10) + { + Client.this.header = hdr; + _lock.lock(); + negotiationComplete.signalAll(); + _lock.unlock(); + } + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + String transport = System.getProperty("transport","io"); + if (transport.equalsIgnoreCase("nio")) + { + _logger.info("using NIO Transport"); + _conn = NioHandler.connect(host, port,connectionDelegate); + } + else if (transport.equalsIgnoreCase("io")) + { + _logger.info("using Plain IO Transport"); + _conn = IoTransport.connect(host, port,connectionDelegate); + } + else + { + _logger.info("using MINA Transport"); + _conn = MinaHandler.connect(host, port,connectionDelegate); + // _conn = NativeHandler.connect(host, port,connectionDelegate); + } + + // XXX: hardcoded version numbers + _conn.send(new ProtocolHeader(1, 0, 10)); + + try + { + negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); + if (header != null) + { + _conn.close(); + throw new ProtocolVersionException(header.getMajor(), header.getMinor()); + } + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + finally + { + _lock.unlock(); + } + } + + public void connect(String url)throws QpidException + { + URLParser_0_10 parser = null; + try + { + parser = new URLParser_0_10(url); + } + catch(Exception e) + { + throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); + } + List brokers = parser.getAllBrokerDetails(); + BrokerDetails brokerDetail = brokers.get(0); + connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), + brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), + brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(QpidURL url) throws QpidException + { + throw new UnsupportedOperationException("Not implemented"); + } + + /* { + // temp impl to tests + BrokerDetails details = url.getAllBrokerDetails().get(0); + connect(details.getHost(), + details.getPort(), + details.getVirtualHost(), + details.getUserName(), + details.getPassword()); + } +*/ + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); + _lock.lock(); + try + { + try + { + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed && elapsed < timeout) + { + closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); + elapsed = System.currentTimeMillis() - start; + } + if(!closed) + { + throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); + } + } + catch (InterruptedException e) + { + throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); + } + } + finally + { + _lock.unlock(); + } + _conn.close(); + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(); + ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); + ssn.attach(ch); + ssn.sessionAttach(ssn.getName()); + ssn.sessionRequestTimeout(expiryInSeconds); + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); + clientSession.dtxSelect(); + return (DtxSession) clientSession; + } + + public void setClosedListener(ClosedListener closedListner) + { + + _closedListner = closedListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java new file mode 100644 index 0000000000..4cf0cab1ec --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpid.ErrorCode; + + +/** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + */ +public interface ClosedListener +{ + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * @param errorCode TODO + * @param reason TODO + * @param t TODO + * @see Connection + */ + public void onClosed(ErrorCode errorCode, String reason, Throwable t); +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java new file mode 100644 index 0000000000..2d5b50b33a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpid.QpidException; + +/** + * This represents a physical connection to a broker. + */ +public interface Connection +{ + /** + * Establish the connection using the given parameters + * + * @param host host name + * @param port port number + * @param virtualHost the virtual host name + * @param username user name + * @param password password + * @throws QpidException If the communication layer fails to establish the connection. + */ + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; + + /** + * Establish the connection with the broker identified by the URL. + * + * @param url Specifies the URL of the broker. + * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown. + */ + public void connect(String url) throws QpidException; + + /** + * Close this connection. + * + * @throws QpidException if the communication layer fails to close the connection. + */ + public void close() throws QpidException; + + /** + * Create a session for this connection. + *

The returned session is suspended + * (i.e. this session is not attached to an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than + * or equal to 0 then the session does not expire. + * @return A newly created (suspended) session. + */ + public Session createSession(long expiryInSeconds); + + /** + * Create a DtxSession for this connection. + *

A Dtx Session must be used when resources have to be manipulated as + * part of a global transaction. + *

The retuned DtxSession is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal + * to 0 then the session does not expire. + * @return A newly created (suspended) DtxSession. + */ + public DtxSession createDTXSession(int expiryInSeconds); + + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ClosedListener + * + * @param exceptionListner The ClosedListener + */ + public void setClosedListener(ClosedListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java new file mode 100644 index 0000000000..8a859f2d84 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.GetTimeoutResult; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.RecoverResult; +import org.apache.qpid.transport.XaResult; +import org.apache.qpid.transport.Xid; + +/** + * The resources for this session are controlled under the scope of a distributed transaction. + */ +public interface DtxSession extends Session +{ + + /** + * This method is called when messages should be produced and consumed on behalf a transaction + * branch identified by xid. + * possible options are: + *

    + *
  • {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. + *
  • {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. + *
+ * + * @param xid Specifies the xid of the transaction branch to be started. + * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. + * @return Confirms to the client that the transaction branch is started or specify the error condition. + */ + public Future dtxStart(Xid xid, Option... options); + + /** + * This method is called when the work done on behalf of a transaction branch finishes or needs to + * be suspended. + * possible options are: + *
    + *
  • {@link Option#FAIL}: indicates that this portion of work has failed; + * otherwise this portion of work has + * completed successfully. + *
  • {@link Option#SUSPEND}: Indicates that the transaction branch is + * temporarily suspended in an incomplete state. + *
+ * + * @param xid Specifies the xid of the transaction branch to be ended. + * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. + * @return Confirms to the client that the transaction branch is ended or specifies the error condition. + */ + public Future dtxEnd(Xid xid, Option... options); + + /** + * Commit the work done on behalf of a transaction branch. This method commits the work associated + * with xid. Any produced messages are made available and any consumed messages are discarded. + * The only possible option is: + *
    + *
  • {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used. + *
+ * + * @param xid Specifies the xid of the transaction branch to be committed. + * @param options Available option is: {@link Option#ONE_PHASE} + * @return Confirms to the client that the transaction branch is committed or specifies the error condition. + */ + public Future dtxCommit(Xid xid, Option... options); + + /** + * This method is called to forget about a heuristically completed transaction branch. + * + * @param xid Specifies the xid of the transaction branch to be forgotten. + */ + public void dtxForget(Xid xid, Option ... options); + + /** + * This method obtains the current transaction timeout value in seconds. If set-timeout was not + * used prior to invoking this method, the return value is the default timeout value; otherwise, the + * value used in the previous set-timeout call is returned. + * + * @param xid Specifies the xid of the transaction branch used for getting the timeout. + * @return The current transaction timeout value in seconds. + */ + public Future dtxGetTimeout(Xid xid, Option ... options); + + /** + * This method prepares any message produced or consumed on behalf of xid, ready for commitment. + * + * @param xid Specifies the xid of the transaction branch to be prepared. + * @return The status of the prepare operation can be any one of: + * xa-ok: Normal execution. + *

+ * xa-rdonly: The transaction branch was read-only and has been committed. + *

+ * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified + * reason. + *

+ * xa-rbtimeout: The work represented by this transaction branch took too long. + */ + public Future dtxPrepare(Xid xid, Option ... options); + + /** + * This method is called to obtain a list of transaction branches that are in a prepared or + * heuristically completed state. + * @return a array of xids to be recovered. + */ + public Future dtxRecover(Option ... options); + + /** + * This method rolls back the work associated with xid. Any produced messages are discarded and + * any consumed messages are re-queued. + * + * @param xid Specifies the xid of the transaction branch to be rolled back. + * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. + */ + public Future dtxRollback(Xid xid, Option ... options); + + /** + * Sets the specified transaction branch timeout value in seconds. + * + * @param xid Specifies the xid of the transaction branch for setting the timeout. + * @param timeout The transaction timeout value in seconds. + */ + public void dtxSetTimeout(Xid xid, long timeout, Option ... options); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/JMSTestCase.java b/java/client/src/main/java/org/apache/qpid/nclient/JMSTestCase.java new file mode 100644 index 0000000000..4e1b9058e6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/JMSTestCase.java @@ -0,0 +1,115 @@ + package org.apache.qpid.nclient; + +import java.util.Enumeration; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueBrowser; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; + +public class JMSTestCase +{ + + public static void main(String[] args) + { + + try + { + javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672"); + con.start(); + + javax.jms.Session ssn = con.createSession(false, 1); + + javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + QueueBrowser browser = ssn.createBrowser((Queue)dest, "Test = 'test'"); + + javax.jms.TextMessage msg = ssn.createTextMessage(); + msg.setStringProperty("TEST", "test"); + msg.setText("Should get this"); + prod.send(msg); + + javax.jms.TextMessage msg2 = ssn.createTextMessage(); + msg2.setStringProperty("TEST", "test2"); + msg2.setText("Shouldn't get this"); + prod.send(msg2); + + + Enumeration enu = browser.getEnumeration(); + for (;enu.hasMoreElements();) + { + System.out.println(enu.nextElement()); + System.out.println("\n"); + } + + javax.jms.MessageConsumer cons = ssn.createConsumer(dest, "Test = 'test'"); + javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive(); + cons.setMessageListener(new MessageListener() + { + public void onMessage(Message m) + { + javax.jms.TextMessage m2 = (javax.jms.TextMessage)m; + try + { + System.out.println("headers : " + m2.toString()); + System.out.println("m : " + m2.getText()); + System.out.println("\n\n"); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + + }); + + con.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + + System.out.println("Waiting"); + while (m == null) + { + + } + + System.out.println("Exiting"); + + /*javax.jms.TextMessage msg = ssn.createTextMessage(); + msg.setText("This is a test message"); + msg.setBooleanProperty("targetMessage", false); + prod.send(msg); + + msg.setBooleanProperty("targetMessage", true); + prod.send(msg); + + javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); + + if (m == null) + { + System.out.println("message is null"); + } + else + { + System.out.println("message is not null" + m); + }*/ + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java new file mode 100644 index 0000000000..2fdc74fc09 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java @@ -0,0 +1,63 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Header; + +/** + * Assembles message parts. + *

The sequence of event for transferring a message is as follows: + *

    + *
  • messageHeaders + *
  • n calls to addData + *
  • messageReceived + *
+ * It is up to the implementation to assemble the message once the different parts + * are transferred. + */ +public interface MessagePartListener +{ + /** + * Indicates the Message transfer has started. + * + * @param transferId The message transfer ID. + */ + public void messageTransfer(int transferId); + + /** + * Add the following a header to the message being received. + * + * @param header Either DeliveryProperties or ApplicationProperties + */ + public void messageHeader(Header header); + + /** + * Add the following byte array to the content of the message being received + * + * @param src Data to be added or streamed. + */ + public void data(ByteBuffer src); + + /** + * Indicates that the message has been fully received. + */ + public void messageReceived(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java new file mode 100644 index 0000000000..e4daaa094e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.qpid.transport.*; +import org.apache.qpid.api.Message; + +/** + *

A session is associated with a connection. + * When it is created, a session is not associated with an underlying channel. + * The session is single threaded.

+ *

+ * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. + * For example, command1 will be synchronously invoked by using the following sequence: + *

    + *
  • session.command1() + *
  • session.sync() + *
+ */ +public interface Session +{ + public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1; + public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0; + public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0; + public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1; + public static final short MESSAGE_FLOW_MODE_CREDIT = 0; + public static final short MESSAGE_FLOW_MODE_WINDOW = 1; + public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; + public static final short MESSAGE_FLOW_UNIT_BYTE = 1; + public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; + public static final short MESSAGE_REJECT_CODE_GENERIC = 0; + public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; + public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; + public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; + + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + + /** + * Sync method will block the session until all outstanding commands + * are executed. + */ + public void sync(); + + public void close(); + + public void sessionDetach(byte[] name, Option ... options); + + public void sessionRequestTimeout(long expiry, Option ... options); + + public byte[] getName(); + + public void setAutoSync(boolean value); + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** + * Transfer a message to a specified exchange. + *

+ *

This transfer provides a complete message + * using a single method. The method is internally mapped to messageTransfer() and headers() followed + * by data() and endData(). + * This method should only be used by small messages.

+ * + * @param destination The exchange the message is being sent to. + * @param msg The Message to be sent. + * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation + * is not required. Once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred. + *

    + *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message + * is not considered transferred until the original + * transfer is complete. A complete transfer is signaled by execution.complete. + *
+ * @param acquireMode
    + *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message + * must be explicitly acquired. + *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is + * acquired when the transfer starts. + *
+ * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. + */ + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) + throws IOException; + + + /** + *

This transfer streams a complete message using a single method. + * It uses pull-semantics instead of doing a push.

+ *

Data is pulled from a Message object using read() + * and pushed using messageTransfer() and headers() followed by data() and endData(). + *
This method should only be used by large messages
+ * There are two convenience Message classes to do this. + *

    + *
  • {@link org.apache.qpid.nclient.util.FileMessage} + *
  • {@link org.apache.qpid.nclient.util.StreamingMessage} + *
+ * You can also implement a Message interface to wrap any + * data stream. + *

+ * + * @param destination The exchange the message is being sent to. + * @param msg The Message to be sent. + * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation + * is not required. Once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred. + *

    + *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message + * is not considered transferred until the original + * transfer is complete. A complete transfer is signaled by execution.complete. + *
+ * @param acquireMode
    + *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message + * must be explicitly acquired. + *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message + * is acquired when the transfer starts. + *
+ * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. + */ + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException; + + /** + * This command transfers a message between two peers. + * + * @param destination Specifies the destination to which the message is to be transferred. + * @param acceptMode Indicates whether message.accept, session.complete, + * or nothing at all is required to indicate successful transfer of the message. + * + * @param acquireMode Indicates whether or not the transferred message has been acquired. + */ + public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, + Option ... options); + + /** + * Make a set of headers to be sent together with a message + * + * @param headers headers to be added + * @see org.apache.qpid.transport.DeliveryProperties + * @see org.apache.qpid.transport.MessageProperties + * @return The added headers. + */ + public Header header(Struct... headers); + + /** + * Add a byte array to the content of the message being sent. + * + * @param data Data to be added. + */ + public void data(byte[] data); + + /** + * A Add a ByteBuffer to the content of the message being sent. + *

Note that only the data between the buffer's current position and the + * buffer limit is added. + * It is therefore recommended to flip the buffer before adding it to the message, + * + * @param buf Data to be added. + */ + public void data(ByteBuffer buf); + + /** + * Add a string to the content of the message being sent. + * + * @param str String to be added. + */ + public void data(String str); + + /** + * Signals the end of data for the message. + */ + public void endData(); + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Associate a message listener with a destination. + *

The destination is bound to a queue, and messages are filtered based + * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). + *

The valid options are: + *

    + *
  • {@link Option#EXCLUSIVE}:

    Requests exclusive subscription access, so that only this + * subscription can access the queue. + *

  • {@link Option#NONE}:

    This is an empty option, and has no effect. + *

+ * + * @param queue The queue that the receiver is receiving messages from. + * @param destination The destination, or delivery tag, for the subscriber. + * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation + * is not required. Once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred. + *

    + *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message + * is not considered transferred until the original + * transfer is complete. A complete transfer is signaled by execution.complete. + *
+ * @param acquireMode
    + *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must + * be explicitly acquired. + *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is + * acquired when the transfer starts. + *
+ * @param listener The listener for this destination. To transfer large messages + * use a {@link org.apache.qpid.nclient.MessagePartListener}. + * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} + * and {@link Option#NONE}. + * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies + * according to the provider's implementation. + */ + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map filter, Option... options); + + /** + * This method cancels a consumer. The server will not send any more messages to the specified destination. + * This does not affect already delivered messages. + * The client may receive a + * number of messages in between sending the cancel method and receiving + * notification that the cancellation has been completed. + * + * @param destination The destination to be cancelled. + */ + public void messageCancel(String destination, Option ... options); + + /** + * Associate a message listener with a destination. + *

Only one listener is permitted for each destination. When a new listener is created, + * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener + * has completed processing a message. + * + * @param destination The destination the listener is associated with. + * @param listener The new listener for this destination. + */ + public void setMessageListener(String destination, MessagePartListener listener); + + /** + * Sets the mode of flow control used for a given destination. + *

With credit based flow control, the broker continually maintains its current + * credit balance with the recipient. The credit balance consists of two values, a message + * count, and a byte count. Whenever message data is sent, both counts must be decremented. + * If either value reaches zero, the flow of message data must stop. Additional credit is + * received via the {@link Session#messageFlow} method. + *

Window based flow control is identical to credit based flow control, however message + * acknowledgment implicitly grants a single unit of message credit, and the size of the + * message in byte credits for each acknowledged message. + * + * @param destination The destination to set the flow mode on. + * @param mode

  • credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control + *
  • window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control
+ */ + public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); + + + /** + * This method controls the flow of message data to a given destination. It is used by the + * recipient of messages to dynamically match the incoming rate of message flow to its + * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" + * number of the specified unit to the available credit balance for the specified destination. + * A value of 0 indicates an infinite amount of credit. This disables any limit for + * the given unit until the credit balance is zeroed with {@link Session#messageStop} + * or {@link Session#messageFlush}. + * + * @param destination The destination to set the flow. + * @param unit Specifies the unit of credit balance. + *

+ * One of:

    + *
  • message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE}) + *
  • byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE}) + *
+ * @param value Number of credits, a value of 0 indicates an infinite amount of credit. + */ + public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); + + /** + * Forces the broker to exhaust its credit supply. + *

The credit on the broker will remain at zero once + * this method is completed. + * + * @param destination The destination on which the credit supply is to be exhausted. + */ + public void messageFlush(String destination, Option ... options); + + /** + * On receipt of this method, the brokers set credit to zero for a given + * destination. When confirmation of this method + * is issued credit is set to zero. No further messages will be sent until + * further credit is received. + * + * @param destination The destination on which to reset credit. + */ + public void messageStop(String destination, Option ... options); + + /** + * Acknowledge the receipt of a range of messages. + *

Messages must already be acquired, either by receiving them in + * pre-acquire mode or by explicitly acquiring them. + * + * @param ranges Range of messages to be acknowledged. + * @param accept pecify whether to send a message accept to the broker + */ + public void messageAcknowledge(RangeSet ranges, boolean accept); + + /** + * Reject a range of acquired messages. + *

The broker will deliver rejected messages to the + * alternate-exchange on the queue from which it came. If no alternate-exchange is + * defined for that queue the broker will discard the message. + * + * @param ranges Range of messages to be rejected. + * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or + * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but + * failed). + * @param text String describing the reason for a message transfer rejection. + */ + public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); + + /** + * As it is possible that the broker does not manage to reject some messages, after completion of + * {@link Session#messageReject} this method will return the ranges of rejected messages. + *

Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the + * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. + *

A recommended invocation sequence would be: + *

    + *
  • {@link Session#messageReject} + *
  • {@link Session#sync()} + *
  • {@link Session#getRejectedMessages()} + *
+ * + * @return The rejected message ranges + */ + public RangeSet getRejectedMessages(); + + /** + * Try to acquire ranges of messages hence releasing them form the queue. + * This means that once acknowledged, a message will not be delivered to any other receiver. + *

As those messages may have been consumed by another receivers hence, + * message acquisition can fail. + * The outcome of the acquisition is returned as an array of ranges of qcquired messages. + *

This method should only be called on non-acquired messages. + * + * @param ranges Ranges of messages to be acquired. + * @return Indicates the acquired messages + */ + public Future messageAcquire(RangeSet ranges, Option ... options); + + /** + * Give up responsibility for processing ranges of messages. + *

Released messages are re-enqueued. + * + * @param ranges Ranges of messages to be released. + * @param options Valid option is: {@link Option#SET_REDELIVERED}) + */ + public void messageRelease(RangeSet ranges, Option ... options); + + // ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + /** + * Selects the session for local transaction support. + */ + public void txSelect(Option ... options); + + /** + * Commit the receipt and delivery of all messages exchanged by this session's resources. + * + * @throws IllegalStateException If this session is not transacted, an exception will be thrown. + */ + public void txCommit(Option ... options) throws IllegalStateException; + + /** + * Roll back the receipt and delivery of all messages exchanged by this session's resources. + * + * @throws IllegalStateException If this session is not transacted, an exception will be thrown. + */ + public void txRollback(Option ... options) throws IllegalStateException; + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + + /** + * Declare a queue with the given queueName + *

Following are the valid options: + *

    + *
  • {@link Option#AUTO_DELETE}:

    If this field is set and the exclusive field is also set, + * then the queue is deleted when the connection closes. + * If this field is set and the exclusive field is not set the queue is deleted when all + * the consumers have finished using it. + *

  • {@link Option#DURABLE}:

    If set when creating a new queue, + * the queue will be marked as durable. Durable queues + * remain active when a server restarts. Non-durable queues (transient queues) are purged + * if/when a server restarts. Note that durable queues do not necessarily hold persistent + * messages, although it does not make sense to send persistent messages to a transient + * queue. + *

  • {@link Option#EXCLUSIVE}:

    Exclusive queues can only be used from one connection at a time. + * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the + * declaring connection closes. + *

  • {@link Option#PASSIVE}:

    If set, the server will not create the queue. + * This field allows the client to assert the presence of a queue without modifying the server state. + *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. + *

+ *

In the absence of a particular option, the defaul value is false for each option + * + * @param queueName The name of the delcared queue. + * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message + * may be rejected by a queue for the following reasons: + *

  1. The queue is deleted when it is not empty; + *
  2. Immediate delivery of a message is requested, but there are no consumers connected to + * the queue.
+ * @param arguments Used for backward compatibility + * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, + * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) + * @see Option + */ + public void queueDeclare(String queueName, String alternateExchange, Map arguments, + Option... options); + + /** + * Bind a queue with an exchange. + * + * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current + * queue for the session, which is the last declared queue. + * @param exchangeName The exchange name. + * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages + * depending on the exchange configuration. Not all exchanges use a routing key - refer to + * the specific exchange documentation. If the queue name is empty, the server uses the last + * queue declared on the session. If the routing key is also empty, the server uses this + * queue name for the routing key as well. If the queue name is provided but the routing key + * is empty, the server does the binding with that empty routing key. The meaning of empty + * routing keys depends on the exchange implementation. + * @param arguments Used for backward compatibility + */ + public void exchangeBind(String queueName, String exchangeName, String routingKey, Map arguments, + Option ... options); + + /** + * Unbind a queue from an exchange. + * + * @param queueName Specifies the name of the queue to unbind. + * @param exchangeName The name of the exchange to unbind from. + * @param routingKey Specifies the routing key of the binding to unbind. + */ + public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); + + /** + * This method removes all messages from a queue. It does not cancel consumers. Purged messages + * are deleted without any formal "undo" mechanism. + * + * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the + * current queue for the session, which is the last declared queue. + */ + public void queuePurge(String queueName, Option ... options); + + /** + * This method deletes a queue. When a queue is deleted any pending messages are sent to a + * dead-letter queue if this is defined in the server configuration, and all consumers on the + * queue are cancelled. + *

Following are the valid options: + *

    + *
  • {@link Option#IF_EMPTY}:

    If set, the server will only delete the queue if it has no messages. + *

  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the queue if it has no consumers. + * If the queue has consumers the server does does not delete it but raises a channel exception instead. + *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. + *

+ *

+ *

+ *

In the absence of a particular option, the defaul value is false for each option

+ * + * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the + * current queue for the session, which is the last declared queue. + * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} + * and {@link Option#NONE}) + * @see Option + */ + public void queueDelete(String queueName, Option... options); + + + /** + * This method is used to request information on a particular queue. + * + * @param queueName The name of the queue for which information is requested. + * @return Information on the specified queue. + */ + public Future queueQuery(String queueName, Option ... options); + + + /** + * This method is used to request information on a particular binding. + * + * @param exchange The exchange name. + * @param queue The queue name. + * @param routingKey The routing key + * @param arguments bacward compatibilties params. + * @return Information on the specified binding. + */ + public Future exchangeBound(String exchange, String queue, String routingKey, + Map arguments, Option ... options); + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + + /** + * This method creates an exchange. If the exchange already exists, + * the method verifies the class and checks the details are correct. + *

Valid options are: + *

    + *
  • {@link Option#AUTO_DELETE}:

    If set, the exchange is deleted when all queues have finished using it. + *

  • {@link Option#DURABLE}:

    If set, the exchange will + * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient + * exchanges) are purged when a server restarts. + *

  • {@link Option#PASSIVE}:

    If set, the server will not create the exchange. + * The client can use this to check whether an exchange exists without modifying the server state. + *

  • {@link Option#NONE}:

    This option is an empty option, and has no effect. + *

+ *

In the absence of a particular option, the defaul value is false for each option

+ * + * @param exchangeName The exchange name. + * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The + * exchange types define the functionality of the exchange - i.e. how messages are routed + * through it. It is not valid or meaningful to attempt to change the type of an existing + * exchange. Default exchange types are: direct, topic, headers and fanout. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, + * {@link Option#PASSIVE}, {@link Option#NONE}) + * @param arguments Used for backward compatibility + * @see Option + */ + public void exchangeDeclare(String exchangeName, String type, String alternateExchange, + Map arguments, Option... options); + + /** + * This method deletes an exchange. When an exchange is deleted all queue bindings on the + * exchange are cancelled. + *

Following are the valid options: + *

    + *
  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the exchange if it has no queue bindings. If the + * exchange has queue bindings the server does not delete it but raises a channel exception + * instead. + *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. + *

+ *

Note that if an option is not set, it will default to false. + * + * @param exchangeName The name of exchange to be deleted. + * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. + * @see Option + */ + public void exchangeDelete(String exchangeName, Option... options); + + + /** + * This method is used to request information about a particular exchange. + * + * @param exchangeName The name of the exchange about which information is requested. If not set, the method will + * return information about the default exchange. + * @return Information on the specified exchange. + */ + public Future exchangeQuery(String exchangeName, Option ... options); + + /** + * If the session receives a sessionClosed with an error code it + * informs the session's exceptionListener + * + * @param exceptionListner The exceptionListener + */ + public void setClosedListener(ClosedListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java new file mode 100644 index 0000000000..ffde5336f9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -0,0 +1,206 @@ +package org.apache.qpid.nclient.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.QpidException; +import org.apache.qpid.api.Message; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; + +import static org.apache.qpid.transport.Option.*; + +/** + * Implements a Qpid Sesion. + */ +public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession +{ + static + { + String max = "message_size_before_sync"; // KB's + try + { + MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_SYNC_DATA_LENGH = 200000000; + } + String flush = "message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH = 20000000; + } + } + + private static long MAX_NOT_SYNC_DATA_LENGH; + private static long MAX_NOT_FLUSH_DATA_LENGH; + + private Map _messageListeners = new ConcurrentHashMap(); + private ClosedListener _exceptionListner; + private RangeSet _rejectedMessages; + private long _currentDataSizeNotSynced; + private long _currentDataSizeNotFlushed; + + public ClientSession(byte[] name) + { + super(name); + } + + public void messageAcknowledge(RangeSet ranges, boolean accept) + { + for (Range range : ranges) + { + super.processed(range); + } + super.flushProcessed(accept ? BATCH : NONE); + if (accept) + { + messageAccept(ranges); + } + } + + public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) + { + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode), + MessageAcquireMode.get(acquireMode), null, 0, filter, + options); + } + + public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException + { + // The javadoc clearly says that this method is suitable for small messages + // therefore reading the content in one shot. + ByteBuffer data = msg.readData(); + super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), + MessageAcquireMode.get(acquireMode)); + // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() ); + if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() ) + { + msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) ); + msg.getDeliveryProperties().setDirty(false); + msg.getMessageProperties().setDirty(false); + } + else + { + super.header(msg.getHeader()); + } + data( data ); + endData(); + } + + public void sync() + { + super.sync(); + _currentDataSizeNotSynced = 0; + } + + /* ------------------------- + * Data methods + * ------------------------*/ + + public void data(ByteBuffer buf) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining(); + _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining(); + super.data(buf); + } + + public void data(String str) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length; + super.data(str); + } + + public void data(byte[] bytes) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length; + super.data(bytes); + } + + public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException + { + super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), + MessageAcquireMode.get(acquireMode)); + super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); + boolean b = true; + int count = 0; + while(b) + { + try + { + System.out.println("count : " + count++); + data(msg.readData()); + } + catch(EOFException e) + { + b = false; + } + } + endData(); + } + + public void endData() + { + super.endData(); + /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) + { + sync(); + } + if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH) + { + executionFlush(); + _currentDataSizeNotFlushed = 0; + }*/ + } + + public RangeSet getRejectedMessages() + { + return _rejectedMessages; + } + + public void setMessageListener(String destination, MessagePartListener listener) + { + if (listener == null) + { + throw new IllegalArgumentException("Cannot set message listener to null"); + } + _messageListeners.put(destination, listener); + } + + public void setClosedListener(ClosedListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + + void setRejectedMessages(RangeSet rejectedMessages) + { + _rejectedMessages = rejectedMessages; + } + + void notifyException(QpidException ex) + { + _exceptionListner.onClosed(null, null, null); + } + + Map getMessageListeners() + { + return _messageListeners; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java new file mode 100644 index 0000000000..b57fd0a7ed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java @@ -0,0 +1,75 @@ +package org.apache.qpid.nclient.impl; + +import java.nio.ByteBuffer; + +import org.apache.qpid.ErrorCode; + +import org.apache.qpid.nclient.MessagePartListener; + +import org.apache.qpid.QpidException; +import org.apache.qpid.transport.Data; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageReject; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.SessionDelegate; + + +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void sessionDetached(Session ssn, SessionDetached dtc) + { + ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null)); + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + @Override public void data(Session ssn, Data data) + { + _currentMessageListener.data(data.getData()); + if (data.isLast()) + { + _currentMessageListener.messageReceived(); + } + } + + @Override public void header(Session ssn, Header header) + { + _currentMessageListener.messageHeader(header); + if( header.hasNoPayload()) + { + _currentMessageListener.data(ByteBuffer.allocate(0)); + _currentMessageListener.messageReceived(); + } + } + + + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination()); + _currentMessageListener.messageTransfer(currentTransfer.getId()); + } + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); + session.processed(struct); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/Constants.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/Constants.java new file mode 100644 index 0000000000..f689e9abde --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/Constants.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.impl; + +/** + * This class holds all the 0.10 client constants which value can be set + * through properties. + */ +public class Constants +{ + static + { + + String max="message_size_before_sync";// KB's + try + { + MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_SYNC_DATA_LENGH=200000000; + } + String flush="message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH=20000000; + } + } + + /** + * The total message size in KBs that can be transferted before + * client and broker are synchronized. + * A sync will result in the client library releasing the sent messages + * from memory. (messages are kept + * in memory so client can reconnect to a broker in the event of a failure) + *

+ * Property name: message_size_before_sync + *

+ * Default value: 200000000 + */ + public static long MAX_NOT_SYNC_DATA_LENGH; + /** + * The total message size in KBs that can be transferted before + * messages are flushed. + * When a flush returns all messages have reached the broker. + *

+ * Property name: message_size_before_flush + *

+ * Default value: 200000000 + */ + public static long MAX_NOT_FLUSH_DATA_LENGH; + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java new file mode 100644 index 0000000000..96e1d2c772 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java @@ -0,0 +1,94 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.ErrorCode; +import org.apache.qpid.api.Message; +import org.apache.qpid.nclient.Client; +import org.apache.qpid.nclient.Connection; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.nclient.Session; +import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageProperties; + +import java.util.UUID; + +public class DemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setClosedListener(new ClosedListener() + { + public void onClosed(ErrorCode errorCode, String reason, Throwable t) + { + System.out.println("ErrorCode : " + errorCode + " reason : " + reason); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.exchangeBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + // queue + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); + ssn.header(new DeliveryProperties().setRoutingKey("queue1"), + new MessageProperties().setMessageId(UUID.randomUUID())); + ssn.data("this is the data"); + ssn.endData(); + + //reject + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); + ssn.data("this should be rejected"); + ssn.header(new DeliveryProperties().setRoutingKey("stocks")); + ssn.endData(); + ssn.sync(); + + // topic subs + ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.sync(); + + ssn.queueDeclare("topic1", null, null); + ssn.exchangeBind("topic1", "amq.topic", "stock.*",null); + ssn.queueDeclare("topic2", null, null); + ssn.exchangeBind("topic2", "amq.topic", "stock.us.*",null); + ssn.queueDeclare("topic3", null, null); + ssn.exchangeBind("topic3", "amq.topic", "stock.us.rh",null); + ssn.sync(); + + // topic + ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); + ssn.data("Topic message"); + ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"), + new MessageProperties().setMessageId(UUID.randomUUID())); + ssn.endData(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java new file mode 100644 index 0000000000..36c0a4b3be --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java @@ -0,0 +1,76 @@ +package org.apache.qpid.nclient.impl; + +import java.io.FileInputStream; + +import org.apache.qpid.ErrorCode; +import org.apache.qpid.api.Message; +import org.apache.qpid.nclient.Client; +import org.apache.qpid.nclient.Connection; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.nclient.Session; +import org.apache.qpid.nclient.util.FileMessage; +import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; + +import java.util.UUID; + +public class LargeMsgDemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setClosedListener(new ClosedListener() + { + public void onClosed(ErrorCode errorCode, String reason, Throwable t) + { + System.out.println("ErrorCode : " + errorCode + " reason : " + reason); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.exchangeBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + try + { + FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"), + 1024, + new DeliveryProperties().setRoutingKey("queue1"), + new MessageProperties().setMessageId(UUID.randomUUID())); + + // queue + ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); + ssn.sync(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java new file mode 100644 index 0000000000..513c1a95de --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java @@ -0,0 +1,156 @@ +package org.apache.qpid.nclient.interop; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.api.Message; +import org.apache.qpid.nclient.Client; +import org.apache.qpid.nclient.Connection; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.nclient.Session; +import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.RangeSet; + +public class BasicInteropTest implements ClosedListener +{ + + private Session session; + private Connection conn; + private String host; + + public BasicInteropTest(String host) + { + this.host = host; + } + + public void close() throws QpidException + { + conn.close(); + } + + public void testCreateConnection(){ + System.out.println("------- Creating connection--------"); + conn = Client.createConnection(); + try{ + conn.connect(host, 5672, "test", "guest", "guest"); + }catch(Exception e){ + System.out.println("------- Error Creating connection--------"); + e.printStackTrace(); + System.exit(1); + } + System.out.println("------- Connection created Suscessfully --------"); + } + + public void testCreateSession(){ + System.out.println("------- Creating session --------"); + session = conn.createSession(0); + System.out.println("------- Session created sucessfully --------"); + } + + public void testExchange(){ + System.out.println("------- Creating an exchange --------"); + session.exchangeDeclare("test", "direct", "", null); + session.sync(); + System.out.println("------- Exchange created --------"); + } + + public void testQueue(){ + System.out.println("------- Creating a queue --------"); + session.queueDeclare("testQueue", "", null); + session.sync(); + System.out.println("------- Queue created --------"); + + System.out.println("------- Binding a queue --------"); + session.exchangeBind("testQueue", "test", "testKey", null); + session.sync(); + System.out.println("------- Queue bound --------"); + } + + public void testSendMessage(){ + System.out.println("------- Sending a message --------"); + session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); + + Map props = new HashMap(); + props.put("name", "rajith"); + props.put("age", 10); + props.put("spf", 8.5); + session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props)); + + //session.header(new DeliveryProperties().setRoutingKey("testKey")); + + session.data("TestMessage"); + session.endData(); + session.sync(); + System.out.println("------- Message sent --------"); + } + + public void testSubscribe() + { + System.out.println("------- Sending a subscribe --------"); + session.messageSubscribe("testQueue", "myDest", + Session.TRANSFER_CONFIRM_MODE_REQUIRED, + Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + new MessagePartListenerAdapter(new MessageListener(){ + + public void onMessage(Message message) + { + System.out.println("--------Message Received--------"); + System.out.println(message.toString()); + System.out.println("--------/Message Received--------"); + RangeSet ack = new RangeSet(); + ack.add(message.getMessageTransferId(),message.getMessageTransferId()); + session.messageAcknowledge(ack, true); + } + + }), + null); + + System.out.println("------- Setting Credit mode --------"); + session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); + System.out.println("------- Setting Credit --------"); + session.messageFlow("myDest", MessageCreditUnit.MESSAGE, 1); + session.messageFlow("myDest", MessageCreditUnit.BYTE, -1); + } + + public void testMessageFlush() + { + session.messageFlush("myDest"); + session.sync(); + } + + public void onClosed(ErrorCode errorCode, String reason, Throwable t) + { + System.out.println("------- Broker Notified an error --------"); + System.out.println("------- " + errorCode + " --------"); + System.out.println("------- " + reason + " --------"); + System.out.println("------- /Broker Notified an error --------"); + } + + public static void main(String[] args) throws QpidException + { + String host = "0.0.0.0"; + if (args.length>0) + { + host = args[0]; + } + + BasicInteropTest t = new BasicInteropTest(host); + t.testCreateConnection(); + t.testCreateSession(); + t.testExchange(); + t.testQueue(); + t.testSubscribe(); + t.testSendMessage(); + t.testMessageFlush(); + t.close(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java new file mode 100644 index 0000000000..8973127105 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java @@ -0,0 +1,160 @@ +package org.apache.qpid.nclient.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.api.Message; + +/** + *

A Simple implementation of the message interface + * for small messages. When the readData methods are called + * we assume the message is complete. i.e there want be any + * appendData operations after that.

+ * + *

If you need large message support please see + * FileMessage and StreamingMessage + *

+ */ +public class ByteBufferMessage implements Message +{ + private Queue _data = new LinkedList(); + private ByteBuffer _readBuffer; + private int _dataSize; + private DeliveryProperties _currentDeliveryProps; + private MessageProperties _currentMessageProps; + private int _transferId; + private Header _header; + + public void setHeader(Header header) { + _header = header; + } + + public Header getHeader() { + return _header; + } + + public ByteBufferMessage() + { + _currentDeliveryProps = new DeliveryProperties(); + _currentMessageProps = new MessageProperties(); + } + + public ByteBufferMessage(int transferId) + { + _transferId = transferId; + } + + public int getMessageTransferId() + { + return _transferId; + } + + public void clearData() + { + _data = new LinkedList(); + _readBuffer = null; + } + + public void appendData(byte[] src) throws IOException + { + appendData(ByteBuffer.wrap(src)); + } + + /** + * write the data from the current position up to the buffer limit + */ + public void appendData(ByteBuffer src) throws IOException + { + _data.offer(src); + _dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } + + public void setDeliveryProperties(DeliveryProperties props) + { + _currentDeliveryProps = props; + } + + public void setMessageProperties(MessageProperties props) + { + _currentMessageProps = props; + } + + public void readData(byte[] target) throws IOException + { + getReadBuffer().get(target); + } + + public ByteBuffer readData() throws IOException + { + return getReadBuffer(); + } + + private void buildReadBuffer() + { + //optimize for the simple cases + if(_data.size() == 1) + { + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(_dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + _readBuffer.flip(); + } + } + + private ByteBuffer getReadBuffer() throws IOException + { + if (_readBuffer != null ) + { + return _readBuffer.slice(); + } + else + { + if (_data.size() >0) + { + buildReadBuffer(); + return _readBuffer.slice(); + } + else + { + throw new IOException("No Data to read"); + } + } + } + + //hack for testing + @Override public String toString() + { + try + { + ByteBuffer temp = getReadBuffer(); + byte[] b = new byte[temp.remaining()]; + temp.get(b); + return new String(b); + } + catch(IOException e) + { + return "No data"; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/FileMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/FileMessage.java new file mode 100644 index 0000000000..179c91c2e9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/FileMessage.java @@ -0,0 +1,96 @@ +package org.apache.qpid.nclient.util; + +import java.io.EOFException; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.api.Message; + +/** + * FileMessage provides pull style semantics for + * larges messages backed by a disk. + * Instead of loading all data into memeory it uses + * FileChannel to map regions of the file into memeory + * at a time. + * + * The write methods are not supported. + * + * From the standpoint of performance it is generally + * only worth mapping relatively large files into memory. + * + * FileMessage msg = new FileMessage(in,delProps,msgProps); + * session.messageTransfer(dest,msg,0,0); + * + * The messageTransfer method will read the file in chunks + * and stream it. + * + */ +public class FileMessage extends ReadOnlyMessage implements Message +{ + private FileChannel _fileChannel; + private int _chunkSize; + private long _fileSize; + private long _pos = 0; + + public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _fileChannel = in.getChannel(); + _chunkSize = chunkSize; + _fileSize = _fileChannel.size(); + + if (_fileSize <= _chunkSize) + { + _chunkSize = (int)_fileSize; + } + } + + public void setHeader(Header header) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Header getHeader() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void readData(byte[] target) throws IOException + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer readData() throws IOException + { + if (_pos == _fileSize) + { + throw new EOFException(); + } + + if (_pos + _chunkSize > _fileSize) + { + _chunkSize = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + _pos += _chunkSize; + return bb; + } + + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public int getMessageTransferId() + { + throw new UnsupportedOperationException(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessageListener.java new file mode 100644 index 0000000000..c5edd62143 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessageListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.util; + +import org.apache.qpid.api.Message; + +/** + *A message listener + */ +public interface MessageListener +{ + /** + * Process an incoming message. + * + * @param message The incoming message. + */ + public void onMessage(Message message); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..3f1746f48a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -0,0 +1,59 @@ +package org.apache.qpid.nclient.util; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.nclient.MessagePartListener; + +/** + * This is a simple message assembler. + * Will call onMessage method of the adaptee + * when all message data is read. + * + * This is a good convinience utility for handling + * small messages + */ +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + ByteBufferMessage _currentMsg; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + } + + public void messageTransfer(int transferId) + { + _currentMsg = new ByteBufferMessage(transferId); + } + + public void data(ByteBuffer src) + { + try + { + _currentMsg.appendData(src); + } + catch(IOException e) + { + // A chance for IO exception + // doesn't occur as we are using + // a ByteBuffer + } + } + + public void messageHeader(Header header) + { + _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class)); + _currentMsg.setMessageProperties(header.get(MessageProperties.class)); + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ReadOnlyMessage.java new file mode 100644 index 0000000000..6583a95c7e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ReadOnlyMessage.java @@ -0,0 +1,38 @@ +package org.apache.qpid.nclient.util; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.api.Message; + +public abstract class ReadOnlyMessage implements Message +{ + MessageProperties _messageProperties; + DeliveryProperties _deliveryProperties; + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + + public void clearData() + { + throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data"); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/StreamingMessage.java new file mode 100644 index 0000000000..a4574438ac --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/StreamingMessage.java @@ -0,0 +1,68 @@ +package org.apache.qpid.nclient.util; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.api.Message; + +public class StreamingMessage extends ReadOnlyMessage implements Message +{ + SocketChannel _socChannel; + private int _chunkSize; + private ByteBuffer _readBuf; + + public Header getHeader() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setHeader(Header header) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _socChannel = in; + _chunkSize = chunkSize; + _readBuf = ByteBuffer.allocate(_chunkSize); + } + + public void readData(byte[] target) throws IOException + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer readData() throws IOException + { + if(_socChannel.isConnected() && _socChannel.isOpen()) + { + _readBuf.clear(); + _socChannel.read(_readBuf); + } + else + { + throw new EOFException("The underlying socket/channel has closed"); + } + + return _readBuf.duplicate(); + } + + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public int getMessageTransferId() + { + throw new UnsupportedOperationException(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/njms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpid/njms/ExceptionHelper.java new file mode 100644 index 0000000000..ce790a3b24 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/njms/ExceptionHelper.java @@ -0,0 +1,60 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.njms; + +import org.apache.qpid.QpidException; + +import javax.jms.JMSException; +import javax.transaction.xa.XAException; + +/** + * Helper class for handling exceptions + */ +public class ExceptionHelper +{ + static public JMSException convertQpidExceptionToJMSException(Exception exception) + { + JMSException jmsException = null; + if (!(exception instanceof JMSException)) + { + if (exception instanceof QpidException) + { + jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); + } + else + { + jmsException = new JMSException(exception.getMessage()); + } + jmsException.setLinkedException(exception); + jmsException.initCause(exception); + } + else + { + jmsException = (JMSException) exception; + } + return jmsException; + } + + static public XAException convertQpidExceptionToXAException(QpidException exception) + { + String qpidErrorCode = String.valueOf(exception.getErrorCode()); + // todo map this error to an XA code + int xaCode = XAException.XAER_PROTO; + return new XAException(xaCode); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java deleted file mode 100644 index 279716598d..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java +++ /dev/null @@ -1,268 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -/** - * An expression which performs an operation on two expression values - */ -public abstract class ArithmeticExpression extends BinaryExpression -{ - - protected static final int INTEGER = 1; - protected static final int LONG = 2; - protected static final int DOUBLE = 3; - - - public ArithmeticExpression(Expression left, Expression right) - { - super(left, right); - } - - public static Expression createPlus(Expression left, Expression right) - { - return new ArithmeticExpression(left, right) - { - protected Object evaluate(Object lvalue, Object rvalue) - { - if (lvalue instanceof String) - { - String text = (String) lvalue; - return text + rvalue; - } - else if (lvalue instanceof Number) - { - return plus((Number) lvalue, asNumber(rvalue)); - } - - throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); - } - - public String getExpressionSymbol() - { - return "+"; - } - }; - } - - public static Expression createMinus(Expression left, Expression right) - { - return new ArithmeticExpression(left, right) - { - protected Object evaluate(Object lvalue, Object rvalue) - { - if (lvalue instanceof Number) - { - return minus((Number) lvalue, asNumber(rvalue)); - } - - throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); - } - - public String getExpressionSymbol() - { - return "-"; - } - }; - } - - public static Expression createMultiply(Expression left, Expression right) - { - return new ArithmeticExpression(left, right) - { - - protected Object evaluate(Object lvalue, Object rvalue) - { - if (lvalue instanceof Number) - { - return multiply((Number) lvalue, asNumber(rvalue)); - } - - throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); - } - - public String getExpressionSymbol() - { - return "*"; - } - }; - } - - public static Expression createDivide(Expression left, Expression right) - { - return new ArithmeticExpression(left, right) - { - - protected Object evaluate(Object lvalue, Object rvalue) - { - if (lvalue instanceof Number) - { - return divide((Number) lvalue, asNumber(rvalue)); - } - - throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); - } - - public String getExpressionSymbol() - { - return "/"; - } - }; - } - - public static Expression createMod(Expression left, Expression right) - { - return new ArithmeticExpression(left, right) - { - - protected Object evaluate(Object lvalue, Object rvalue) - { - if (lvalue instanceof Number) - { - return mod((Number) lvalue, asNumber(rvalue)); - } - - throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); - } - - public String getExpressionSymbol() - { - return "%"; - } - }; - } - - protected Number plus(Number left, Number right) - { - switch (numberType(left, right)) - { - - case ArithmeticExpression.INTEGER: - return new Integer(left.intValue() + right.intValue()); - - case ArithmeticExpression.LONG: - return new Long(left.longValue() + right.longValue()); - - default: - return new Double(left.doubleValue() + right.doubleValue()); - } - } - - protected Number minus(Number left, Number right) - { - switch (numberType(left, right)) - { - - case ArithmeticExpression.INTEGER: - return new Integer(left.intValue() - right.intValue()); - - case ArithmeticExpression.LONG: - return new Long(left.longValue() - right.longValue()); - - default: - return new Double(left.doubleValue() - right.doubleValue()); - } - } - - protected Number multiply(Number left, Number right) - { - switch (numberType(left, right)) - { - - case ArithmeticExpression.INTEGER: - return new Integer(left.intValue() * right.intValue()); - - case ArithmeticExpression.LONG: - return new Long(left.longValue() * right.longValue()); - - default: - return new Double(left.doubleValue() * right.doubleValue()); - } - } - - protected Number divide(Number left, Number right) - { - return new Double(left.doubleValue() / right.doubleValue()); - } - - protected Number mod(Number left, Number right) - { - return new Double(left.doubleValue() % right.doubleValue()); - } - - private int numberType(Number left, Number right) - { - if (isDouble(left) || isDouble(right)) - { - return ArithmeticExpression.DOUBLE; - } - else if ((left instanceof Long) || (right instanceof Long)) - { - return ArithmeticExpression.LONG; - } - else - { - return ArithmeticExpression.INTEGER; - } - } - - private boolean isDouble(Number n) - { - return (n instanceof Float) || (n instanceof Double); - } - - protected Number asNumber(Object value) - { - if (value instanceof Number) - { - return (Number) value; - } - else - { - throw new RuntimeException("Cannot convert value: " + value + " into a number"); - } - } - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Object lvalue = left.evaluate(message); - if (lvalue == null) - { - return null; - } - - Object rvalue = right.evaluate(message); - if (rvalue == null) - { - return null; - } - - return evaluate(lvalue, rvalue); - } - - /** - * @param lvalue - * @param rvalue - * @return - */ - protected abstract Object evaluate(Object lvalue, Object rvalue); - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/BinaryExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/BinaryExpression.java deleted file mode 100644 index 465b504ae3..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/BinaryExpression.java +++ /dev/null @@ -1,103 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -/** - * An expression which performs an operation on two expression values. - */ -public abstract class BinaryExpression implements Expression -{ - protected Expression left; - protected Expression right; - - public BinaryExpression(Expression left, Expression right) - { - this.left = left; - this.right = right; - } - - public Expression getLeft() - { - return left; - } - - public Expression getRight() - { - return right; - } - - /** - * @see Object#toString() - */ - public String toString() - { - return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#hashCode() - */ - public int hashCode() - { - return toString().hashCode(); - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#equals(Object) - */ - public boolean equals(Object o) - { - - if ((o == null) || !this.getClass().equals(o.getClass())) - { - return false; - } - - return toString().equals(o.toString()); - - } - - /** - * Returns the symbol that represents this binary expression. For example, addition is - * represented by "+" - * - * @return - */ - public abstract String getExpressionSymbol(); - - /** - * @param expression - */ - public void setRight(Expression expression) - { - right = expression; - } - - /** - * @param expression - */ - public void setLeft(Expression expression) - { - left = expression; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/BooleanExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/BooleanExpression.java deleted file mode 100644 index 9a3b1c3106..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/BooleanExpression.java +++ /dev/null @@ -1,33 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -/** - * A BooleanExpression is an expression that always - * produces a Boolean result. - */ -public interface BooleanExpression extends Expression -{ - - public boolean matches(AbstractJMSMessage message) throws QpidException; - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java deleted file mode 100644 index 46f387b293..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java +++ /dev/null @@ -1,589 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - -import java.util.HashSet; -import java.util.List; -import java.util.regex.Pattern; - -/** - * A filter performing a comparison of two objects - */ -public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression -{ - - public static BooleanExpression createBetween(Expression value, Expression left, Expression right) - { - return LogicExpression.createAND(ComparisonExpression.createGreaterThanEqual(value, left), ComparisonExpression.createLessThanEqual(value, right)); - } - - public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) - { - return LogicExpression.createOR(ComparisonExpression.createLessThan(value, left), ComparisonExpression.createGreaterThan(value, right)); - } - - private static final HashSet REGEXP_CONTROL_CHARS = new HashSet(); - - static - { - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('.')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('\\')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('[')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character(']')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('^')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('$')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('?')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('*')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('+')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('{')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('}')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('|')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('(')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character(')')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character(':')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('&')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('<')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('>')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('=')); - ComparisonExpression.REGEXP_CONTROL_CHARS.add(new Character('!')); - } - - static class LikeExpression extends UnaryExpression implements BooleanExpression - { - - Pattern likePattern; - - /** - * @param right - */ - public LikeExpression(Expression right, String like, int escape) - { - super(right); - - StringBuffer regexp = new StringBuffer(like.length() * 2); - regexp.append("\\A"); // The beginning of the input - for (int i = 0; i < like.length(); i++) - { - char c = like.charAt(i); - if (escape == (0xFFFF & c)) - { - i++; - if (i >= like.length()) - { - // nothing left to escape... - break; - } - - char t = like.charAt(i); - regexp.append("\\x"); - regexp.append(Integer.toHexString(0xFFFF & t)); - } - else if (c == '%') - { - regexp.append(".*?"); // Do a non-greedy match - } - else if (c == '_') - { - regexp.append("."); // match one - } - else if (ComparisonExpression.REGEXP_CONTROL_CHARS.contains(new Character(c))) - { - regexp.append("\\x"); - regexp.append(Integer.toHexString(0xFFFF & c)); - } - else - { - regexp.append(c); - } - } - - regexp.append("\\z"); // The end of the input - - likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); - } - - /** - * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() - */ - public String getExpressionSymbol() - { - return "LIKE"; - } - - /** - * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) - */ - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - - Object rv = this.getRight().evaluate(message); - - if (rv == null) - { - return null; - } - - if (!(rv instanceof String)) - { - return - Boolean.FALSE; - // throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); - } - - return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; - } - - public boolean matches(AbstractJMSMessage message) throws QpidException - { - Object object = evaluate(message); - - return (object != null) && (object == Boolean.TRUE); - } - } - - public static BooleanExpression createLike(Expression left, String right, String escape) - { - if ((escape != null) && (escape.length() != 1)) - { - throw new RuntimeException( - "The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); - } - - int c = -1; - if (escape != null) - { - c = 0xFFFF & escape.charAt(0); - } - - return new LikeExpression(left, right, c); - } - - public static BooleanExpression createNotLike(Expression left, String right, String escape) - { - return UnaryExpression.createNOT(ComparisonExpression.createLike(left, right, escape)); - } - - public static BooleanExpression createInFilter(Expression left, List elements) - { - - if (!(left instanceof PropertyExpression)) - { - throw new RuntimeException("Expected a property for In expression, got: " + left); - } - - return UnaryExpression.createInExpression((PropertyExpression) left, elements, false); - - } - - public static BooleanExpression createNotInFilter(Expression left, List elements) - { - - if (!(left instanceof PropertyExpression)) - { - throw new RuntimeException("Expected a property for In expression, got: " + left); - } - - return UnaryExpression.createInExpression((PropertyExpression) left, elements, true); - - } - - public static BooleanExpression createIsNull(Expression left) - { - return ComparisonExpression.doCreateEqual(left, ConstantExpression.NULL); - } - - public static BooleanExpression createIsNotNull(Expression left) - { - return UnaryExpression.createNOT(ComparisonExpression.doCreateEqual(left, ConstantExpression.NULL)); - } - - public static BooleanExpression createNotEqual(Expression left, Expression right) - { - return UnaryExpression.createNOT(ComparisonExpression.createEqual(left, right)); - } - - public static BooleanExpression createEqual(Expression left, Expression right) - { - ComparisonExpression.checkEqualOperand(left); - ComparisonExpression.checkEqualOperand(right); - ComparisonExpression.checkEqualOperandCompatability(left, right); - - return ComparisonExpression.doCreateEqual(left, right); - } - - private static BooleanExpression doCreateEqual(Expression left, Expression right) - { - return new ComparisonExpression(left, right) - { - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Object lv = left.evaluate(message); - Object rv = right.evaluate(message); - - // Iff one of the values is null - if ((lv == null) ^ (rv == null)) - { - return Boolean.FALSE; - } - - if ((lv == rv) || lv.equals(rv)) - { - return Boolean.TRUE; - } - - if ((lv instanceof Comparable) && (rv instanceof Comparable)) - { - return compare((Comparable) lv, (Comparable) rv); - } - - return Boolean.FALSE; - } - - protected boolean asBoolean(int answer) - { - return answer == 0; - } - - public String getExpressionSymbol() - { - return "="; - } - }; - } - - public static BooleanExpression createGreaterThan(final Expression left, final Expression right) - { - ComparisonExpression.checkLessThanOperand(left); - ComparisonExpression.checkLessThanOperand(right); - - return new ComparisonExpression(left, right) - { - protected boolean asBoolean(int answer) - { - return answer > 0; - } - - public String getExpressionSymbol() - { - return ">"; - } - }; - } - - public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) - { - ComparisonExpression.checkLessThanOperand(left); - ComparisonExpression.checkLessThanOperand(right); - - return new ComparisonExpression(left, right) - { - protected boolean asBoolean(int answer) - { - return answer >= 0; - } - - public String getExpressionSymbol() - { - return ">="; - } - }; - } - - public static BooleanExpression createLessThan(final Expression left, final Expression right) - { - ComparisonExpression.checkLessThanOperand(left); - ComparisonExpression.checkLessThanOperand(right); - - return new ComparisonExpression(left, right) - { - - protected boolean asBoolean(int answer) - { - return answer < 0; - } - - public String getExpressionSymbol() - { - return "<"; - } - - }; - } - - public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) - { - ComparisonExpression.checkLessThanOperand(left); - ComparisonExpression.checkLessThanOperand(right); - - return new ComparisonExpression(left, right) - { - - protected boolean asBoolean(int answer) - { - return answer <= 0; - } - - public String getExpressionSymbol() - { - return "<="; - } - }; - } - - /** - * Only Numeric expressions can be used in >, >=, < or <= expressions.s - * - * @param expr - */ - public static void checkLessThanOperand(Expression expr) - { - if (expr instanceof ConstantExpression) - { - Object value = ((ConstantExpression) expr).getValue(); - if (value instanceof Number) - { - return; - } - - // Else it's boolean or a String.. - throw new RuntimeException("Value '" + expr + "' cannot be compared."); - } - - if (expr instanceof BooleanExpression) - { - throw new RuntimeException("Value '" + expr + "' cannot be compared."); - } - } - - /** - * Validates that the expression can be used in == or <> expression. - * Cannot not be NULL TRUE or FALSE litterals. - * - * @param expr - */ - public static void checkEqualOperand(Expression expr) - { - if (expr instanceof ConstantExpression) - { - Object value = ((ConstantExpression) expr).getValue(); - if (value == null) - { - throw new RuntimeException("'" + expr + "' cannot be compared."); - } - } - } - - /** - * - * @param left - * @param right - */ - private static void checkEqualOperandCompatability(Expression left, Expression right) - { - if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression)) - { - if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression)) - { - throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'"); - } - } - } - - /** - * @param left - * @param right - */ - public ComparisonExpression(Expression left, Expression right) - { - super(left, right); - } - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Comparable lv = (Comparable) left.evaluate(message); - if (lv == null) - { - return null; - } - - Comparable rv = (Comparable) right.evaluate(message); - if (rv == null) - { - return null; - } - - return compare(lv, rv); - } - - protected Boolean compare(Comparable lv, Comparable rv) - { - Class lc = lv.getClass(); - Class rc = rv.getClass(); - // If the the objects are not of the same type, - // try to convert up to allow the comparison. - if (lc != rc) - { - if (lc == Byte.class) - { - if (rc == Short.class) - { - lv = new Short(((Number) lv).shortValue()); - } - else if (rc == Integer.class) - { - lv = new Integer(((Number) lv).intValue()); - } - else if (rc == Long.class) - { - lv = new Long(((Number) lv).longValue()); - } - else if (rc == Float.class) - { - lv = new Float(((Number) lv).floatValue()); - } - else if (rc == Double.class) - { - lv = new Double(((Number) lv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else if (lc == Short.class) - { - if (rc == Integer.class) - { - lv = new Integer(((Number) lv).intValue()); - } - else if (rc == Long.class) - { - lv = new Long(((Number) lv).longValue()); - } - else if (rc == Float.class) - { - lv = new Float(((Number) lv).floatValue()); - } - else if (rc == Double.class) - { - lv = new Double(((Number) lv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else if (lc == Integer.class) - { - if (rc == Long.class) - { - lv = new Long(((Number) lv).longValue()); - } - else if (rc == Float.class) - { - lv = new Float(((Number) lv).floatValue()); - } - else if (rc == Double.class) - { - lv = new Double(((Number) lv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else if (lc == Long.class) - { - if (rc == Integer.class) - { - rv = new Long(((Number) rv).longValue()); - } - else if (rc == Float.class) - { - lv = new Float(((Number) lv).floatValue()); - } - else if (rc == Double.class) - { - lv = new Double(((Number) lv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else if (lc == Float.class) - { - if (rc == Integer.class) - { - rv = new Float(((Number) rv).floatValue()); - } - else if (rc == Long.class) - { - rv = new Float(((Number) rv).floatValue()); - } - else if (rc == Double.class) - { - lv = new Double(((Number) lv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else if (lc == Double.class) - { - if (rc == Integer.class) - { - rv = new Double(((Number) rv).doubleValue()); - } - else if (rc == Long.class) - { - rv = new Double(((Number) rv).doubleValue()); - } - else if (rc == Float.class) - { - rv = new Float(((Number) rv).doubleValue()); - } - else - { - return Boolean.FALSE; - } - } - else - { - return Boolean.FALSE; - } - } - - return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; - } - - protected abstract boolean asBoolean(int answer); - - public boolean matches(AbstractJMSMessage message) throws QpidException - { - Object object = evaluate(message); - - return (object != null) && (object == Boolean.TRUE); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/ConstantExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/ConstantExpression.java deleted file mode 100644 index 26aeec2de8..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/ConstantExpression.java +++ /dev/null @@ -1,204 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - -import java.math.BigDecimal; - -/** - * Represents a constant expression - */ -public class ConstantExpression implements Expression -{ - - static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression - { - public BooleanConstantExpression(Object value) - { - super(value); - } - - public boolean matches(AbstractJMSMessage message) throws QpidException - { - Object object = evaluate(message); - - return (object != null) && (object == Boolean.TRUE); - } - } - - public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); - public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); - public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); - - private Object value; - - public static ConstantExpression createFromDecimal(String text) - { - - // Strip off the 'l' or 'L' if needed. - if (text.endsWith("l") || text.endsWith("L")) - { - text = text.substring(0, text.length() - 1); - } - - Number value; - try - { - value = new Long(text); - } - catch (NumberFormatException e) - { - // The number may be too big to fit in a long. - value = new BigDecimal(text); - } - - long l = value.longValue(); - if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) - { - value = new Integer(value.intValue()); - } - - return new ConstantExpression(value); - } - - public static ConstantExpression createFromHex(String text) - { - Number value = new Long(Long.parseLong(text.substring(2), 16)); - long l = value.longValue(); - if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) - { - value = new Integer(value.intValue()); - } - - return new ConstantExpression(value); - } - - public static ConstantExpression createFromOctal(String text) - { - Number value = new Long(Long.parseLong(text, 8)); - long l = value.longValue(); - if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) - { - value = new Integer(value.intValue()); - } - - return new ConstantExpression(value); - } - - public static ConstantExpression createFloat(String text) - { - Number value = new Double(text); - - return new ConstantExpression(value); - } - - public ConstantExpression(Object value) - { - this.value = value; - } - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - return value; - } - - public Object getValue() - { - return value; - } - - /** - * @see Object#toString() - */ - public String toString() - { - if (value == null) - { - return "NULL"; - } - - if (value instanceof Boolean) - { - return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; - } - - if (value instanceof String) - { - return ConstantExpression.encodeString((String) value); - } - - return value.toString(); - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#hashCode() - */ - public int hashCode() - { - return toString().hashCode(); - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#equals(Object) - */ - public boolean equals(Object o) - { - - if ((o == null) || !this.getClass().equals(o.getClass())) - { - return false; - } - - return toString().equals(o.toString()); - - } - - /** - * Encodes the value of string so that it looks like it would look like - * when it was provided in a selector. - * - * @param s - * @return - */ - public static String encodeString(String s) - { - StringBuffer b = new StringBuffer(); - b.append('\''); - for (int i = 0; i < s.length(); i++) - { - char c = s.charAt(i); - if (c == '\'') - { - b.append(c); - } - - b.append(c); - } - - b.append('\''); - - return b.toString(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/Expression.java b/java/client/src/main/java/org/apache/qpidity/filter/Expression.java deleted file mode 100644 index bdc3c9cccc..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/Expression.java +++ /dev/null @@ -1,34 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -/** - * Represents an expression - */ -public interface Expression -{ - /** - * @param message The message to evaluate - * @return the value of this expression - */ - public Object evaluate(AbstractJMSMessage message) throws QpidException; -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java deleted file mode 100644 index c73da1682a..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.filter.selector.SelectorParser; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -public class JMSSelectorFilter implements MessageFilter -{ - /** - * this JMSSelectorFilter's logger - */ - private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); - - private String _selector; - private BooleanExpression _matcher; - - public JMSSelectorFilter(String selector) throws QpidException - { - _selector = selector; - if (JMSSelectorFilter._logger.isDebugEnabled()) - { - JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector); - } - _matcher = new SelectorParser().parse(selector); - } - - public boolean matches(AbstractJMSMessage message) - { - try - { - boolean match = _matcher.matches(message); - if (JMSSelectorFilter._logger.isDebugEnabled()) - { - JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System - .identityHashCode(_selector) + "):" + _selector); - } - return match; - } - catch (QpidException e) - { - JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e); - } - return false; - } - - public String getSelector() - { - return _selector; - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/LogicExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/LogicExpression.java deleted file mode 100644 index 7f5909df43..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/LogicExpression.java +++ /dev/null @@ -1,108 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -/** - * A filter performing a comparison of two objects - */ -public abstract class LogicExpression extends BinaryExpression implements BooleanExpression -{ - - public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) - { - return new LogicExpression(lvalue, rvalue) - { - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - - Boolean lv = (Boolean) left.evaluate(message); - // Can we do an OR shortcut?? - if ((lv != null) && lv.booleanValue()) - { - return Boolean.TRUE; - } - - Boolean rv = (Boolean) right.evaluate(message); - - return (rv == null) ? null : rv; - } - - public String getExpressionSymbol() - { - return "OR"; - } - }; - } - - public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) - { - return new LogicExpression(lvalue, rvalue) - { - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - - Boolean lv = (Boolean) left.evaluate(message); - - // Can we do an AND shortcut?? - if (lv == null) - { - return null; - } - - if (!lv.booleanValue()) - { - return Boolean.FALSE; - } - - Boolean rv = (Boolean) right.evaluate(message); - - return (rv == null) ? null : rv; - } - - public String getExpressionSymbol() - { - return "AND"; - } - }; - } - - /** - * @param left - * @param right - */ - public LogicExpression(BooleanExpression left, BooleanExpression right) - { - super(left, right); - } - - public abstract Object evaluate(AbstractJMSMessage message) throws QpidException; - - public boolean matches(AbstractJMSMessage message) throws QpidException - { - Object object = evaluate(message); - - return (object != null) && (object == Boolean.TRUE); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/MessageFilter.java b/java/client/src/main/java/org/apache/qpidity/filter/MessageFilter.java deleted file mode 100644 index aa1303a373..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/MessageFilter.java +++ /dev/null @@ -1,27 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - - -public interface MessageFilter -{ - boolean matches(AbstractJMSMessage message) throws QpidException; -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/PropertyExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/PropertyExpression.java deleted file mode 100644 index 5ea2004d75..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/PropertyExpression.java +++ /dev/null @@ -1,303 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpidity.QpidException; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.jms.JMSException; -import java.util.HashMap; - -/** - * Represents a property expression - */ -public class PropertyExpression implements Expression -{ - // Constants - defined the same as JMS - private static final int NON_PERSISTENT = 1; - private static final int DEFAULT_PRIORITY = 4; - - private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); - - private static final HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); - - static - { - JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - //TODO - return null; - } - }); - JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString replyTo = _properties.getReplyTo(); - - return (replyTo == null) ? null : replyTo.toString(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property", e); - - return null; - } - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString type = _properties.getType(); - - return (type == null) ? null : type.toString(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property", e); - - return null; - } - - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - int mode = message.getJMSDeliveryMode(); - if (_logger.isDebugEnabled()) - { - _logger.debug("JMSDeliveryMode is :" + mode); - } - - return mode; - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - } - - return NON_PERSISTENT; - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return (int) _properties.getPriority(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - } - - return DEFAULT_PRIORITY; - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString messageId = _properties.getMessageId(); - - return (messageId == null) ? null : messageId; - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - - return null; - } - - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return _properties.getTimestamp(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - - return null; - } - - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString correlationId = _properties.getCorrelationId(); - return (correlationId == null) ? null : correlationId.toString(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - - return null; - } - - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return _properties.getExpiration(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property",e); - return null; - } - - } - }); - - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() - { - public Object evaluate(AbstractJMSMessage message) - { - try - { - return message.getJMSRedelivered(); - } - catch (JMSException e) - { - _logger.warn("Error evaluating property",e); - return null; - } - } - }); - - } - - private final String name; - private final Expression jmsPropertyExpression; - - public PropertyExpression(String name) - { - this.name = name; - jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); - } - - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - - if (jmsPropertyExpression != null) - { - return jmsPropertyExpression.evaluate(message); - } - else - { - - CommonContentHeaderProperties _properties = message.getContentHeaderProperties(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Looking up property:" + name); - _logger.debug("Properties are:" + _properties.getHeaders().keySet()); - } - return _properties.getHeaders().getObject(name); - } - } - - public String getName() - { - return name; - } - - /** - * @see java.lang.Object#toString() - */ - public String toString() - { - return name; - } - - /** - * @see java.lang.Object#hashCode() - */ - public int hashCode() - { - return name.hashCode(); - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - public boolean equals(Object o) - { - if ((o == null) || !this.getClass().equals(o.getClass())) - { - return false; - } - return name.equals(((PropertyExpression) o).name); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/filter/UnaryExpression.java b/java/client/src/main/java/org/apache/qpidity/filter/UnaryExpression.java deleted file mode 100644 index f73449679c..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/filter/UnaryExpression.java +++ /dev/null @@ -1,321 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.filter; - -import org.apache.qpidity.QpidException; -import org.apache.qpid.client.message.AbstractJMSMessage; - -import java.math.BigDecimal; -import java.util.List; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; - -/** - * An expression which performs an operation on two expression values - */ -public abstract class UnaryExpression implements Expression -{ - - private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); - protected Expression right; - - public static Expression createNegate(Expression left) - { - return new UnaryExpression(left) - { - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Object rvalue = right.evaluate(message); - if (rvalue == null) - { - return null; - } - - if (rvalue instanceof Number) - { - return UnaryExpression.negate((Number) rvalue); - } - - return null; - } - - public String getExpressionSymbol() - { - return "-"; - } - }; - } - - public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) - { - - // Use a HashSet if there are many elements. - Collection t; - if (elements.size() == 0) - { - t = null; - } - else if (elements.size() < 5) - { - t = elements; - } - else - { - t = new HashSet(elements); - } - - final Collection inList = t; - - return new BooleanUnaryExpression(right) - { - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - - Object rvalue = right.evaluate(message); - if (rvalue == null) - { - return null; - } - - if (rvalue.getClass() != String.class) - { - return null; - } - - if (((inList != null) && inList.contains(rvalue)) ^ not) - { - return Boolean.TRUE; - } - else - { - return Boolean.FALSE; - } - - } - - public String toString() - { - StringBuffer answer = new StringBuffer(); - answer.append(right); - answer.append(" "); - answer.append(getExpressionSymbol()); - answer.append(" ( "); - - int count = 0; - for (Iterator i = inList.iterator(); i.hasNext();) - { - Object o = (Object) i.next(); - if (count != 0) - { - answer.append(", "); - } - - answer.append(o); - count++; - } - - answer.append(" )"); - - return answer.toString(); - } - - public String getExpressionSymbol() - { - if (not) - { - return "NOT IN"; - } - else - { - return "IN"; - } - } - }; - } - - abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression - { - public BooleanUnaryExpression(Expression left) - { - super(left); - } - - public boolean matches(AbstractJMSMessage message) throws QpidException - { - Object object = evaluate(message); - - return (object != null) && (object == Boolean.TRUE); - } - } - - ; - - public static BooleanExpression createNOT(BooleanExpression left) - { - return new BooleanUnaryExpression(left) - { - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Boolean lvalue = (Boolean) right.evaluate(message); - if (lvalue == null) - { - return null; - } - - return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; - } - - public String getExpressionSymbol() - { - return "NOT"; - } - }; - } - public static BooleanExpression createBooleanCast(Expression left) - { - return new BooleanUnaryExpression(left) - { - public Object evaluate(AbstractJMSMessage message) throws QpidException - { - Object rvalue = right.evaluate(message); - if (rvalue == null) - { - return null; - } - - if (!rvalue.getClass().equals(Boolean.class)) - { - return Boolean.FALSE; - } - - return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; - } - - public String toString() - { - return right.toString(); - } - - public String getExpressionSymbol() - { - return ""; - } - }; - } - - private static Number negate(Number left) - { - Class clazz = left.getClass(); - if (clazz == Integer.class) - { - return new Integer(-left.intValue()); - } - else if (clazz == Long.class) - { - return new Long(-left.longValue()); - } - else if (clazz == Float.class) - { - return new Float(-left.floatValue()); - } - else if (clazz == Double.class) - { - return new Double(-left.doubleValue()); - } - else if (clazz == BigDecimal.class) - { - // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the - // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it - // as a Big decimal. But it gets Negated right away.. to here we try to covert it back - // to a Long. - BigDecimal bd = (BigDecimal) left; - bd = bd.negate(); - - if (UnaryExpression.BD_LONG_MIN_VALUE.compareTo(bd) == 0) - { - return new Long(Long.MIN_VALUE); - } - - return bd; - } - else - { - throw new RuntimeException("Don't know how to negate: " + left); - } - } - - public UnaryExpression(Expression left) - { - this.right = left; - } - - public Expression getRight() - { - return right; - } - - public void setRight(Expression expression) - { - right = expression; - } - - /** - * @see Object#toString() - */ - public String toString() - { - return "(" + getExpressionSymbol() + " " + right.toString() + ")"; - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#hashCode() - */ - public int hashCode() - { - return toString().hashCode(); - } - - /** - * TODO: more efficient hashCode() - * - * @see Object#equals(Object) - */ - public boolean equals(Object o) - { - - if ((o == null) || !this.getClass().equals(o.getClass())) - { - return false; - } - - return toString().equals(o.toString()); - - } - - /** - * Returns the symbol that represents this binary expression. For example, addition is - * represented by "+" - * - * @return - */ - public abstract String getExpressionSymbol(); - -} diff --git a/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java b/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java deleted file mode 100644 index c73d6e4b35..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java +++ /dev/null @@ -1,509 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.naming; - -import org.apache.qpid.jndi.NameParserImpl; - -import javax.naming.*; -import javax.naming.spi.NamingManager; -import java.io.Serializable; -import java.util.*; - -/** - * Based on class from ActiveMQ. - * A read-only Context - *

- * This version assumes it and all its subcontext are read-only and any attempt - * to modify (e.g. through bind) will result in an OperationNotSupportedException. - * Each Context in the tree builds a cache of the entries in all sub-contexts - * to optimise the performance of lookup. - *

- *

This implementation is intended to optimise the performance of lookup(String) - * to about the level of a HashMap get. It has been observed that the scheme - * resolution phase performed by the JVM takes considerably longer, so for - * optimum performance lookups should be coded like:

- * - * Context componentContext = (Context)new InitialContext().lookup("java:comp"); - * String envEntry = (String) componentContext.lookup("env/myEntry"); - * String envEntry2 = (String) componentContext.lookup("env/myEntry2"); - * - */ -public class ReadOnlyContext implements Context, Serializable -{ - private static final long serialVersionUID = -5754338187296859149L; - protected static final NameParser nameParser = new NameParserImpl(); - - protected final Hashtable environment; // environment for this context - protected final Map bindings; // bindings at my level - protected final Map treeBindings; // all bindings under me - - private boolean frozen = false; - private String nameInNamespace = ""; - public static final String SEPARATOR = "/"; - - public ReadOnlyContext() - { - environment = new Hashtable(); - bindings = new HashMap(); - treeBindings = new HashMap(); - } - - public ReadOnlyContext(Hashtable env) - { - if (env == null) - { - this.environment = new Hashtable(); - } - else - { - this.environment = new Hashtable(env); - } - - this.bindings = Collections.EMPTY_MAP; - this.treeBindings = Collections.EMPTY_MAP; - } - - public ReadOnlyContext(Hashtable environment, Map bindings) - { - if (environment == null) - { - this.environment = new Hashtable(); - } - else - { - this.environment = new Hashtable(environment); - } - - this.bindings = bindings; - treeBindings = new HashMap(); - frozen = true; - } - - public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace) - { - this(environment, bindings); - this.nameInNamespace = nameInNamespace; - } - - protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env) - { - this.bindings = clone.bindings; - this.treeBindings = clone.treeBindings; - this.environment = new Hashtable(env); - } - - protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace) - { - this(clone, env); - this.nameInNamespace = nameInNamespace; - } - - public void freeze() - { - frozen = true; - } - - boolean isFrozen() - { - return frozen; - } - - /** - * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses. - * It binds every possible lookup into a map in each context. To do this, each context - * strips off one name segment and if necessary creates a new context for it. Then it asks that context - * to bind the remaining name. It returns a map containing all the bindings from the next context, plus - * the context it just created (if it in fact created it). (the names are suitably extended by the segment - * originally lopped off). - * - * @param name - * @param value - * @return - * @throws javax.naming.NamingException - */ - protected Map internalBind(String name, Object value) throws NamingException - { - assert (name != null) && (name.length() > 0); - assert !frozen; - - Map newBindings = new HashMap(); - int pos = name.indexOf('/'); - if (pos == -1) - { - if (treeBindings.put(name, value) != null) - { - throw new NamingException("Something already bound at " + name); - } - - bindings.put(name, value); - newBindings.put(name, value); - } - else - { - String segment = name.substring(0, pos); - assert segment != null; - assert !segment.equals(""); - Object o = treeBindings.get(segment); - if (o == null) - { - o = newContext(); - treeBindings.put(segment, o); - bindings.put(segment, o); - newBindings.put(segment, o); - } - else if (!(o instanceof ReadOnlyContext)) - { - throw new NamingException("Something already bound where a subcontext should go"); - } - - ReadOnlyContext readOnlyContext = (ReadOnlyContext) o; - String remainder = name.substring(pos + 1); - Map subBindings = readOnlyContext.internalBind(remainder, value); - for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();) - { - Map.Entry entry = (Map.Entry) iterator.next(); - String subName = segment + "/" + (String) entry.getKey(); - Object bound = entry.getValue(); - treeBindings.put(subName, bound); - newBindings.put(subName, bound); - } - } - - return newBindings; - } - - protected ReadOnlyContext newContext() - { - return new ReadOnlyContext(); - } - - public Object addToEnvironment(String propName, Object propVal) throws NamingException - { - return environment.put(propName, propVal); - } - - public Hashtable getEnvironment() throws NamingException - { - return (Hashtable) environment.clone(); - } - - public Object removeFromEnvironment(String propName) throws NamingException - { - return environment.remove(propName); - } - - public Object lookup(String name) throws NamingException - { - if (name.length() == 0) - { - return this; - } - - Object result = treeBindings.get(name); - if (result == null) - { - result = bindings.get(name); - } - - if (result == null) - { - int pos = name.indexOf(':'); - if (pos > 0) - { - String scheme = name.substring(0, pos); - Context ctx = NamingManager.getURLContext(scheme, environment); - if (ctx == null) - { - throw new NamingException("scheme " + scheme + " not recognized"); - } - - return ctx.lookup(name); - } - else - { - // Split out the first name of the path - // and look for it in the bindings map. - CompositeName path = new CompositeName(name); - - if (path.size() == 0) - { - return this; - } - else - { - String first = path.get(0); - Object obj = bindings.get(first); - if (obj == null) - { - throw new NameNotFoundException(name); - } - else if ((obj instanceof Context) && (path.size() > 1)) - { - Context subContext = (Context) obj; - obj = subContext.lookup(path.getSuffix(1)); - } - - return obj; - } - } - } - - if (result instanceof LinkRef) - { - LinkRef ref = (LinkRef) result; - result = lookup(ref.getLinkName()); - } - - if (result instanceof Reference) - { - try - { - result = NamingManager.getObjectInstance(result, null, null, this.environment); - } - catch (NamingException e) - { - throw e; - } - catch (Exception e) - { - throw (NamingException) new NamingException("could not look up : " + name).initCause(e); - } - } - - if (result instanceof ReadOnlyContext) - { - String prefix = getNameInNamespace(); - if (prefix.length() > 0) - { - prefix = prefix + SEPARATOR; - } - - result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name); - } - - return result; - } - - public Object lookup(Name name) throws NamingException - { - return lookup(name.toString()); - } - - public Object lookupLink(String name) throws NamingException - { - return lookup(name); - } - - public Name composeName(Name name, Name prefix) throws NamingException - { - Name result = (Name) prefix.clone(); - result.addAll(name); - - return result; - } - - public String composeName(String name, String prefix) throws NamingException - { - CompositeName result = new CompositeName(prefix); - result.addAll(new CompositeName(name)); - - return result.toString(); - } - - public NamingEnumeration list(String name) throws NamingException - { - Object o = lookup(name); - if (o == this) - { - return new ReadOnlyContext.ListEnumeration(); - } - else if (o instanceof Context) - { - return ((Context) o).list(""); - } - else - { - throw new NotContextException(); - } - } - - public NamingEnumeration listBindings(String name) throws NamingException - { - Object o = lookup(name); - if (o == this) - { - return new ReadOnlyContext.ListBindingEnumeration(); - } - else if (o instanceof Context) - { - return ((Context) o).listBindings(""); - } - else - { - throw new NotContextException(); - } - } - - public Object lookupLink(Name name) throws NamingException - { - return lookupLink(name.toString()); - } - - public NamingEnumeration list(Name name) throws NamingException - { - return list(name.toString()); - } - - public NamingEnumeration listBindings(Name name) throws NamingException - { - return listBindings(name.toString()); - } - - public void bind(Name name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void bind(String name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void close() throws NamingException - { - // ignore - } - - public Context createSubcontext(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public Context createSubcontext(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void destroySubcontext(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void destroySubcontext(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public String getNameInNamespace() throws NamingException - { - return nameInNamespace; - } - - public NameParser getNameParser(Name name) throws NamingException - { - return nameParser; - } - - public NameParser getNameParser(String name) throws NamingException - { - return nameParser; - } - - public void rebind(Name name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rebind(String name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rename(Name oldName, Name newName) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rename(String oldName, String newName) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void unbind(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void unbind(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - private abstract class LocalNamingEnumeration implements NamingEnumeration - { - private Iterator i = bindings.entrySet().iterator(); - - public boolean hasMore() throws NamingException - { - return i.hasNext(); - } - - public boolean hasMoreElements() - { - return i.hasNext(); - } - - protected Map.Entry getNext() - { - return (Map.Entry) i.next(); - } - - public void close() throws NamingException - { } - } - - private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration - { - public Object next() throws NamingException - { - return nextElement(); - } - - public Object nextElement() - { - Map.Entry entry = getNext(); - - return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName()); - } - } - - private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration - { - public Object next() throws NamingException - { - return nextElement(); - } - - public Object nextElement() - { - Map.Entry entry = getNext(); - - return new Binding((String) entry.getKey(), entry.getValue()); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties b/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties deleted file mode 100644 index e451cf53fa..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -java.naming.factory.initial = org.apache.qpidity.naming.PropertiesFileInitialConextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -# qpid:username=foo;password=password;client_id=id;virtualhost=path@tpc:localhost:1556 -connectionfactory.local = qpid:tcp:localhost' - -# register some queues in JNDI using the form -# queue.[jndiName] = [physicalName] -queue.MyQueue = example.MyQueue - -# register some topics in JNDI using the form -# topic.[jndiName] = [physicalName] -topic.ibmStocks = stocks.nyse.ibm - -# Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers -# destination.[jniName] = [BindingURL] -destination.direct = direct://amq.direct//directQueue diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java deleted file mode 100644 index eb0e370560..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpidity.nclient; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpid.client.url.URLParser_0_10; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.url.QpidURL; -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.nclient.impl.ClientSession; -import org.apache.qpidity.nclient.impl.ClientSessionDelegate; -import org.apache.qpidity.transport.Channel; -import org.apache.qpidity.transport.ClientDelegate; -import org.apache.qpidity.transport.Connection; -import org.apache.qpidity.transport.ConnectionClose; -import org.apache.qpidity.transport.ConnectionCloseCode; -import org.apache.qpidity.transport.ConnectionCloseOk; -import org.apache.qpidity.transport.ProtocolHeader; -import org.apache.qpidity.transport.ProtocolVersionException; -import org.apache.qpidity.transport.SessionDelegate; -import org.apache.qpidity.transport.network.io.IoTransport; -import org.apache.qpidity.transport.network.mina.MinaHandler; -import org.apache.qpidity.transport.network.nio.NioHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class Client implements org.apache.qpidity.nclient.Connection -{ - private Connection _conn; - private ClosedListener _closedListner; - private final Lock _lock = new ReentrantLock(); - private static Logger _logger = LoggerFactory.getLogger(Client.class); - private Condition closeOk; - private boolean closed = false; - private long timeout = 60000; - - private ProtocolHeader header = null; - - /** - * - * @return returns a new connection to the broker. - */ - public static org.apache.qpidity.nclient.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - - final Condition negotiationComplete = _lock.newCondition(); - closeOk = _lock.newCondition(); - _lock.lock(); - - ClientDelegate connectionDelegate = new ClientDelegate() - { - private boolean receivedClose = false; - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - public void exception(Throwable t) - { - if (_closedListner != null) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); - } - else - { - throw new RuntimeException("connection closed",t); - } - } - - public void closed() - { - if (_closedListner != null && !this.receivedClose) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); - } - } - - @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) - { - _lock.lock(); - try - { - closed = true; - this.receivedClose = true; - closeOk.signalAll(); - } - finally - { - _lock.unlock(); - } - } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) - { - ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); - if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) - { - throw new RuntimeException - (new QpidException("Server closed the connection: Reason " + - connectionClose.getReplyText(), - errorCode, - null)); - } - else - { - _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); - } - - this.receivedClose = true; - } - @Override public void init(Channel ch, ProtocolHeader hdr) - { - // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and mino - if (hdr.getMajor() != 0 || hdr.getMinor() != 10) - { - Client.this.header = hdr; - _lock.lock(); - negotiationComplete.signalAll(); - _lock.unlock(); - } - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - String transport = System.getProperty("transport","io"); - if (transport.equalsIgnoreCase("nio")) - { - _logger.info("using NIO Transport"); - _conn = NioHandler.connect(host, port,connectionDelegate); - } - else if (transport.equalsIgnoreCase("io")) - { - _logger.info("using Plain IO Transport"); - _conn = IoTransport.connect(host, port,connectionDelegate); - } - else - { - _logger.info("using MINA Transport"); - _conn = MinaHandler.connect(host, port,connectionDelegate); - // _conn = NativeHandler.connect(host, port,connectionDelegate); - } - - // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, 0, 10)); - - try - { - negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if (header != null) - { - _conn.close(); - throw new ProtocolVersionException(header.getMajor(), header.getMinor()); - } - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - finally - { - _lock.unlock(); - } - } - - public void connect(String url)throws QpidException - { - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(url); - } - catch(Exception e) - { - throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); - } - List brokers = parser.getAllBrokerDetails(); - BrokerDetails brokerDetail = brokers.get(0); - connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), - brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), - brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(QpidURL url) throws QpidException - { - throw new UnsupportedOperationException("Not implemented"); - } - - /* { - // temp impl to tests - BrokerDetails details = url.getAllBrokerDetails().get(0); - connect(details.getHost(), - details.getPort(), - details.getVirtualHost(), - details.getUserName(), - details.getPassword()); - } -*/ - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); - _lock.lock(); - try - { - try - { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed && elapsed < timeout) - { - closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); - elapsed = System.currentTimeMillis() - start; - } - if(!closed) - { - throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - catch (InterruptedException e) - { - throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - finally - { - _lock.unlock(); - } - _conn.close(); - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(); - ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); - clientSession.dtxSelect(); - return (DtxSession) clientSession; - } - - public void setClosedListener(ClosedListener closedListner) - { - - _closedListner = closedListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java b/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java deleted file mode 100644 index c0c6978a14..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient; - -import org.apache.qpidity.ErrorCode; - - -/** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - */ -public interface ClosedListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * @param errorCode TODO - * @param reason TODO - * @param t TODO - * @see Connection - */ - public void onClosed(ErrorCode errorCode, String reason, Throwable t); -} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java b/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java deleted file mode 100644 index 49167750d1..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient; - -import org.apache.qpidity.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection using the given parameters - * - * @param host host name - * @param port port number - * @param virtualHost the virtual host name - * @param username user name - * @param password password - * @throws QpidException If the communication layer fails to establish the connection. - */ - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; - - /** - * Establish the connection with the broker identified by the URL. - * - * @param url Specifies the URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown. - */ - public void connect(String url) throws QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() throws QpidException; - - /** - * Create a session for this connection. - *

The returned session is suspended - * (i.e. this session is not attached to an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than - * or equal to 0 then the session does not expire. - * @return A newly created (suspended) session. - */ - public Session createSession(long expiryInSeconds); - - /** - * Create a DtxSession for this connection. - *

A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - *

The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal - * to 0 then the session does not expire. - * @return A newly created (suspended) DtxSession. - */ - public DtxSession createDTXSession(int expiryInSeconds); - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ClosedListener - * - * @param exceptionListner The ClosedListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java deleted file mode 100644 index 1d9c63df4f..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient; - -import org.apache.qpidity.transport.Future; -import org.apache.qpidity.transport.GetTimeoutResult; -import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.RecoverResult; -import org.apache.qpidity.transport.XaResult; -import org.apache.qpidity.transport.Xid; - -/** - * The resources for this session are controlled under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * This method is called when messages should be produced and consumed on behalf a transaction - * branch identified by xid. - * possible options are: - *

    - *
  • {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. - *
  • {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. - *
- * - * @param xid Specifies the xid of the transaction branch to be started. - * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @return Confirms to the client that the transaction branch is started or specify the error condition. - */ - public Future dtxStart(Xid xid, Option... options); - - /** - * This method is called when the work done on behalf of a transaction branch finishes or needs to - * be suspended. - * possible options are: - *
    - *
  • {@link Option#FAIL}: indicates that this portion of work has failed; - * otherwise this portion of work has - * completed successfully. - *
  • {@link Option#SUSPEND}: Indicates that the transaction branch is - * temporarily suspended in an incomplete state. - *
- * - * @param xid Specifies the xid of the transaction branch to be ended. - * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @return Confirms to the client that the transaction branch is ended or specifies the error condition. - */ - public Future dtxEnd(Xid xid, Option... options); - - /** - * Commit the work done on behalf of a transaction branch. This method commits the work associated - * with xid. Any produced messages are made available and any consumed messages are discarded. - * The only possible option is: - *
    - *
  • {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used. - *
- * - * @param xid Specifies the xid of the transaction branch to be committed. - * @param options Available option is: {@link Option#ONE_PHASE} - * @return Confirms to the client that the transaction branch is committed or specifies the error condition. - */ - public Future dtxCommit(Xid xid, Option... options); - - /** - * This method is called to forget about a heuristically completed transaction branch. - * - * @param xid Specifies the xid of the transaction branch to be forgotten. - */ - public void dtxForget(Xid xid, Option ... options); - - /** - * This method obtains the current transaction timeout value in seconds. If set-timeout was not - * used prior to invoking this method, the return value is the default timeout value; otherwise, the - * value used in the previous set-timeout call is returned. - * - * @param xid Specifies the xid of the transaction branch used for getting the timeout. - * @return The current transaction timeout value in seconds. - */ - public Future dtxGetTimeout(Xid xid, Option ... options); - - /** - * This method prepares any message produced or consumed on behalf of xid, ready for commitment. - * - * @param xid Specifies the xid of the transaction branch to be prepared. - * @return The status of the prepare operation can be any one of: - * xa-ok: Normal execution. - *

- * xa-rdonly: The transaction branch was read-only and has been committed. - *

- * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified - * reason. - *

- * xa-rbtimeout: The work represented by this transaction branch took too long. - */ - public Future dtxPrepare(Xid xid, Option ... options); - - /** - * This method is called to obtain a list of transaction branches that are in a prepared or - * heuristically completed state. - * @return a array of xids to be recovered. - */ - public Future dtxRecover(Option ... options); - - /** - * This method rolls back the work associated with xid. Any produced messages are discarded and - * any consumed messages are re-queued. - * - * @param xid Specifies the xid of the transaction branch to be rolled back. - * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. - */ - public Future dtxRollback(Xid xid, Option ... options); - - /** - * Sets the specified transaction branch timeout value in seconds. - * - * @param xid Specifies the xid of the transaction branch for setting the timeout. - * @param timeout The transaction timeout value in seconds. - */ - public void dtxSetTimeout(Xid xid, long timeout, Option ... options); -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java deleted file mode 100644 index feb4c1c94d..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java +++ /dev/null @@ -1,115 +0,0 @@ - package org.apache.qpidity.nclient; - -import java.util.Enumeration; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.QueueBrowser; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; - -public class JMSTestCase -{ - - public static void main(String[] args) - { - - try - { - javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672"); - con.start(); - - javax.jms.Session ssn = con.createSession(false, 1); - - javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); - javax.jms.MessageProducer prod = ssn.createProducer(dest); - QueueBrowser browser = ssn.createBrowser((Queue)dest, "Test = 'test'"); - - javax.jms.TextMessage msg = ssn.createTextMessage(); - msg.setStringProperty("TEST", "test"); - msg.setText("Should get this"); - prod.send(msg); - - javax.jms.TextMessage msg2 = ssn.createTextMessage(); - msg2.setStringProperty("TEST", "test2"); - msg2.setText("Shouldn't get this"); - prod.send(msg2); - - - Enumeration enu = browser.getEnumeration(); - for (;enu.hasMoreElements();) - { - System.out.println(enu.nextElement()); - System.out.println("\n"); - } - - javax.jms.MessageConsumer cons = ssn.createConsumer(dest, "Test = 'test'"); - javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive(); - cons.setMessageListener(new MessageListener() - { - public void onMessage(Message m) - { - javax.jms.TextMessage m2 = (javax.jms.TextMessage)m; - try - { - System.out.println("headers : " + m2.toString()); - System.out.println("m : " + m2.getText()); - System.out.println("\n\n"); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - }); - - con.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - e.printStackTrace(); - } - }); - - System.out.println("Waiting"); - while (m == null) - { - - } - - System.out.println("Exiting"); - - /*javax.jms.TextMessage msg = ssn.createTextMessage(); - msg.setText("This is a test message"); - msg.setBooleanProperty("targetMessage", false); - prod.send(msg); - - msg.setBooleanProperty("targetMessage", true); - prod.send(msg); - - javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); - - if (m == null) - { - System.out.println("message is null"); - } - else - { - System.out.println("message is not null" + m); - }*/ - - } - catch(Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java deleted file mode 100644 index 7c7881502a..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java +++ /dev/null @@ -1,63 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.transport.Header; - -/** - * Assembles message parts. - *

The sequence of event for transferring a message is as follows: - *

    - *
  • messageHeaders - *
  • n calls to addData - *
  • messageReceived - *
- * It is up to the implementation to assemble the message once the different parts - * are transferred. - */ -public interface MessagePartListener -{ - /** - * Indicates the Message transfer has started. - * - * @param transferId The message transfer ID. - */ - public void messageTransfer(int transferId); - - /** - * Add the following a header to the message being received. - * - * @param header Either DeliveryProperties or ApplicationProperties - */ - public void messageHeader(Header header); - - /** - * Add the following byte array to the content of the message being received - * - * @param src Data to be added or streamed. - */ - public void data(ByteBuffer src); - - /** - * Indicates that the message has been fully received. - */ - public void messageReceived(); - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java deleted file mode 100644 index 218a7ed571..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ /dev/null @@ -1,595 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.qpidity.transport.*; -import org.apache.qpidity.api.Message; - -/** - *

A session is associated with a connection. - * When it is created, a session is not associated with an underlying channel. - * The session is single threaded.

- *

- * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. - * For example, command1 will be synchronously invoked by using the following sequence: - *

    - *
  • session.command1() - *
  • session.sync() - *
- */ -public interface Session -{ - public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1; - public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0; - public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0; - public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1; - public static final short MESSAGE_FLOW_MODE_CREDIT = 0; - public static final short MESSAGE_FLOW_MODE_WINDOW = 1; - public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; - public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; - public static final short MESSAGE_REJECT_CODE_GENERIC = 0; - public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; - public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; - public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - - /** - * Sync method will block the session until all outstanding commands - * are executed. - */ - public void sync(); - - public void close(); - - public void sessionDetach(byte[] name, Option ... options); - - public void sessionRequestTimeout(long expiry, Option ... options); - - public byte[] getName(); - - public void setAutoSync(boolean value); - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - /** - * Transfer a message to a specified exchange. - *

- *

This transfer provides a complete message - * using a single method. The method is internally mapped to messageTransfer() and headers() followed - * by data() and endData(). - * This method should only be used by small messages.

- * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - *

    - *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - *
- * @param acquireMode
    - *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - *
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) - throws IOException; - - - /** - *

This transfer streams a complete message using a single method. - * It uses pull-semantics instead of doing a push.

- *

Data is pulled from a Message object using read() - * and pushed using messageTransfer() and headers() followed by data() and endData(). - *
This method should only be used by large messages
- * There are two convenience Message classes to do this. - *

    - *
  • {@link org.apache.qpidity.nclient.util.FileMessage} - *
  • {@link org.apache.qpidity.nclient.util.StreamingMessage} - *
- * You can also implement a Message interface to wrap any - * data stream. - *

- * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - *

    - *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - *
- * @param acquireMode
    - *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message - * is acquired when the transfer starts. - *
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException; - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Option ... options); - - /** - * Make a set of headers to be sent together with a message - * - * @param headers headers to be added - * @see org.apache.qpidity.transport.DeliveryProperties - * @see org.apache.qpidity.transport.MessageProperties - * @return The added headers. - */ - public Header header(Struct... headers); - - /** - * Add a byte array to the content of the message being sent. - * - * @param data Data to be added. - */ - public void data(byte[] data); - - /** - * A Add a ByteBuffer to the content of the message being sent. - *

Note that only the data between the buffer's current position and the - * buffer limit is added. - * It is therefore recommended to flip the buffer before adding it to the message, - * - * @param buf Data to be added. - */ - public void data(ByteBuffer buf); - - /** - * Add a string to the content of the message being sent. - * - * @param str String to be added. - */ - public void data(String str); - - /** - * Signals the end of data for the message. - */ - public void endData(); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - *

The destination is bound to a queue, and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). - *

The valid options are: - *

    - *
  • {@link Option#EXCLUSIVE}:

    Requests exclusive subscription access, so that only this - * subscription can access the queue. - *

  • {@link Option#NONE}:

    This is an empty option, and has no effect. - *

- * - * @param queue The queue that the receiver is receiving messages from. - * @param destination The destination, or delivery tag, for the subscriber. - * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - *

    - *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - *
- * @param acquireMode
    - *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must - * be explicitly acquired. - *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - *
- * @param listener The listener for this destination. To transfer large messages - * use a {@link org.apache.qpidity.nclient.MessagePartListener}. - * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NONE}. - * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies - * according to the provider's implementation. - */ - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map filter, Option... options); - - /** - * This method cancels a consumer. The server will not send any more messages to the specified destination. - * This does not affect already delivered messages. - * The client may receive a - * number of messages in between sending the cancel method and receiving - * notification that the cancellation has been completed. - * - * @param destination The destination to be cancelled. - */ - public void messageCancel(String destination, Option ... options); - - /** - * Associate a message listener with a destination. - *

Only one listener is permitted for each destination. When a new listener is created, - * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener - * has completed processing a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - *

With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - *

Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode

  • credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control - *
  • window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control
- */ - public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); - - - /** - * This method controls the flow of message data to a given destination. It is used by the - * recipient of messages to dynamically match the incoming rate of message flow to its - * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" - * number of the specified unit to the available credit balance for the specified destination. - * A value of 0 indicates an infinite amount of credit. This disables any limit for - * the given unit until the credit balance is zeroed with {@link Session#messageStop} - * or {@link Session#messageFlush}. - * - * @param destination The destination to set the flow. - * @param unit Specifies the unit of credit balance. - *

- * One of:

    - *
  • message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE}) - *
  • byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE}) - *
- * @param value Number of credits, a value of 0 indicates an infinite amount of credit. - */ - public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); - - /** - * Forces the broker to exhaust its credit supply. - *

The credit on the broker will remain at zero once - * this method is completed. - * - * @param destination The destination on which the credit supply is to be exhausted. - */ - public void messageFlush(String destination, Option ... options); - - /** - * On receipt of this method, the brokers set credit to zero for a given - * destination. When confirmation of this method - * is issued credit is set to zero. No further messages will be sent until - * further credit is received. - * - * @param destination The destination on which to reset credit. - */ - public void messageStop(String destination, Option ... options); - - /** - * Acknowledge the receipt of a range of messages. - *

Messages must already be acquired, either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of messages to be acknowledged. - * @param accept pecify whether to send a message accept to the broker - */ - public void messageAcknowledge(RangeSet ranges, boolean accept); - - /** - * Reject a range of acquired messages. - *

The broker will deliver rejected messages to the - * alternate-exchange on the queue from which it came. If no alternate-exchange is - * defined for that queue the broker will discard the message. - * - * @param ranges Range of messages to be rejected. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - *

Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - *

A recommended invocation sequence would be: - *

    - *
  • {@link Session#messageReject} - *
  • {@link Session#sync()} - *
  • {@link Session#getRejectedMessages()} - *
- * - * @return The rejected message ranges - */ - public RangeSet getRejectedMessages(); - - /** - * Try to acquire ranges of messages hence releasing them form the queue. - * This means that once acknowledged, a message will not be delivered to any other receiver. - *

As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - *

This method should only be called on non-acquired messages. - * - * @param ranges Ranges of messages to be acquired. - * @return Indicates the acquired messages - */ - public Future messageAcquire(RangeSet ranges, Option ... options); - - /** - * Give up responsibility for processing ranges of messages. - *

Released messages are re-enqueued. - * - * @param ranges Ranges of messages to be released. - * @param options Valid option is: {@link Option#SET_REDELIVERED}) - */ - public void messageRelease(RangeSet ranges, Option ... options); - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - */ - public void txSelect(Option ... options); - - /** - * Commit the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txCommit(Option ... options) throws IllegalStateException; - - /** - * Roll back the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txRollback(Option ... options) throws IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - *

Following are the valid options: - *

    - *
  • {@link Option#AUTO_DELETE}:

    If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * If this field is set and the exclusive field is not set the queue is deleted when all - * the consumers have finished using it. - *

  • {@link Option#DURABLE}:

    If set when creating a new queue, - * the queue will be marked as durable. Durable queues - * remain active when a server restarts. Non-durable queues (transient queues) are purged - * if/when a server restarts. Note that durable queues do not necessarily hold persistent - * messages, although it does not make sense to send persistent messages to a transient - * queue. - *

  • {@link Option#EXCLUSIVE}:

    Exclusive queues can only be used from one connection at a time. - * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the - * declaring connection closes. - *

  • {@link Option#PASSIVE}:

    If set, the server will not create the queue. - * This field allows the client to assert the presence of a queue without modifying the server state. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - *

  1. The queue is deleted when it is not empty; - *
  2. Immediate delivery of a message is requested, but there are no consumers connected to - * the queue.
- * @param arguments Used for backward compatibility - * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map arguments, - Option... options); - - /** - * Bind a queue with an exchange. - * - * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current - * queue for the session, which is the last declared queue. - * @param exchangeName The exchange name. - * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages - * depending on the exchange configuration. Not all exchanges use a routing key - refer to - * the specific exchange documentation. If the queue name is empty, the server uses the last - * queue declared on the session. If the routing key is also empty, the server uses this - * queue name for the routing key as well. If the queue name is provided but the routing key - * is empty, the server does the binding with that empty routing key. The meaning of empty - * routing keys depends on the exchange implementation. - * @param arguments Used for backward compatibility - */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map arguments, - Option ... options); - - /** - * Unbind a queue from an exchange. - * - * @param queueName Specifies the name of the queue to unbind. - * @param exchangeName The name of the exchange to unbind from. - * @param routingKey Specifies the routing key of the binding to unbind. - */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); - - /** - * This method removes all messages from a queue. It does not cancel consumers. Purged messages - * are deleted without any formal "undo" mechanism. - * - * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - */ - public void queuePurge(String queueName, Option ... options); - - /** - * This method deletes a queue. When a queue is deleted any pending messages are sent to a - * dead-letter queue if this is defined in the server configuration, and all consumers on the - * queue are cancelled. - *

Following are the valid options: - *

    - *
  • {@link Option#IF_EMPTY}:

    If set, the server will only delete the queue if it has no messages. - *

  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the queue if it has no consumers. - * If the queue has consumers the server does does not delete it but raises a channel exception instead. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NONE}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - - /** - * This method is used to request information on a particular queue. - * - * @param queueName The name of the queue for which information is requested. - * @return Information on the specified queue. - */ - public Future queueQuery(String queueName, Option ... options); - - - /** - * This method is used to request information on a particular binding. - * - * @param exchange The exchange name. - * @param queue The queue name. - * @param routingKey The routing key - * @param arguments bacward compatibilties params. - * @return Information on the specified binding. - */ - public Future exchangeBound(String exchange, String queue, String routingKey, - Map arguments, Option ... options); - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * This method creates an exchange. If the exchange already exists, - * the method verifies the class and checks the details are correct. - *

Valid options are: - *

    - *
  • {@link Option#AUTO_DELETE}:

    If set, the exchange is deleted when all queues have finished using it. - *

  • {@link Option#DURABLE}:

    If set, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged when a server restarts. - *

  • {@link Option#PASSIVE}:

    If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - *

  • {@link Option#NONE}:

    This option is an empty option, and has no effect. - *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NONE}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, - Map arguments, Option... options); - - /** - * This method deletes an exchange. When an exchange is deleted all queue bindings on the - * exchange are cancelled. - *

Following are the valid options: - *

    - *
  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

Note that if an option is not set, it will default to false. - * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options); - - - /** - * This method is used to request information about a particular exchange. - * - * @param exchangeName The name of the exchange about which information is requested. If not set, the method will - * return information about the default exchange. - * @return Information on the specified exchange. - */ - public Future exchangeQuery(String exchangeName, Option ... options); - - /** - * If the session receives a sessionClosed with an error code it - * informs the session's exceptionListener - * - * @param exceptionListner The exceptionListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java deleted file mode 100644 index f7978d0d98..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ /dev/null @@ -1,206 +0,0 @@ -package org.apache.qpidity.nclient.impl; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpidity.QpidException; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.ClosedListener; -import org.apache.qpidity.nclient.MessagePartListener; -import org.apache.qpidity.transport.MessageAcceptMode; -import org.apache.qpidity.transport.MessageAcquireMode; -import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.Range; -import org.apache.qpidity.transport.RangeSet; - -import static org.apache.qpidity.transport.Option.*; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession -{ - static - { - String max = "message_size_before_sync"; // KB's - try - { - MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_SYNC_DATA_LENGH = 200000000; - } - String flush = "message_size_before_flush"; - try - { - MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_FLUSH_DATA_LENGH = 20000000; - } - } - - private static long MAX_NOT_SYNC_DATA_LENGH; - private static long MAX_NOT_FLUSH_DATA_LENGH; - - private Map _messageListeners = new ConcurrentHashMap(); - private ClosedListener _exceptionListner; - private RangeSet _rejectedMessages; - private long _currentDataSizeNotSynced; - private long _currentDataSizeNotFlushed; - - public ClientSession(byte[] name) - { - super(name); - } - - public void messageAcknowledge(RangeSet ranges, boolean accept) - { - for (Range range : ranges) - { - super.processed(range); - } - super.flushProcessed(accept ? BATCH : NONE); - if (accept) - { - messageAccept(ranges); - } - } - - public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), null, 0, filter, - options); - } - - public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - // The javadoc clearly says that this method is suitable for small messages - // therefore reading the content in one shot. - ByteBuffer data = msg.readData(); - super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode)); - // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() ); - if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() ) - { - msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) ); - msg.getDeliveryProperties().setDirty(false); - msg.getMessageProperties().setDirty(false); - } - else - { - super.header(msg.getHeader()); - } - data( data ); - endData(); - } - - public void sync() - { - super.sync(); - _currentDataSizeNotSynced = 0; - } - - /* ------------------------- - * Data methods - * ------------------------*/ - - public void data(ByteBuffer buf) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining(); - _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining(); - super.data(buf); - } - - public void data(String str) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length; - super.data(str); - } - - public void data(byte[] bytes) - { - _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length; - super.data(bytes); - } - - public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - super.messageTransfer(destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode)); - super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); - boolean b = true; - int count = 0; - while(b) - { - try - { - System.out.println("count : " + count++); - data(msg.readData()); - } - catch(EOFException e) - { - b = false; - } - } - endData(); - } - - public void endData() - { - super.endData(); - /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) - { - sync(); - } - if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH) - { - executionFlush(); - _currentDataSizeNotFlushed = 0; - }*/ - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - if (listener == null) - { - throw new IllegalArgumentException("Cannot set message listener to null"); - } - _messageListeners.put(destination, listener); - } - - public void setClosedListener(ClosedListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onClosed(null, null, null); - } - - Map getMessageListeners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java deleted file mode 100644 index da6f5e7d45..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.apache.qpidity.nclient.impl; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.ErrorCode; - -import org.apache.qpidity.nclient.MessagePartListener; - -import org.apache.qpidity.QpidException; -import org.apache.qpidity.transport.Data; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.transport.MessageReject; -import org.apache.qpidity.transport.MessageTransfer; -import org.apache.qpidity.transport.Range; -import org.apache.qpidity.transport.Session; -import org.apache.qpidity.transport.SessionDetached; -import org.apache.qpidity.transport.SessionDelegate; - - -public class ClientSessionDelegate extends SessionDelegate -{ - private MessageTransfer _currentTransfer; - private MessagePartListener _currentMessageListener; - - @Override public void sessionDetached(Session ssn, SessionDetached dtc) - { - ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null)); - } - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - @Override public void data(Session ssn, Data data) - { - _currentMessageListener.data(data.getData()); - if (data.isLast()) - { - _currentMessageListener.messageReceived(); - } - } - - @Override public void header(Session ssn, Header header) - { - _currentMessageListener.messageHeader(header); - if( header.hasNoPayload()) - { - _currentMessageListener.data(ByteBuffer.allocate(0)); - _currentMessageListener.messageReceived(); - } - } - - - @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) - { - _currentTransfer = currentTransfer; - _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination()); - _currentMessageListener.messageTransfer(currentTransfer.getId()); - } - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); - session.processed(struct); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java deleted file mode 100644 index 83d491baad..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpidity.nclient.impl; - -/** - * This class holds all the 0.10 client constants which value can be set - * through properties. - */ -public class Constants -{ - static - { - - String max="message_size_before_sync";// KB's - try - { - MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_SYNC_DATA_LENGH=200000000; - } - String flush="message_size_before_flush"; - try - { - MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_FLUSH_DATA_LENGH=20000000; - } - } - - /** - * The total message size in KBs that can be transferted before - * client and broker are synchronized. - * A sync will result in the client library releasing the sent messages - * from memory. (messages are kept - * in memory so client can reconnect to a broker in the event of a failure) - *

- * Property name: message_size_before_sync - *

- * Default value: 200000000 - */ - public static long MAX_NOT_SYNC_DATA_LENGH; - /** - * The total message size in KBs that can be transferted before - * messages are flushed. - * When a flush returns all messages have reached the broker. - *

- * Property name: message_size_before_flush - *

- * Default value: 200000000 - */ - public static long MAX_NOT_FLUSH_DATA_LENGH; - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java deleted file mode 100644 index 05b99a3cf1..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.apache.qpidity.nclient.impl; - -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ClosedListener; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessageListener; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageAcceptMode; -import org.apache.qpidity.transport.MessageAcquireMode; -import org.apache.qpidity.transport.MessageProperties; - -import java.util.UUID; - -public class DemoClient -{ - public static MessagePartListenerAdapter createAdapter() - { - return new MessagePartListenerAdapter(new MessageListener() - { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); - } - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); - ssn.queueDeclare("queue1", null, null); - ssn.exchangeBind("queue1", "amq.direct", "queue1",null); - ssn.sync(); - - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); - - // queue - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.header(new DeliveryProperties().setRoutingKey("queue1"), - new MessageProperties().setMessageId(UUID.randomUUID())); - ssn.data("this is the data"); - ssn.endData(); - - //reject - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.data("this should be rejected"); - ssn.header(new DeliveryProperties().setRoutingKey("stocks")); - ssn.endData(); - ssn.sync(); - - // topic subs - ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); - ssn.sync(); - - ssn.queueDeclare("topic1", null, null); - ssn.exchangeBind("topic1", "amq.topic", "stock.*",null); - ssn.queueDeclare("topic2", null, null); - ssn.exchangeBind("topic2", "amq.topic", "stock.us.*",null); - ssn.queueDeclare("topic3", null, null); - ssn.exchangeBind("topic3", "amq.topic", "stock.us.rh",null); - ssn.sync(); - - // topic - ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.data("Topic message"); - ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"), - new MessageProperties().setMessageId(UUID.randomUUID())); - ssn.endData(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java deleted file mode 100644 index 64ffe17fe0..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.apache.qpidity.nclient.impl; - -import java.io.FileInputStream; - -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ClosedListener; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.FileMessage; -import org.apache.qpidity.nclient.util.MessageListener; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; - -import java.util.UUID; - -public class LargeMsgDemoClient -{ - public static MessagePartListenerAdapter createAdapter() - { - return new MessagePartListenerAdapter(new MessageListener() - { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); - } - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); - ssn.queueDeclare("queue1", null, null); - ssn.exchangeBind("queue1", "amq.direct", "queue1",null); - ssn.sync(); - - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); - - try - { - FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"), - 1024, - new DeliveryProperties().setRoutingKey("queue1"), - new MessageProperties().setMessageId(UUID.randomUUID())); - - // queue - ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); - ssn.sync(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java deleted file mode 100644 index e452091622..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java +++ /dev/null @@ -1,156 +0,0 @@ -package org.apache.qpidity.nclient.interop; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ClosedListener; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessageListener; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageAcceptMode; -import org.apache.qpidity.transport.MessageAcquireMode; -import org.apache.qpidity.transport.MessageCreditUnit; -import org.apache.qpidity.transport.MessageFlowMode; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.RangeSet; - -public class BasicInteropTest implements ClosedListener -{ - - private Session session; - private Connection conn; - private String host; - - public BasicInteropTest(String host) - { - this.host = host; - } - - public void close() throws QpidException - { - conn.close(); - } - - public void testCreateConnection(){ - System.out.println("------- Creating connection--------"); - conn = Client.createConnection(); - try{ - conn.connect(host, 5672, "test", "guest", "guest"); - }catch(Exception e){ - System.out.println("------- Error Creating connection--------"); - e.printStackTrace(); - System.exit(1); - } - System.out.println("------- Connection created Suscessfully --------"); - } - - public void testCreateSession(){ - System.out.println("------- Creating session --------"); - session = conn.createSession(0); - System.out.println("------- Session created sucessfully --------"); - } - - public void testExchange(){ - System.out.println("------- Creating an exchange --------"); - session.exchangeDeclare("test", "direct", "", null); - session.sync(); - System.out.println("------- Exchange created --------"); - } - - public void testQueue(){ - System.out.println("------- Creating a queue --------"); - session.queueDeclare("testQueue", "", null); - session.sync(); - System.out.println("------- Queue created --------"); - - System.out.println("------- Binding a queue --------"); - session.exchangeBind("testQueue", "test", "testKey", null); - session.sync(); - System.out.println("------- Queue bound --------"); - } - - public void testSendMessage(){ - System.out.println("------- Sending a message --------"); - session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - - Map props = new HashMap(); - props.put("name", "rajith"); - props.put("age", 10); - props.put("spf", 8.5); - session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props)); - - //session.header(new DeliveryProperties().setRoutingKey("testKey")); - - session.data("TestMessage"); - session.endData(); - session.sync(); - System.out.println("------- Message sent --------"); - } - - public void testSubscribe() - { - System.out.println("------- Sending a subscribe --------"); - session.messageSubscribe("testQueue", "myDest", - Session.TRANSFER_CONFIRM_MODE_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(new MessageListener(){ - - public void onMessage(Message message) - { - System.out.println("--------Message Received--------"); - System.out.println(message.toString()); - System.out.println("--------/Message Received--------"); - RangeSet ack = new RangeSet(); - ack.add(message.getMessageTransferId(),message.getMessageTransferId()); - session.messageAcknowledge(ack, true); - } - - }), - null); - - System.out.println("------- Setting Credit mode --------"); - session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); - System.out.println("------- Setting Credit --------"); - session.messageFlow("myDest", MessageCreditUnit.MESSAGE, 1); - session.messageFlow("myDest", MessageCreditUnit.BYTE, -1); - } - - public void testMessageFlush() - { - session.messageFlush("myDest"); - session.sync(); - } - - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("------- Broker Notified an error --------"); - System.out.println("------- " + errorCode + " --------"); - System.out.println("------- " + reason + " --------"); - System.out.println("------- /Broker Notified an error --------"); - } - - public static void main(String[] args) throws QpidException - { - String host = "0.0.0.0"; - if (args.length>0) - { - host = args[0]; - } - - BasicInteropTest t = new BasicInteropTest(host); - t.testCreateConnection(); - t.testCreateSession(); - t.testExchange(); - t.testQueue(); - t.testSubscribe(); - t.testSendMessage(); - t.testMessageFlush(); - t.close(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java deleted file mode 100644 index 833f905b58..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java +++ /dev/null @@ -1,160 +0,0 @@ -package org.apache.qpidity.nclient.util; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.Queue; - -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.api.Message; - -/** - *

A Simple implementation of the message interface - * for small messages. When the readData methods are called - * we assume the message is complete. i.e there want be any - * appendData operations after that.

- * - *

If you need large message support please see - * FileMessage and StreamingMessage - *

- */ -public class ByteBufferMessage implements Message -{ - private Queue _data = new LinkedList(); - private ByteBuffer _readBuffer; - private int _dataSize; - private DeliveryProperties _currentDeliveryProps; - private MessageProperties _currentMessageProps; - private int _transferId; - private Header _header; - - public void setHeader(Header header) { - _header = header; - } - - public Header getHeader() { - return _header; - } - - public ByteBufferMessage() - { - _currentDeliveryProps = new DeliveryProperties(); - _currentMessageProps = new MessageProperties(); - } - - public ByteBufferMessage(int transferId) - { - _transferId = transferId; - } - - public int getMessageTransferId() - { - return _transferId; - } - - public void clearData() - { - _data = new LinkedList(); - _readBuffer = null; - } - - public void appendData(byte[] src) throws IOException - { - appendData(ByteBuffer.wrap(src)); - } - - /** - * write the data from the current position up to the buffer limit - */ - public void appendData(ByteBuffer src) throws IOException - { - _data.offer(src); - _dataSize += src.remaining(); - } - - public DeliveryProperties getDeliveryProperties() - { - return _currentDeliveryProps; - } - - public MessageProperties getMessageProperties() - { - return _currentMessageProps; - } - - public void setDeliveryProperties(DeliveryProperties props) - { - _currentDeliveryProps = props; - } - - public void setMessageProperties(MessageProperties props) - { - _currentMessageProps = props; - } - - public void readData(byte[] target) throws IOException - { - getReadBuffer().get(target); - } - - public ByteBuffer readData() throws IOException - { - return getReadBuffer(); - } - - private void buildReadBuffer() - { - //optimize for the simple cases - if(_data.size() == 1) - { - _readBuffer = _data.element().duplicate(); - } - else - { - _readBuffer = ByteBuffer.allocate(_dataSize); - for(ByteBuffer buf:_data) - { - _readBuffer.put(buf); - } - _readBuffer.flip(); - } - } - - private ByteBuffer getReadBuffer() throws IOException - { - if (_readBuffer != null ) - { - return _readBuffer.slice(); - } - else - { - if (_data.size() >0) - { - buildReadBuffer(); - return _readBuffer.slice(); - } - else - { - throw new IOException("No Data to read"); - } - } - } - - //hack for testing - @Override public String toString() - { - try - { - ByteBuffer temp = getReadBuffer(); - byte[] b = new byte[temp.remaining()]; - temp.get(b); - return new String(b); - } - catch(IOException e) - { - return "No data"; - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java deleted file mode 100644 index 289d03574d..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java +++ /dev/null @@ -1,96 +0,0 @@ -package org.apache.qpidity.nclient.util; - -import java.io.EOFException; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; - -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.api.Message; - -/** - * FileMessage provides pull style semantics for - * larges messages backed by a disk. - * Instead of loading all data into memeory it uses - * FileChannel to map regions of the file into memeory - * at a time. - * - * The write methods are not supported. - * - * From the standpoint of performance it is generally - * only worth mapping relatively large files into memory. - * - * FileMessage msg = new FileMessage(in,delProps,msgProps); - * session.messageTransfer(dest,msg,0,0); - * - * The messageTransfer method will read the file in chunks - * and stream it. - * - */ -public class FileMessage extends ReadOnlyMessage implements Message -{ - private FileChannel _fileChannel; - private int _chunkSize; - private long _fileSize; - private long _pos = 0; - - public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException - { - _messageProperties = messageProperties; - _deliveryProperties = deliveryProperties; - - _fileChannel = in.getChannel(); - _chunkSize = chunkSize; - _fileSize = _fileChannel.size(); - - if (_fileSize <= _chunkSize) - { - _chunkSize = (int)_fileSize; - } - } - - public void setHeader(Header header) { - //To change body of implemented methods use File | Settings | File Templates. - } - - public Header getHeader() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void readData(byte[] target) throws IOException - { - throw new UnsupportedOperationException(); - } - - public ByteBuffer readData() throws IOException - { - if (_pos == _fileSize) - { - throw new EOFException(); - } - - if (_pos + _chunkSize > _fileSize) - { - _chunkSize = (int)(_fileSize - _pos); - } - MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); - _pos += _chunkSize; - return bb; - } - - /** - * This message is used by an application user to - * provide data to the client library using pull style - * semantics. Since the message is not transfered yet, it - * does not have a transfer id. Hence this method is not - * applicable to this implementation. - */ - public int getMessageTransferId() - { - throw new UnsupportedOperationException(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java deleted file mode 100644 index 43c20eb6b5..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.nclient.util; - -import org.apache.qpidity.api.Message; - -/** - *A message listener - */ -public interface MessageListener -{ - /** - * Process an incoming message. - * - * @param message The incoming message. - */ - public void onMessage(Message message); -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java deleted file mode 100644 index 757d44fbbb..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.apache.qpidity.nclient.util; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.nclient.MessagePartListener; - -/** - * This is a simple message assembler. - * Will call onMessage method of the adaptee - * when all message data is read. - * - * This is a good convinience utility for handling - * small messages - */ -public class MessagePartListenerAdapter implements MessagePartListener -{ - MessageListener _adaptee; - ByteBufferMessage _currentMsg; - - public MessagePartListenerAdapter(MessageListener listener) - { - _adaptee = listener; - } - - public void messageTransfer(int transferId) - { - _currentMsg = new ByteBufferMessage(transferId); - } - - public void data(ByteBuffer src) - { - try - { - _currentMsg.appendData(src); - } - catch(IOException e) - { - // A chance for IO exception - // doesn't occur as we are using - // a ByteBuffer - } - } - - public void messageHeader(Header header) - { - _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class)); - _currentMsg.setMessageProperties(header.get(MessageProperties.class)); - } - - public void messageReceived() - { - _adaptee.onMessage(_currentMsg); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java deleted file mode 100644 index 8ff5c62a25..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.qpidity.nclient.util; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.api.Message; - -public abstract class ReadOnlyMessage implements Message -{ - MessageProperties _messageProperties; - DeliveryProperties _deliveryProperties; - - public void appendData(byte[] src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public void appendData(ByteBuffer src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public DeliveryProperties getDeliveryProperties() - { - return _deliveryProperties; - } - - public MessageProperties getMessageProperties() - { - return _messageProperties; - } - - public void clearData() - { - throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data"); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java deleted file mode 100644 index 6c7f9e9db7..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.qpidity.nclient.util; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.api.Message; - -public class StreamingMessage extends ReadOnlyMessage implements Message -{ - SocketChannel _socChannel; - private int _chunkSize; - private ByteBuffer _readBuf; - - public Header getHeader() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setHeader(Header header) { - //To change body of implemented methods use File | Settings | File Templates. - } - - public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException - { - _messageProperties = messageProperties; - _deliveryProperties = deliveryProperties; - - _socChannel = in; - _chunkSize = chunkSize; - _readBuf = ByteBuffer.allocate(_chunkSize); - } - - public void readData(byte[] target) throws IOException - { - throw new UnsupportedOperationException(); - } - - public ByteBuffer readData() throws IOException - { - if(_socChannel.isConnected() && _socChannel.isOpen()) - { - _readBuf.clear(); - _socChannel.read(_readBuf); - } - else - { - throw new EOFException("The underlying socket/channel has closed"); - } - - return _readBuf.duplicate(); - } - - /** - * This message is used by an application user to - * provide data to the client library using pull style - * semantics. Since the message is not transfered yet, it - * does not have a transfer id. Hence this method is not - * applicable to this implementation. - */ - public int getMessageTransferId() - { - throw new UnsupportedOperationException(); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java deleted file mode 100644 index e00f9008d3..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java +++ /dev/null @@ -1,60 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.njms; - -import org.apache.qpidity.QpidException; - -import javax.jms.JMSException; -import javax.transaction.xa.XAException; - -/** - * Helper class for handling exceptions - */ -public class ExceptionHelper -{ - static public JMSException convertQpidExceptionToJMSException(Exception exception) - { - JMSException jmsException = null; - if (!(exception instanceof JMSException)) - { - if (exception instanceof QpidException) - { - jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); - } - else - { - jmsException = new JMSException(exception.getMessage()); - } - jmsException.setLinkedException(exception); - jmsException.initCause(exception); - } - else - { - jmsException = (JMSException) exception; - } - return jmsException; - } - - static public XAException convertQpidExceptionToXAException(QpidException exception) - { - String qpidErrorCode = String.valueOf(exception.getErrorCode()); - // todo map this error to an XA code - int xaCode = XAException.XAER_PROTO; - return new XAException(xaCode); - } -} -- cgit v1.2.1