diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-17 15:58:20 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-17 15:58:20 +0000 |
| commit | 04e409c589c579ba2575ed7e9218b58ca27fc782 (patch) | |
| tree | bc5db0459c8a07d94f81d857cffb6b7878a640f4 /java/client/src | |
| parent | 66b1f5db27110f82d530846c450dc1b6965de7f1 (diff) | |
| download | qpid-python-04e409c589c579ba2575ed7e9218b58ca27fc782.tar.gz | |
fixed several bugs after running samples against 0_10
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@576491 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
7 files changed, 50 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index dd35929c6f..86b4807069 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -689,8 +689,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); - - _protocolHandler.closeConnection(timeout); + _delegate.closeConneciton(timeout); + //_protocolHandler.closeConnection(timeout); } catch (AMQException e) @@ -703,6 +703,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + + /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to * mark objects "visible" in userland as closed after failover or other significant event that impacts the diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e4c9a259ab..eccca87dc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -36,4 +36,6 @@ public interface AMQConnectionDelegate final int prefetchHigh, final int prefetchLow) throws JMSException; public void resubscribeSessions() throws JMSException, AMQException, FailoverException; + + public void closeConneciton(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 93a12b602b..8b6b416f88 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -49,8 +49,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate int channelId = _conn._idFactory.incrementAndGet(); AMQSession session; try - { - session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); + { + session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); _conn.registerSession(channelId, session); if (_conn._started) { @@ -100,4 +101,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate throw new FailoverException("failing to reconnect during failover, operation not supported."); } + + public void closeConneciton(long timeout) throws JMSException, AMQException + { + try + { + _qpidConnection.close(); + } + catch (QpidException e) + { + throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e); + } + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java index 69dfed2dd9..3ad32d83fb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java @@ -51,6 +51,13 @@ public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class); private AMQConnection _conn; + + public void closeConneciton(long timeout) throws JMSException, AMQException + { + _conn.getProtocolHandler().closeConnection(timeout); + + } + public AMQConnectionDelegate_0_8(AMQConnection conn) { _conn = conn; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index f3fa79eb51..2fd2b04c99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -22,6 +22,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpidity.jms.ExceptionHelper; @@ -48,20 +49,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public void declareDestination(AMQDestination destination) { - // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null, - // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type - _protocolHandler.writeFrame(declare); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(), + destination.getExchangeClass().toString(), + null, + null + ); } //--- Overwritten methods @@ -105,7 +97,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); // set the application properties qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString()); - qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString()); + AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); + if( correlationID != null ) + { + qpidityMessage.getMessageProperties().setCorrelationId(correlationID.toString()); + } String replyToURL = contentHeaderProperties.getReplyToAsString(); if (replyToURL != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index f5603b1695..1f70663df8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -126,7 +126,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory DeliveryProperties devprop = (DeliveryProperties) contentHeader[1]; props.setContentType(mprop.getContentType()); props.setCorrelationId(mprop.getCorrelationId()); - props.setEncoding(mprop.getContentEncoding()); + String encoding = mprop.getContentEncoding(); + if (!encoding.equals("")) + { + props.setEncoding(encoding); + } props.setExpiration(devprop.getExpiration()); // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders()); props.setMessageId(mprop.getMessageId()); diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java index 758088dd68..605e9ee154 100644 --- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java @@ -356,7 +356,7 @@ public class URLParser_0_10 try { char next = _url[_index]; - while (next != ADDRESS_SEPERATOR_CHAR) + while (next != ADDRESS_SEPERATOR_CHAR && next != END_OF_URL_MARKER ) { b.append(next); next = _url[++_index]; @@ -378,6 +378,10 @@ public class URLParser_0_10 { int port = Integer.parseInt(portStr); _currentBroker.setPort(port); + if( _url[_index] == END_OF_URL_MARKER ) + { + _endOfURL = true; + } return URLParserState.ADDRESS_END; } catch (NumberFormatException e) |
