summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-04-09 11:45:48 +0000
committerRobert Greig <rgreig@apache.org>2007-04-09 11:45:48 +0000
commit35cbdf5f3cd0dbcd2e8a0a81741db1082daec3a2 (patch)
treef968fd07d83eb96992b4d98a9fd2b3a57a8079b4 /java/client/src
parent362695aab16f1669ca5cc9b4b2094a2a47911675 (diff)
downloadqpid-python-35cbdf5f3cd0dbcd2e8a0a81741db1082daec3a2.tar.gz
Merged revisions 526714 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r526714 | rgreig | 2007-04-09 12:25:32 +0100 (Mon, 09 Apr 2007) | 1 line Purged logging from exception constructors. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@526720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java46
4 files changed, 40 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 184bc44912..82f9a036d2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -769,7 +769,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- amqe = new AMQException(_logger, "Closing session forcibly", e);
+ amqe = new AMQException("Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index f6ddfdc715..28c0c4f3c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -170,7 +170,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
catch (UnsupportedEncodingException e)
{
- throw new AMQException(_log, "Unable to decode data: " + e, e);
+ throw new AMQException("Unable to decode data: " + e, e);
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
index 21526ac6d2..54a8283763 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,25 +20,23 @@
*/
package org.apache.qpid.client.message;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
public class UnexpectedBodyReceivedException extends AMQException
{
-
- public UnexpectedBodyReceivedException(Logger logger, String msg, Throwable t)
+ public UnexpectedBodyReceivedException(String msg, Throwable t)
{
- super(logger, msg, t);
+ super(msg, t);
}
- public UnexpectedBodyReceivedException(Logger logger, String msg)
+ public UnexpectedBodyReceivedException(String msg)
{
- super(logger, msg);
+ super(msg);
}
- public UnexpectedBodyReceivedException(Logger logger, AMQConstant errorCode, String msg)
+ public UnexpectedBodyReceivedException(AMQConstant errorCode, String msg)
{
- super(logger, errorCode, msg);
+ super(errorCode, msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index e875b4dca8..35aa69bd82 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,11 +27,14 @@ import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
import org.apache.commons.lang.StringUtils;
+
import org.apache.log4j.Logger;
+
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -45,10 +48,10 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -98,8 +101,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private byte _protocolMinorVersion;
private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
-
+ private VersionSpecificRegistry _registry =
+ MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
/**
* No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
@@ -118,19 +121,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- //fixme - real value needed
+ // fixme - real value needed
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = new AMQStateManager(this);
}
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
+ AMQStateManager stateManager)
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
_minaProtocolSession.setAttachment(this);
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- //fixme - real value needed
+ // fixme - real value needed
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
@@ -242,18 +246,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
- throws AMQException
+ public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
}
+
if (msg.getContentHeader() != null)
{
- throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+ throw new AMQException(
+ "Error: received duplicate content header or did not receive correct number of content body frames");
}
+
msg.setContentHeader(contentHeader);
if (contentHeader.bodySize == 0)
{
@@ -268,11 +274,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
}
+
if (msg.getContentHeader() == null)
{
_channelId2UnprocessedMsgMap.remove(channelId);
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
+
try
{
msg.receiveBody(contentBody);
@@ -282,6 +290,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_channelId2UnprocessedMsgMap.remove(channelId);
throw e;
}
+
if (msg.isAllBodyDataReceived())
{
deliverMessageToAMQSession(channelId, msg);
@@ -317,7 +326,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
WriteFuture f = _minaProtocolSession.write(frame);
if (wait)
{
- //fixme -- time out?
+ // fixme -- time out?
f.join();
}
else
@@ -332,10 +341,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
}
+
if (session == null)
{
throw new IllegalArgumentException("Attempt to register a null session");
}
+
_logger.debug("Add session with channel id " + channelId);
_channelId2SessionMap.put(channelId, session);
}
@@ -346,6 +357,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
}
+
_logger.debug("Removing session with channelId " + channelId);
_channelId2SessionMap.remove(channelId);
}
@@ -388,12 +400,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
try
{
- session.closed(new AMQException(_logger, code, text));
+ session.closed(new AMQException(code, text));
}
catch (JMSException e)
{
throw new AMQException("JMSException received while closing session", e);
}
+
return true;
}
else
@@ -415,7 +428,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void closeProtocolSession(boolean waitLast)
{
_logger.debug("Waiting for last write to join.");
- if (waitLast && _lastWriteFuture != null)
+ if (waitLast && (_lastWriteFuture != null))
{
_lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -437,8 +450,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
id = _queueId++;
}
- //get rid of / and : and ; from address for spec conformance
+ // get rid of / and : and ; from address for spec conformance
String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+
return new AMQShortString("tmp_" + localAddress + "_" + id);
}