summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-02-20 15:52:04 +0000
committerGordon Sim <gsim@apache.org>2007-02-20 15:52:04 +0000
commitedfebbfec207f6b4c93933330c4792a8e8798eb7 (patch)
treea722b808201bc5d6b598ee54e78c1b1c7a072f6d /java/broker/src
parent08b43d86e9fd619cc3c07fbfd5c9d6befe978e8e (diff)
downloadqpid-python-edfebbfec207f6b4c93933330c4792a8e8798eb7.tar.gz
Some fixes to get more python tests passing.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java179
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java1
9 files changed, 119 insertions, 138 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index b2d4215bd0..3ab20e74bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -141,6 +141,11 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+ /**
+ * Used in creating unique references.
+ */
+ private byte _refCounter;
+
// XXX: clean up arguments
public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
{
@@ -218,7 +223,7 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
- public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
+ public void addMessageTransfer(MessageTransferBody transferBody, long requestId, AMQProtocolSession publisher) throws AMQException
{
Content body = transferBody.getBody();
AMQMessage message;
@@ -226,14 +231,20 @@ public class AMQChannel
case INLINE_T:
message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext);
message.setPublisher(publisher);
+ message.setRequestId(requestId);
routeCurrentMessage(message);
- message.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
break;
case REF_T:
- AMQReference ref = getReference(body.getContentAsByteArray());
- message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
- message.setPublisher(publisher);
- ref.addRefTransferBody(message);
+ try {
+ AMQReference ref = getReference(body.getContentAsByteArray());
+ message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
+ message.setPublisher(publisher);
+ message.setRequestId(requestId);
+ ref.addRefTransferBody(message);
+ } catch (IllegalArgumentException e) {
+ throw transferBody.getConnectionException(503, "Reference is not open");
+ }
+
break;
}
}
@@ -277,24 +288,35 @@ public class AMQChannel
return ref;
}
- public void addMessageOpen(MessageOpenBody open)
+ public void addMessageOpen(MessageOpenBody open) throws AMQException
{
- createReference(open.reference);
+ try {
+ createReference(open.reference);
+ } catch (IllegalArgumentException e) {
+ throw open.getConnectionException(503, "Reference is already open");
+ }
}
- public void addMessageAppend(MessageAppendBody append)
+ public void addMessageAppend(MessageAppendBody append) throws AMQException
{
- AMQReference ref = getReference(append.reference);
- ref.appendContent(ByteBuffer.wrap(append.bytes));
+ try {
+ AMQReference ref = getReference(append.reference);
+ ref.appendContent(ByteBuffer.wrap(append.bytes));
+ } catch (IllegalArgumentException e) {
+ throw append.getConnectionException(503, "Reference is not open");
+ }
}
public void addMessageClose(MessageCloseBody close) throws AMQException
{
- AMQReference ref = removeReference(close.reference);
- for (AMQMessage msg : ref.getMessageList())
- {
- routeCurrentMessage(msg);
- msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+ try {
+ AMQReference ref = removeReference(close.reference);
+ for (AMQMessage msg : ref.getMessageList())
+ {
+ routeCurrentMessage(msg);
+ }
+ } catch (IllegalArgumentException e) {
+ throw close.getConnectionException(503, "Reference is not open");
}
}
@@ -308,38 +330,18 @@ public class AMQChannel
{
_returnMessages.add(e);
}
+ msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+
+ MessageOkBody ok = MessageOkBody.createMethodBody(
+ _session.getProtocolMajorVersion(),
+ _session.getProtocolMinorVersion()
+ );
+ _session.writeResponse(_channelId, msg.getRequestId(), ok);
}
public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
{
- // Do we need to refactor the content for a different frame size?
- long maxFrameSize = _session.getFrameMax();
- Iterable<ByteBuffer> contentItr = msg.getContents();
- if (msg.getSize() > maxFrameSize)
- {
- Iterator<ByteBuffer> cItr = contentItr.iterator();
- if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
- {
- // TODO - Refactor the chunks for smaller outbound frame size
- throw new Error("XXX TODO - need to refactor content chunks here");
- // deliverRef(msg, destination, deliveryTag);
- }
- else
- {
- // Use ref content as is - no need to refactor
- deliverRef(msg, destination, deliveryTag);
- }
- }
- else
- {
- // Concatenate - all incoming chunks will fit into single outbound frame
- deliverInline(msg, destination, deliveryTag);
- }
- }
-
- public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag)
- {
- deliverInline(msg, destination, new AMQMethodListener()
+ AMQMethodListener listener = new AMQMethodListener()
{
public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
@@ -361,9 +363,20 @@ public class AMQChannel
}
}
public void error(Exception e) {}
- });
+ };
+ long maxFrameSize = _session.getFrameMax();
+ if (msg.getFullSize() > maxFrameSize)
+ {
+ //need to send as reference
+ deliverRef(msg, destination, listener);
+ }
+ else
+ {
+ //message will fit inline
+ deliverInline(msg, destination, listener);
+ }
}
-
+
public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
MessageTransferBody mtb = msg.getTransferBody().copy();
@@ -378,64 +391,37 @@ public class AMQChannel
mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
_session.writeRequest(_channelId, mtb, listener);
}
-
- public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
- {
- final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
- deliverRef(refId, msg, destination, new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled())
- {
- _log.debug(method + " received on channel " + _channelId);
- }
- // XXX: multiple?
- if (method instanceof MessageOkBody)
- {
- acknowledgeMessage(deliveryTag, false);
- return true;
- }
- else
- {
- // TODO: implement reject
- return false;
- }
- }
- public void error(Exception e) {}
- });
+
+ private synchronized byte[] nextRefId() {
+ return new byte[]{_refCounter++};
}
- public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+ public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
- AMQMethodBody openBody = MessageOpenBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, openBody, listener);
+ AMQMethodListener dummy = new AMQMethodListener()
+ {
+ public boolean methodReceived(AMQMethodEvent evt){ return true; }
+ public void error(Exception e) {}
+ };
+ byte major = _session.getProtocolMajorVersion();
+ byte minor = _session.getProtocolMinorVersion();
+ byte[] refId = nextRefId();
+ _session.writeRequest(_channelId, MessageOpenBody.createMethodBody(major, minor, refId), dummy);
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
- mtb.redelivered = msg.isRedelivered();
mtb.body = new Content(Content.TypeEnum.REF_T, refId);
_session.writeRequest(_channelId, mtb, listener);
- for (ByteBuffer bb : msg.getContents())
- {
- ByteBuffer dup = bb.duplicate();
- byte[] ba = new byte[dup.limit()];
- dup.get(ba);
- AMQMethodBody appendBody = MessageAppendBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- ba,
- refId);
- _session.writeRequest(_channelId, appendBody, listener);
+ for (ByteBuffer buffer : msg.getContents())
+ {
+ //TODO: try and avoid all this copying!
+ while (buffer.remaining() > 0)
+ {
+ byte[] data = new byte[Math.min((int) _session.getFrameMax(), buffer.remaining())];
+ buffer.get(data);
+ _session.writeRequest(_channelId, MessageAppendBody.createMethodBody(major, minor, data, refId), dummy);
+ }
}
- AMQMethodBody closeBody = MessageCloseBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, closeBody, listener);
+ _session.writeRequest(_channelId, MessageCloseBody.createMethodBody(major, minor, refId), dummy);
}
public RequestManager getRequestManager()
@@ -554,6 +540,7 @@ public class AMQChannel
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
+ unacked.message.setRedelivered(true);
if (unacked.queue != null)
{
_txnContext.deliver(unacked.message, unacked.queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index a29552de03..2cf69bf27d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -66,7 +66,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
}
else
{
- virtualHostName = String.valueOf(body.virtualHost);
+ virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
}
VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
index cab8a7d0ad..987420f88f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageQosBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -46,7 +47,9 @@ public class MessageQosHandler implements StateAwareMethodListener<MessageQosBod
public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.setPrefetchCount(evt.getMethod().prefetchCount);
+ channel.setPrefetchSize(evt.getMethod().prefetchSize);
// Be aware of possible changes to parameter order as versions change.
session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody(
session.getProtocolMajorVersion(), // AMQP major version
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
index 782909570d..e87b53234e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
@@ -78,10 +78,10 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
AMQChannel channel = session.getChannel(evt.getChannelId());
- channel.addMessageTransfer(body, session);
- session.writeResponse(evt, MessageOkBody.createMethodBody(
- session.getProtocolMajorVersion(), // AMQP major version
- session.getProtocolMinorVersion())); // AMQP minor version
+ channel.addMessageTransfer(body, evt.getRequestId(), session);
+ //session.writeResponse(evt, MessageOkBody.createMethodBody(
+ // session.getProtocolMajorVersion(), // AMQP major version
+ // session.getProtocolMinorVersion())); // AMQP minor version
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 58461e8f1b..55edea4bbb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -610,8 +610,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
task.doTask(this);
}
}
-// gsim-python
-// _minaProtocolSession.close();
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 4571da502e..7b367ad50c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -168,6 +168,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
// gsim-python
//session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
session.closeSession();
+ protocolSession.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d433dd6bea..35b779da25 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,8 +80,6 @@ public class AMQMessage
}
};
- private boolean _redelivered;
-
private final Long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -119,6 +117,7 @@ public class AMQMessage
private boolean _deliveredToConsumer;
private AtomicBoolean _taken = new AtomicBoolean(false);
+ private long _requestId;//the request id of the transfer that this message represents
public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, TransactionalContext txnContext)
{
@@ -160,6 +159,16 @@ public class AMQMessage
public long getSize()
{
+ //based on existing usage, this should return the size of the
+ //data and inline data will already be included in the count
+ //by getBodySize()
+ return getBodySize();
+ }
+
+ public long getFullSize()
+ {
+ //this is used in determining whether a message can be inlined
+ //or not and therefore must include the header size also
return getHeaderSize() + getBodySize();
}
@@ -300,11 +309,11 @@ public class AMQMessage
_transferBody.priority = priority;
}
- // TODO - how does this relate to the _redelivered flag in this class? See other isRedelivered() method below.
-// public boolean isRedelivered()
-// {
-// return _transferBody.getRedelivered();
-// }
+
+ public boolean isRedelivered()
+ {
+ return _transferBody.getRedelivered();
+ }
public AMQShortString getReplyTo()
{
@@ -406,12 +415,6 @@ public class AMQMessage
//return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
NoConsumersException getNoConsumersException(String queue)
{
return new NoConsumersException(queue, this);
@@ -420,7 +423,6 @@ public class AMQMessage
public void setRedelivered(boolean redelivered)
{
_transferBody.redelivered = redelivered;
- _redelivered = redelivered;
}
public long getMessageId()
@@ -638,4 +640,14 @@ public class AMQMessage
throw new Error("XXX");
}
+ public void setRequestId(long requestId)
+ {
+ _requestId = requestId;
+ }
+
+ public long getRequestId()
+ {
+ return _requestId;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a7c4b5ca19..2f128a3e3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -371,25 +371,6 @@ public class AMQQueue implements Managable, Comparable
setExclusive(true);
}
- if(incrementSubscriberCount() > 1)
- {
- if(isExclusive())
- {
- decrementSubscriberCount();
- throw EXISTING_EXCLUSIVE;
- }
- else if(exclusive)
- {
- decrementSubscriberCount();
- throw EXISTING_SUBSCRIPTION;
- }
-
- }
- else if(exclusive)
- {
- setExclusive(true);
- }
-
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 685015226c..fcbcc2f09a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -255,7 +255,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
msg.dequeue(storeContext, _queue);
- count++;
_totalMessageSize.set(0L);
count++;
msg = poll();