summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-09-17 15:58:20 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-09-17 15:58:20 +0000
commit04e409c589c579ba2575ed7e9218b58ca27fc782 (patch)
treebc5db0459c8a07d94f81d857cffb6b7878a640f4 /java/client/src
parent66b1f5db27110f82d530846c450dc1b6965de7f1 (diff)
downloadqpid-python-04e409c589c579ba2575ed7e9218b58ca27fc782.tar.gz
fixed several bugs after running samples against 0_10
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@576491 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java6
7 files changed, 50 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index dd35929c6f..86b4807069 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -689,8 +689,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
-
- _protocolHandler.closeConnection(timeout);
+ _delegate.closeConneciton(timeout);
+ //_protocolHandler.closeConnection(timeout);
}
catch (AMQException e)
@@ -703,6 +703,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+
+
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index e4c9a259ab..eccca87dc2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -36,4 +36,6 @@ public interface AMQConnectionDelegate
final int prefetchHigh, final int prefetchLow) throws JMSException;
public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+
+ public void closeConneciton(long timeout) throws JMSException, AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 93a12b602b..8b6b416f88 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -49,8 +49,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
int channelId = _conn._idFactory.incrementAndGet();
AMQSession session;
try
- {
- session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
+ {
+ session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
_conn.registerSession(channelId, session);
if (_conn._started)
{
@@ -100,4 +101,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
throw new FailoverException("failing to reconnect during failover, operation not supported.");
}
+
+ public void closeConneciton(long timeout) throws JMSException, AMQException
+ {
+ try
+ {
+ _qpidConnection.close();
+ }
+ catch (QpidException e)
+ {
+ throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
+ }
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
index 69dfed2dd9..3ad32d83fb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
@@ -51,6 +51,13 @@ public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class);
private AMQConnection _conn;
+
+ public void closeConneciton(long timeout) throws JMSException, AMQException
+ {
+ _conn.getProtocolHandler().closeConnection(timeout);
+
+ }
+
public AMQConnectionDelegate_0_8(AMQConnection conn)
{
_conn = conn;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index f3fa79eb51..2fd2b04c99 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -22,6 +22,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
@@ -48,20 +49,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
public void declareDestination(AMQDestination destination)
{
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null,
- // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
- _protocolHandler.writeFrame(declare);
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
+ destination.getExchangeClass().toString(),
+ null,
+ null
+ );
}
//--- Overwritten methods
@@ -105,7 +97,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
// set the application properties
qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString());
- qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString());
+ AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
+ if( correlationID != null )
+ {
+ qpidityMessage.getMessageProperties().setCorrelationId(correlationID.toString());
+ }
String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index f5603b1695..1f70663df8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -126,7 +126,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
props.setContentType(mprop.getContentType());
props.setCorrelationId(mprop.getCorrelationId());
- props.setEncoding(mprop.getContentEncoding());
+ String encoding = mprop.getContentEncoding();
+ if (!encoding.equals(""))
+ {
+ props.setEncoding(encoding);
+ }
props.setExpiration(devprop.getExpiration());
// todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
props.setMessageId(mprop.getMessageId());
diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java
index 758088dd68..605e9ee154 100644
--- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java
@@ -356,7 +356,7 @@ public class URLParser_0_10
try
{
char next = _url[_index];
- while (next != ADDRESS_SEPERATOR_CHAR)
+ while (next != ADDRESS_SEPERATOR_CHAR && next != END_OF_URL_MARKER )
{
b.append(next);
next = _url[++_index];
@@ -378,6 +378,10 @@ public class URLParser_0_10
{
int port = Integer.parseInt(portStr);
_currentBroker.setPort(port);
+ if( _url[_index] == END_OF_URL_MARKER )
+ {
+ _endOfURL = true;
+ }
return URLParserState.ADDRESS_END;
}
catch (NumberFormatException e)