diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-28 15:48:17 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-28 15:48:17 +0000 |
| commit | f689c47486b4cfc7655e37da2b232fe27be1cc42 (patch) | |
| tree | 09f545bc865a64821f940cef35f8ac453d6388c3 | |
| parent | af097b2fa03725820f0be434ce3e381604ad5bd2 (diff) | |
| download | qpid-python-f689c47486b4cfc7655e37da2b232fe27be1cc42.tar.gz | |
fixed several 0_10 issues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580394 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 48 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 91d68059d3..d069a1b004 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2101,7 +2101,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return getProtocolHandler().getProtocolMinorVersion(); } - private boolean hasMessageListeners() + protected boolean hasMessageListeners() { return _hasMessageListeners; } @@ -2342,7 +2342,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - private void suspendChannel(boolean suspend) throws AMQException // , FailoverException + protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException { synchronized (_suspensionLock) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 03c5849013..89597555d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -22,6 +22,8 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpidity.nclient.Session; @@ -403,6 +405,9 @@ public class AMQSession_0_10 extends AMQSession { getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, MAX_PREFETCH); + // todo this + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); } } // We need to sync so that we get notify of an error. @@ -456,4 +461,41 @@ public class AMQSession_0_10 extends AMQSession } } + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + throws AMQException + { + /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + + //TODO this is for 0_10 only to be changed + amqd.setQueueName(new AMQShortString("tmp_" +System.currentTimeMillis())); + + } + + sendQueueDeclare(amqd,protocolHandler); + + return amqd.getAMQQueueName(); + } + }, _connection).execute(); + } + + + void start() throws AMQException + { + + super.suspendChannel(false); + + // If the event dispatcher is not running then start it too. + if (hasMessageListeners()) + { + startDistpatcherIfNecessary(); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 7cac9773b0..07439eb7f6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -116,10 +116,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer qpidityMessage.getMessageProperties() .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); } - - if (contentHeaderProperties.getHeaders() != null) + //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix + // TODO remove second test + if (contentHeaderProperties.getHeaders() != null && contentHeaderProperties.getHeaders().size() > 1) { - // todo use the new fieldTable qpidityMessage.getMessageProperties().setApplicationHeaders(FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders())); for(String key:qpidityMessage.getMessageProperties().getApplicationHeaders().keySet()) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FiledTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FiledTableSupport.java index ff7e1f72fb..9c4aa9e259 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/FiledTableSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/FiledTableSupport.java @@ -23,9 +23,8 @@ public class FiledTableSupport public static Map<String,Object> convertToMap(FieldTable ft) { Map<String,Object> map = new HashMap<String,Object>(); - for (Enumeration keys = ft.getPropertyNames(); keys.hasMoreElements();) + for (AMQShortString key: ft.keySet() ) { - AMQShortString key = (AMQShortString)keys.nextElement(); map.put(key.asString(), ft.getObject(key)); } |
