summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-15 20:09:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-15 20:09:37 +0000
commit91782b4b801528ca5fb4a3f9e3c2879d1f02c3a1 (patch)
tree5327b5868831224b95cbf84cad54397e5dea6c7a /qpid/java/broker/src/main
parent43d9b8b9928c56ee200ae1a3323796f393dec0f7 (diff)
downloadqpid-python-91782b4b801528ca5fb4a3f9e3c2879d1f02c3a1.tar.gz
QPID-4659 : [Java Broker] move amqp 0-10 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java271
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java226
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java176
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java93
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java30
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java88
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java138
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java67
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java281
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java171
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java136
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java100
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java200
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java560
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java350
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java1000
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java1582
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java952
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java207
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java9
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter21
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType1
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator1
26 files changed, 15 insertions, 6758 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
deleted file mode 100644
index c14896079f..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ /dev/null
@@ -1,271 +0,0 @@
-package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
-
-public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage>
-{
- private static final int BASIC_CLASS_ID = 60;
-
- public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage,
- VirtualHost vhost)
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- Header header = messageTransferMessage.getHeader();
- DeliveryProperties deliveryProps = header.getDeliveryProperties();
- MessageProperties messageProps = header.getMessageProperties();
-
- if(deliveryProps != null)
- {
- if(deliveryProps.hasDeliveryMode())
- {
- props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
- ? BasicContentHeaderProperties.PERSISTENT
- : BasicContentHeaderProperties.NON_PERSISTENT));
- }
- if(deliveryProps.hasExpiration())
- {
- props.setExpiration(deliveryProps.getExpiration());
- }
- if(deliveryProps.hasPriority())
- {
- props.setPriority((byte) deliveryProps.getPriority().getValue());
- }
- if(deliveryProps.hasTimestamp())
- {
- props.setTimestamp(deliveryProps.getTimestamp());
- }
- }
- if(messageProps != null)
- {
- if(messageProps.hasAppId())
- {
- props.setAppId(new AMQShortString(messageProps.getAppId()));
- }
- if(messageProps.hasContentType())
- {
- props.setContentType(messageProps.getContentType());
- }
- if(messageProps.hasCorrelationId())
- {
- props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
- }
- if(messageProps.hasContentEncoding())
- {
- props.setEncoding(messageProps.getContentEncoding());
- }
- if(messageProps.hasMessageId())
- {
- props.setMessageId("ID:" + messageProps.getMessageId().toString());
- }
- if(messageProps.hasReplyTo())
- {
- ReplyTo replyTo = messageProps.getReplyTo();
- String exchangeName = replyTo.getExchange();
- String routingKey = replyTo.getRoutingKey();
- if(exchangeName == null)
- {
- exchangeName = "";
- }
-
- Exchange exchange = vhost.getExchange(exchangeName);
- String exchangeClass = exchange == null
- ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()
- : exchange.getType().getName().asString();
- props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='" + (routingKey == null
- ? ""
- : routingKey + "'"));
-
- }
- if(messageProps.hasUserId())
- {
- props.setUserId(new AMQShortString(messageProps.getUserId()));
- }
-
- if(messageProps.hasApplicationHeaders())
- {
- Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
- if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
- {
- props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
- }
-
- FieldTable ft = new FieldTable();
- for(Map.Entry<String, Object> entry : appHeaders.entrySet())
- {
- try
- {
- ft.put(new AMQShortString(entry.getKey()), entry.getValue());
- }
- catch (AMQPInvalidClassException e)
- {
- // TODO
- // log here, but ignore - just can;t convert
- }
- }
- props.setHeaders(ft);
-
- }
- }
-
- return props;
- }
-
- @Override
- public Class<MessageTransferMessage> getInputClass()
- {
- return MessageTransferMessage.class;
- }
-
- @Override
- public Class<AMQMessage> getOutputClass()
- {
- return AMQMessage.class;
- }
-
- @Override
- public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost)
- {
- return new AMQMessage(convertToStoredMessage(message, vhost));
- }
-
- private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message,
- VirtualHost vhost)
- {
- final MessageMetaData metaData = convertMetaData(message, vhost);
- return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>()
- {
- @Override
- public MessageMetaData getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return message.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return message.getContent(dst, offsetInMessage);
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return message.getContent(offsetInMessage, size);
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost)
- {
- return new MessageMetaData(convertPublishBody(message),
- convertContentHeaderBody(message, vhost),
- 1,
- message.getArrivalTime());
- }
-
- private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost)
- {
- BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost);
- ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
- chb.setBodySize(message.getSize());
- return chb;
- }
-
- private MessagePublishInfo convertPublishBody(MessageTransferMessage message)
- {
- DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
- final AMQShortString exchangeName = (delvProps == null || delvProps.getExchange() == null)
- ? null
- : new AMQShortString(delvProps.getExchange());
- final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null)
- ? null
- : new AMQShortString(delvProps.getRoutingKey());
- final boolean immediate = delvProps != null && delvProps.getImmediate();
- final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable();
-
- return new MessagePublishInfo()
- {
- @Override
- public AMQShortString getExchange()
- {
- return exchangeName;
- }
-
- @Override
- public void setExchange(AMQShortString exchange)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isImmediate()
- {
- return immediate;
- }
-
- @Override
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
- }
-
- @Override
- public String getType()
- {
- return "v0-10 to v0-8";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
deleted file mode 100644
index e1e8fbd9d3..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
-
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
-import org.apache.qpid.url.AMQBindingURL;
-
-public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessage, MessageTransferMessage>
-{
- @Override
- public Class<AMQMessage> getInputClass()
- {
- return AMQMessage.class;
- }
-
- @Override
- public Class<MessageTransferMessage> getOutputClass()
- {
- return MessageTransferMessage.class;
- }
-
- @Override
- public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost)
- {
- return new MessageTransferMessage(convertToStoredMessage(message_0_8), null);
- }
-
- private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8)
- {
- final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(message_0_8);
- return new StoredMessage<MessageMetaData_0_10>()
- {
- @Override
- public MessageMetaData_0_10 getMetaData()
- {
- return messageMetaData_0_10;
- }
-
- @Override
- public long getMessageNumber()
- {
- return message_0_8.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return message_0_8.getContent(dst, offsetInMessage);
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return message_0_8.getContent(offsetInMessage, size);
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8)
- {
- DeliveryProperties deliveryProps = new DeliveryProperties();
- MessageProperties messageProps = new MessageProperties();
-
- int size = (int) message_0_8.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- message_0_8.getContent(body, 0);
- body.flip();
-
- BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
-
- final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
- if(exchange != null)
- {
- deliveryProps.setExchange(exchange.toString());
- }
-
- deliveryProps.setExpiration(message_0_8.getExpiration());
- deliveryProps.setImmediate(message_0_8.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
- deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
- deliveryProps.setTimestamp(properties.getTimestamp());
-
- messageProps.setContentEncoding(properties.getEncodingAsString());
- messageProps.setContentLength(size);
- if(properties.getAppId() != null)
- {
- messageProps.setAppId(properties.getAppId().getBytes());
- }
- messageProps.setContentType(properties.getContentTypeAsString());
- if(properties.getCorrelationId() != null)
- {
- messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
- }
-
- if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
- {
- String origReplyToString = properties.getReplyTo().asString();
- ReplyTo replyTo = new ReplyTo();
- // if the string looks like a binding URL, then attempt to parse it...
- try
- {
- AMQBindingURL burl = new AMQBindingURL(origReplyToString);
- AMQShortString routingKey = burl.getRoutingKey();
- if(routingKey != null)
- {
- replyTo.setRoutingKey(routingKey.asString());
- }
-
- AMQShortString exchangeName = burl.getExchangeName();
- if(exchangeName != null)
- {
- replyTo.setExchange(exchangeName.asString());
- }
- }
- catch (URISyntaxException e)
- {
- replyTo.setRoutingKey(origReplyToString);
- }
- messageProps.setReplyTo(replyTo);
-
- }
-
- if(properties.getMessageId() != null)
- {
- try
- {
- String messageIdAsString = properties.getMessageIdAsString();
- if(messageIdAsString.startsWith("ID:"))
- {
- messageIdAsString = messageIdAsString.substring(3);
- }
- UUID uuid = UUID.fromString(messageIdAsString);
- messageProps.setMessageId(uuid);
- }
- catch(IllegalArgumentException e)
- {
- // ignore - can't parse
- }
- }
-
-
-
- if(properties.getUserId() != null)
- {
- messageProps.setUserId(properties.getUserId().getBytes());
- }
-
- FieldTable fieldTable = properties.getHeaders();
-
- Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
-
- if(properties.getType() != null)
- {
- appHeaders.put("x-jms-type", properties.getTypeAsString());
- }
-
-
- messageProps.setApplicationHeaders(appHeaders);
-
- Header header = new Header(deliveryProps, messageProps, null);
-
-
- return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime());
- }
-
- @Override
- public String getType()
- {
- return "v0-8 to v0-10";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
deleted file mode 100644
index cee1a04b17..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
-{
- private volatile long _bytesCredit;
- private volatile long _messageCredit;
-
- public CreditCreditManager(long bytesCredit, long messageCredit)
- {
- _bytesCredit = bytesCredit;
- _messageCredit = messageCredit;
- setSuspended(!hasCredit());
-
- }
-
-
- public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
- {
- _bytesCredit = bytesCredit;
- _messageCredit = messageCredit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCredit == -1L
- ? Long.MAX_VALUE
- : _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit == -1L
- ? Long.MAX_VALUE
- : _bytesCredit;
- }
-
- public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
- {
- }
-
-
- public synchronized void addCredit(final long messageCredit, final long bytesCredit)
- {
- boolean notifyIncrease = true;
- if(_messageCredit >= 0L && messageCredit > 0L)
- {
- notifyIncrease = _messageCredit != 0L;
- _messageCredit += messageCredit;
- }
-
-
-
- if(_bytesCredit >= 0L && bytesCredit > 0L)
- {
- notifyIncrease = notifyIncrease && bytesCredit>0;
- _bytesCredit += bytesCredit;
-
-
-
- if(notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
- }
-
-
-
- setSuspended(!hasCredit());
-
- }
-
- public void clearCredit()
- {
- _bytesCredit = 0l;
- _messageCredit = 0l;
- setSuspended(true);
- }
-
-
- public synchronized boolean hasCredit()
- {
- // Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L);
- }
-
- public synchronized boolean useCreditForMessage(long msgSize)
- {
- if(_messageCredit >= 0L)
- {
- if(_messageCredit > 0)
- {
- if(_bytesCredit < 0L)
- {
- _messageCredit--;
-
- return true;
- }
- else if(msgSize <= _bytesCredit)
- {
- _messageCredit--;
- _bytesCredit -= msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
- }
- else
- {
- setSuspended(true);
- return false;
- }
- }
- else if(_bytesCredit >= 0L)
- {
- if(msgSize <= _bytesCredit)
- {
- _bytesCredit -= msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
-
- }
- else
- {
- return true;
- }
-
- }
-
- public synchronized void stop()
- {
- if(_bytesCredit > 0)
- {
- _bytesCredit = 0;
- }
- if(_messageCredit > 0)
- {
- _messageCredit = 0;
- }
-
- }
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index 4b38b8a1a3..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.queue.QueueEntry;
-
-
-class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
- private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
-
-
- private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
-
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
- {
- _entry = entry;
- _sub = subscription_0_10;
- }
-
- public void onAccept()
- {
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
- {
- subscription.getSessionModel().acknowledge(subscription, _entry);
- }
- else
- {
- _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
- }
-
- }
-
- public void onRelease(boolean setRedelivered)
- {
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
- {
- subscription.release(_entry, setRedelivered);
- }
- else
- {
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
- }
- }
-
- public void onReject()
- {
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
- {
- subscription.reject(_entry);
- }
- else
- {
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
- }
-
- }
-
- public boolean acquire()
- {
- return _entry.acquire(getSubscription());
- }
-
-
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
deleted file mode 100755
index 7f092814da..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.server.flow.FlowCreditManager;
-
-public interface FlowCreditManager_0_10 extends FlowCreditManager
-{
- public void addCredit(long count, long bytes);
-
- void clearCredit();
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index ce0155b789..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.queue.QueueEntry;
-
-class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
- private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
-
-
- private final QueueEntry _entry;
- private Subscription_0_10 _sub;
-
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
- {
- _entry = entry;
- _sub = subscription_0_10;
- }
-
- public void onAccept()
- {
- _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
- }
-
- public void onRelease(boolean setRedelivered)
- {
- if(_entry.isAcquiredBy(_sub))
- {
- getSubscription().release(_entry, setRedelivered);
- }
- else
- {
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
- }
- }
-
- public void onReject()
- {
- if(_entry.isAcquiredBy(_sub))
- {
- getSubscription().reject(_entry);
- }
- else
- {
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
- }
-
- }
-
- public boolean acquire()
- {
- boolean acquired = _entry.acquire(getSubscription());
- if(acquired)
- {
- getSubscription().recordUnacknowledged(_entry);
- }
- return acquired;
-
- }
-
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
deleted file mode 100755
index f5f2a8d43f..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.transport.Method;
-
-public class MessageAcceptCompletionListener implements Method.CompletionListener
-{
- private final Subscription_0_10 _sub;
- private final QueueEntry _entry;
- private final ServerSession _session;
- private boolean _restoreCredit;
-
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
- {
- super();
- _sub = sub;
- _entry = entry;
- _session = session;
- _restoreCredit = restoreCredit;
- }
-
- public void onComplete(Method method)
- {
- if(_restoreCredit)
- {
- _sub.restoreCredit(_entry);
- }
- if(_entry.isAcquiredBy(_sub))
- {
- _session.acknowledge(_sub, _entry);
- }
-
- _session.removeDispositionListener(method);
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
deleted file mode 100644
index c6ae0c6e47..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.nio.ByteBuffer;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-
-public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage>
-{
- @Override
- public Class<ServerMessage> getInputClass()
- {
- return ServerMessage.class;
- }
-
- @Override
- public Class<MessageTransferMessage> getOutputClass()
- {
- return MessageTransferMessage.class;
- }
-
- @Override
- public MessageTransferMessage convert(ServerMessage serverMsg, VirtualHost vhost)
- {
- return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
- }
-
- private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final ServerMessage serverMsg)
- {
- final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg);
-
- return new StoredMessage<MessageMetaData_0_10>()
- {
- @Override
- public MessageMetaData_0_10 getMetaData()
- {
- return messageMetaData_0_10;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMsg.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return serverMsg.getContent(dst, offsetInMessage);
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return serverMsg.getContent(offsetInMessage, size);
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)
- {
- DeliveryProperties deliveryProps = new DeliveryProperties();
- MessageProperties messageProps = new MessageProperties();
-
- int size = (int) serverMsg.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- serverMsg.getContent(body, 0);
- body.flip();
-
-
- deliveryProps.setExpiration(serverMsg.getExpiration());
- deliveryProps.setImmediate(serverMsg.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
- deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
-
- messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
- messageProps.setContentLength(size);
- messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
- if(serverMsg.getMessageHeader().getCorrelationId() != null)
- {
- messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
- }
-
- Header header = new Header(deliveryProps, messageProps, null);
- return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
- }
-
- @Override
- public String getType()
- {
- return "Unknown to v0-10";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
deleted file mode 100644
index 90fb443f5b..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.nio.ByteBuffer;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.store.StoredMessage;
-
-public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10>
-{
-
- public static final int TYPE = 1;
-
- @Override
- public int ordinal()
- {
- return TYPE;
- }
-
- @Override
- public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
- {
- return MessageMetaData_0_10.FACTORY.createMetaData(buf);
- }
-
- @Override
- public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg)
- {
- return new MessageTransferMessage(msg, null);
- }
-
- public int hashCode()
- {
- return ordinal();
- }
-
- public boolean equals(Object o)
- {
- return o != null && o.getClass() == getClass();
- }
-
- @Override
- public String getType()
- {
- return AmqpProtocolVersion.v0_10.toString();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
deleted file mode 100755
index ee2a40a5b2..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.BBEncoder;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
-{
- private Header _header;
- private DeliveryProperties _deliveryProps;
- private MessageProperties _messageProps;
- private MessageTransferHeader _messageHeader;
- private long _arrivalTime;
- private int _bodySize;
-
- private static final int ENCODER_SIZE = 1 << 10;
-
- public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
-
- private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
-
- private volatile ByteBuffer _encoded;
- private Object _connectionReference;
-
-
- public MessageMetaData_0_10(MessageTransfer xfr)
- {
- this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
- }
-
- public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
- {
- _header = header;
- if(_header != null)
- {
- _deliveryProps = _header.getDeliveryProperties();
- _messageProps = _header.getMessageProperties();
- }
- else
- {
- _deliveryProps = null;
- _messageProps = null;
- }
- _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
- _arrivalTime = arrivalTime;
- _bodySize = bodySize;
-
- }
-
-
-
- public MessageMetaDataType getType()
- {
- return TYPE;
- }
-
- public int getStorableSize()
- {
- ByteBuffer buf = _encoded;
-
- if(buf == null)
- {
- buf = encodeAsBuffer();
- _encoded = buf;
- }
-
- //TODO -- need to add stuff
- return buf.limit();
- }
-
- private ByteBuffer encodeAsBuffer()
- {
- BBEncoder encoder = new BBEncoder(ENCODER_SIZE);
-
- encoder.writeInt64(_arrivalTime);
- encoder.writeInt32(_bodySize);
- int headersLength = 0;
- if(_header.getDeliveryProperties() != null)
- {
- headersLength++;
- }
- if(_header.getMessageProperties() != null)
- {
- headersLength++;
- }
- if(_header.getNonStandardProperties() != null)
- {
- headersLength += _header.getNonStandardProperties().size();
- }
-
- encoder.writeInt32(headersLength);
-
- if(_header.getDeliveryProperties() != null)
- {
- encoder.writeStruct32(_header.getDeliveryProperties());
- }
- if(_header.getMessageProperties() != null)
- {
- encoder.writeStruct32(_header.getMessageProperties());
- }
- if(_header.getNonStandardProperties() != null)
- {
-
- for(Struct header : _header.getNonStandardProperties())
- {
- encoder.writeStruct32(header);
- }
-
- }
- ByteBuffer buf = encoder.buffer();
- return buf;
- }
-
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
- {
- ByteBuffer buf = _encoded;
-
- if(buf == null)
- {
- buf = encodeAsBuffer();
- _encoded = buf;
- }
-
- buf = buf.duplicate();
-
- buf.position(offsetInMetaData);
-
- if(dest.remaining() < buf.limit())
- {
- buf.limit(dest.remaining());
- }
- dest.put(buf);
- return buf.limit();
- }
-
- public int getContentSize()
- {
- return _bodySize;
- }
-
- public boolean isPersistent()
- {
- return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
- }
-
- public String getRoutingKey()
- {
- return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
- }
-
- public AMQShortString getRoutingKeyShortString()
- {
- return AMQShortString.valueOf(getRoutingKey());
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return _messageHeader;
- }
-
- public long getSize()
- {
-
- return _bodySize;
- }
-
- public boolean isImmediate()
- {
- return _deliveryProps != null && _deliveryProps.getImmediate();
- }
-
- public long getExpiration()
- {
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
- }
-
- public boolean isRedelivered()
- {
- // The *Message* is never redelivered, only queue entries are...
- return false;
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
- public Header getHeader()
- {
- return _header;
- }
-
- public void setConnectionReference(Object connectionReference)
- {
- _connectionReference = connectionReference;
- }
-
- public Object getConnectionReference()
- {
- return _connectionReference;
- }
-
- private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
- {
- public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(buf);
-
- long arrivalTime = decoder.readInt64();
- int bodySize = decoder.readInt32();
- int headerCount = decoder.readInt32();
-
- DeliveryProperties deliveryProperties = null;
- MessageProperties messageProperties = null;
- List<Struct> otherProps = null;
-
- for(int i = 0 ; i < headerCount; i++)
- {
- Struct struct = decoder.readStruct32();
- if(struct instanceof DeliveryProperties && deliveryProperties == null)
- {
- deliveryProperties = (DeliveryProperties) struct;
- }
- else if(struct instanceof MessageProperties && messageProperties == null)
- {
- messageProperties = (MessageProperties) struct;
- }
- else
- {
- if(otherProps == null)
- {
- otherProps = new ArrayList<Struct>();
-
- }
- otherProps.add(struct);
- }
- }
- Header header = new Header(deliveryProperties,messageProperties,otherProps);
-
- return new MessageMetaData_0_10(header, bodySize, arrivalTime);
-
- }
- }
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
deleted file mode 100644
index 1b506d9bf8..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.util.*;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-
-class MessageTransferHeader implements AMQMessageHeader
-{
-
-
- public static final String JMS_TYPE = "x-jms-type";
-
- private final DeliveryProperties _deliveryProps;
- private final MessageProperties _messageProps;
-
- public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps)
- {
- _deliveryProps = deliveryProps;
- _messageProps = messageProps;
- }
-
- public String getCorrelationId()
- {
- if (_messageProps != null && _messageProps.getCorrelationId() != null)
- {
- return new String(_messageProps.getCorrelationId());
- }
- else
- {
- return null;
- }
- }
-
- public long getExpiration()
- {
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
- }
-
- public String getUserId()
- {
- byte[] userIdBytes = _messageProps == null ? null : _messageProps.getUserId();
- return userIdBytes == null ? null : new String(userIdBytes);
- }
-
- public String getAppId()
- {
- byte[] appIdBytes = _messageProps == null ? null : _messageProps.getAppId();
- return appIdBytes == null ? null : new String(appIdBytes);
- }
-
- public String getMessageId()
- {
- UUID id = _messageProps == null ? null : _messageProps.getMessageId();
-
- return id == null ? null : String.valueOf(id);
- }
-
- public String getMimeType()
- {
- return _messageProps == null ? null : _messageProps.getContentType();
- }
-
- public String getEncoding()
- {
- return _messageProps == null ? null : _messageProps.getContentEncoding();
- }
-
- public byte getPriority()
- {
- MessageDeliveryPriority priority = _deliveryProps == null || !_deliveryProps.hasPriority()
- ? MessageDeliveryPriority.MEDIUM
- : _deliveryProps.getPriority();
- return (byte) priority.getValue();
- }
-
- public long getTimestamp()
- {
- return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp();
- }
-
- public String getType()
- {
- Object type = getHeader(JMS_TYPE);
- return type instanceof String ? (String) type : null;
- }
-
- public String getReplyTo()
- {
- if (_messageProps != null && _messageProps.getReplyTo() != null)
- {
- return _messageProps.getReplyTo().toString();
- }
- else
- {
- return null;
- }
- }
-
- public String getReplyToExchange()
- {
- if (_messageProps != null && _messageProps.getReplyTo() != null)
- {
- return _messageProps.getReplyTo().getExchange();
- }
- else
- {
- return null;
- }
- }
-
- public String getReplyToRoutingKey()
- {
- if (_messageProps != null && _messageProps.getReplyTo() != null)
- {
- return _messageProps.getReplyTo().getRoutingKey();
- }
- else
- {
- return null;
- }
- }
-
- public Object getHeader(String name)
- {
- Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
- return appHeaders == null ? null : appHeaders.get(name);
- }
-
- public boolean containsHeaders(Set<String> names)
- {
- Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
- return appHeaders != null && appHeaders.keySet().containsAll(names);
-
- }
-
- @Override
- public Collection<String> getHeaderNames()
- {
- Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
- return appHeaders != null ? Collections.unmodifiableCollection(appHeaders.keySet()) : Collections.EMPTY_SET ;
-
- }
-
- public boolean containsHeader(String name)
- {
- Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
- return appHeaders != null && appHeaders.containsKey(name);
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
deleted file mode 100644
index 4e8bfcb652..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.transport.Header;
-
-import java.nio.ByteBuffer;
-
-
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
-{
-
- private Object _connectionRef;
-
- public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
- {
- super(storeMessage);
- _connectionRef = connectionRef;
- }
-
- private MessageMetaData_0_10 getMetaData()
- {
- return getStoredMessage().getMetaData();
- }
-
- public String getRoutingKey()
- {
- return getMetaData().getRoutingKey();
- }
-
- public AMQShortString getRoutingKeyShortString()
- {
- return AMQShortString.valueOf(getRoutingKey());
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return getMetaData().getMessageHeader();
- }
-
- public boolean isPersistent()
- {
- return getMetaData().isPersistent();
- }
-
-
- public boolean isRedelivered()
- {
- // The *Message* is never redelivered, only queue entries are... this is here so that filters
- // can run against the message on entry to an exchange
- return false;
- }
-
- public long getSize()
- {
-
- return getMetaData().getSize();
- }
-
- public boolean isImmediate()
- {
- return getMetaData().isImmediate();
- }
-
- public long getExpiration()
- {
- return getMetaData().getExpiration();
- }
-
- public MessageReference newReference()
- {
- return new TransferMessageReference(this);
- }
-
- public long getMessageNumber()
- {
- return getStoredMessage().getMessageNumber();
- }
-
- public long getArrivalTime()
- {
- return getMetaData().getArrivalTime();
- }
-
- public int getContent(ByteBuffer buf, int offset)
- {
- return getStoredMessage().getContent(offset, buf);
- }
-
-
- public ByteBuffer getContent(int offset, int size)
- {
- return getStoredMessage().getContent(offset,size);
- }
-
- public Header getHeader()
- {
- return getMetaData().getHeader();
- }
-
- public ByteBuffer getBody()
- {
-
- return getContent(0, (int)getSize());
- }
-
- public Object getConnectionReference()
- {
- return _connectionRef;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
deleted file mode 100644
index ab50a33b9b..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
-{
-
- private static final byte[] AMQP_0_10_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 1,
- (byte) 1,
- (byte) 0,
- (byte) 10
- };
-
-
- public ProtocolEngineCreator_0_10()
- {
- }
-
- public AmqpProtocolVersion getVersion()
- {
- return AmqpProtocolVersion.v0_10;
- }
-
-
- public byte[] getHeaderIdentifier()
- {
- return AMQP_0_10_HEADER;
- }
-
- public ServerProtocolEngine newProtocolEngine(Broker broker,
- NetworkConnection network,
- Port port,
- Transport transport,
- long id)
- {
- String fqdn = null;
- SocketAddress address = network.getLocalAddress();
- if (address instanceof InetSocketAddress)
- {
- fqdn = ((InetSocketAddress) address).getHostName();
- }
- final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
- fqdn, broker.getSubjectCreator(address));
-
- ServerConnection conn = new ServerConnection(id,broker);
-
- conn.setConnectionDelegate(connDelegate);
- conn.setRemoteAddress(network.getRemoteAddress());
- conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network, port, transport);
- }
-
-
- private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_10();
-
- public static ProtocolEngineCreator getInstance()
- {
- return INSTANCE;
- }
-
- @Override
- public String getType()
- {
- return getVersion().toString();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
deleted file mode 100755
index 73708d9841..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-
-public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
-{
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private NetworkConnection _network;
- private long _readBytes;
- private long _writtenBytes;
- private ServerConnection _connection;
-
- private long _createTime = System.currentTimeMillis();
- private long _lastReadTime;
- private long _lastWriteTime;
-
- public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network,
- Port port,
- Transport transport)
- {
- super(new Assembler(conn));
- _connection = conn;
- _connection.setPort(port);
- _connection.setTransport(transport);
-
- if(network != null)
- {
- setNetworkConnection(network);
- }
-
-
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
- }
-
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- _network = network;
-
- _connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
- _connection.setPeerPrincipal(_network.getPeerPrincipal());
- // FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
- }
-
- private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
- {
- return new Sender<ByteBuffer>()
- {
- @Override
- public void setIdleTimeout(int i)
- {
- sender.setIdleTimeout(i);
-
- }
-
- @Override
- public void send(ByteBuffer msg)
- {
- _lastWriteTime = System.currentTimeMillis();
- sender.send(msg);
-
- }
-
- @Override
- public void flush()
- {
- sender.flush();
-
- }
-
- @Override
- public void close()
- {
- sender.close();
-
- }
- };
- }
-
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public void received(final ByteBuffer buf)
- {
- _lastReadTime = System.currentTimeMillis();
- super.received(buf);
- _connection.receivedComplete();
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void writerIdle()
- {
- _connection.doHeartbeat();
- }
-
- public void readerIdle()
- {
- //Todo
- }
-
- public String getAddress()
- {
- return getRemoteAddress().toString();
- }
-
- public String getAuthId()
- {
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- @Override
- public void closed()
- {
- super.closed();
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public long getConnectionId()
- {
- return _connection.getConnectionId();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
deleted file mode 100644
index 0015988ab7..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.security.auth.Subject;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.ProtocolEvent;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
-{
- private Runnable _onOpenTask;
- private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor;
-
- private Subject _authorizedSubject = null;
- private Principal _authorizedPrincipal = null;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final long _connectionId;
- private final Object _reference = new Object();
- private VirtualHost _virtualHost;
- private Port _port;
- private AtomicLong _lastIoTime = new AtomicLong();
- private boolean _blocking;
- private Principal _peerPrincipal;
- private NetworkConnection _networkConnection;
- private Transport _transport;
- private volatile boolean _stopped;
-
- public ServerConnection(final long connectionId, Broker broker)
- {
- _connectionId = connectionId;
- _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
- }
-
- public Object getReference()
- {
- return _reference;
- }
-
- @Override
- protected void invoke(Method method)
- {
- super.invoke(method);
- }
-
- @Override
- protected void setState(State state)
- {
- super.setState(state);
-
- if (state == State.OPEN)
- {
- if (_onOpenTask != null)
- {
- _onOpenTask.run();
- }
- _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true));
-
- getVirtualHost().getConnectionRegistry().registerConnection(this);
- }
-
- if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
- {
- if(_virtualHost != null)
- {
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
- }
- }
-
- if (state == State.CLOSED)
- {
- logClosed();
- }
- }
-
- protected void logClosed()
- {
- if(_logClosed.compareAndSet(false, true))
- {
- CurrentActor.get().message(this, ConnectionMessages.CLOSE());
- }
- }
-
- @Override
- public ServerConnectionDelegate getConnectionDelegate()
- {
- return (ServerConnectionDelegate) super.getConnectionDelegate();
- }
-
- public void setConnectionDelegate(ServerConnectionDelegate delegate)
- {
- super.setConnectionDelegate(delegate);
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
-
- initialiseStatistics();
- }
-
- @Override
- public String getVirtualHostName()
- {
- return _virtualHost == null ? null : _virtualHost.getName();
- }
-
- @Override
- public Port getPort()
- {
- return _port;
- }
-
- public void setPort(Port port)
- {
- _port = port;
- }
-
- @Override
- public Transport getTransport()
- {
- return _transport;
- }
-
- @Override
- public void stop()
- {
- _stopped = true;
- }
-
- @Override
- public boolean isStopped()
- {
- return _stopped;
- }
-
- public void setTransport(Transport transport)
- {
- _transport = transport;
- }
-
- public void onOpen(final Runnable task)
- {
- _onOpenTask = task;
- }
-
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
- {
- ExecutionException ex = new ExecutionException();
- ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
- try
- {
- code = ExecutionErrorCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // Ignore, already set to INTERNAL_ERROR
- }
- ex.setErrorCode(code);
- ex.setDescription(message);
- ((ServerSession)session).invoke(ex);
-
- ((ServerSession)session).close(cause, message);
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
- @Override
- public void received(ProtocolEvent event)
- {
- _lastIoTime.set(System.currentTimeMillis());
- if (event.isConnectionControl())
- {
- CurrentActor.set(_actor);
- }
- else
- {
- ServerSession channel = (ServerSession) getSession(event.getChannel());
- LogActor channelActor = null;
-
- if (channel != null)
- {
- channelActor = channel.getLogActor();
- }
-
- CurrentActor.set(channelActor == null ? _actor : channelActor);
- }
-
- try
- {
- super.received(event);
- }
- finally
- {
- CurrentActor.remove();
- }
- }
-
- public String toLogString()
- {
- boolean hasVirtualHost = (null != this.getVirtualHost());
- boolean hasClientId = (null != getClientId());
-
- if (hasClientId && hasVirtualHost)
- {
- return "[" +
- MessageFormat.format(CONNECTION_FORMAT,
- getConnectionId(),
- getClientId(),
- getRemoteAddressString(),
- getVirtualHost().getName())
- + "] ";
- }
- else if (hasClientId)
- {
- return "[" +
- MessageFormat.format(USER_FORMAT,
- getConnectionId(),
- getClientId(),
- getRemoteAddressString())
- + "] ";
-
- }
- else
- {
- return "[" +
- MessageFormat.format(SOCKET_FORMAT,
- getConnectionId(),
- getRemoteAddressString())
- + "] ";
- }
- }
-
- public LogActor getLogActor()
- {
- return _actor;
- }
-
- public void close(AMQConstant cause, String message) throws AMQException
- {
- closeSubscriptions();
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // Ignore
- }
- close(replyCode, message);
- }
-
- public synchronized void block()
- {
- if(!_blocking)
- {
- _blocking = true;
- for(AMQSessionModel ssn : getSessionModels())
- {
- ssn.block();
- }
- }
- }
-
- public synchronized void unblock()
- {
- if(_blocking)
- {
- _blocking = false;
- for(AMQSessionModel ssn : getSessionModels())
- {
- ssn.unblock();
- }
- }
- }
-
- @Override
- public synchronized void registerSession(final Session ssn)
- {
- super.registerSession(ssn);
- if(_blocking)
- {
- ((ServerSession)ssn).block();
- }
- }
-
- @Override
- public synchronized void removeSession(final Session ssn)
- {
- super.removeSession(ssn);
- }
-
- public List<AMQSessionModel> getSessionModels()
- {
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (Session ssn : getChannels())
- {
- sessions.add((AMQSessionModel) ssn);
- }
- return sessions;
- }
-
- public void registerMessageDelivered(long messageSize)
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- _virtualHost.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- _virtualHost.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
-
- public void initialiseStatistics()
- {
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
- }
-
- /**
- * @return authorizedSubject
- */
- public Subject getAuthorizedSubject()
- {
- return _authorizedSubject;
- }
-
- /**
- * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject
- * and caches it for optimisation purposes.
- *
- * @param authorizedSubject
- */
- public void setAuthorizedSubject(final Subject authorizedSubject)
- {
- if (authorizedSubject == null)
- {
- _authorizedSubject = null;
- _authorizedPrincipal = null;
- }
- else
- {
- _authorizedSubject = authorizedSubject;
- _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
- }
- }
-
- public Principal getAuthorizedPrincipal()
- {
- return _authorizedPrincipal;
- }
-
- public long getConnectionId()
- {
- return _connectionId;
- }
-
- public boolean isSessionNameUnique(byte[] name)
- {
- return !super.hasSessionWithName(name);
- }
-
- public String getRemoteAddressString()
- {
- return String.valueOf(getRemoteAddress());
- }
-
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
-
- @Override
- public void closed()
- {
- closeSubscriptions();
- super.closed();
- }
-
- private void closeSubscriptions()
- {
- for (Session ssn : getChannels())
- {
- ((ServerSession)ssn).unregisterSubscriptions();
- }
- }
-
- public void receivedComplete()
- {
- for (Session ssn : getChannels())
- {
- ((ServerSession)ssn).receivedComplete();
- }
- }
-
- @Override
- public void send(ProtocolEvent event)
- {
- _lastIoTime.set(System.currentTimeMillis());
- super.send(event);
- }
-
- public long getLastIoTime()
- {
- return _lastIoTime.longValue();
- }
-
-
- public String getClientId()
- {
- return getConnectionDelegate().getClientId();
- }
-
- public String getClientVersion()
- {
- return getConnectionDelegate().getClientVersion();
- }
-
- public String getPrincipalAsString()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
- public long getSessionCountLimit()
- {
- return getChannelMax();
- }
-
- public Principal getPeerPrincipal()
- {
- return _peerPrincipal;
- }
-
- public void setPeerPrincipal(Principal peerPrincipal)
- {
- _peerPrincipal = peerPrincipal;
- }
-
- @Override
- public void setRemoteAddress(SocketAddress remoteAddress)
- {
- super.setRemoteAddress(remoteAddress);
- }
-
- @Override
- public void setLocalAddress(SocketAddress localAddress)
- {
- super.setLocalAddress(localAddress);
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- _networkConnection = network;
- }
-
- public NetworkConnection getNetworkConnection()
- {
- return _networkConnection;
- }
-
- public void doHeartbeat()
- {
- super.doHeartBeat();
-
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
deleted file mode 100644
index 6634627805..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
-
-public class ServerConnectionDelegate extends ServerDelegate
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
-
- private final Broker _broker;
- private final String _localFQDN;
- private int _maxNoOfChannels;
- private Map<String,Object> _clientProperties;
- private final SubjectCreator _subjectCreator;
-
- public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
- {
- this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
- }
-
- private ServerConnectionDelegate(Map<String, Object> properties,
- List<Object> locales,
- Broker broker,
- String localFQDN,
- SubjectCreator subjectCreator)
- {
- super(properties, parseToList(subjectCreator.getMechanisms()), locales);
-
- _broker = broker;
- _localFQDN = localFQDN;
- _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT);
- _subjectCreator = subjectCreator;
- }
-
- private static List<String> getFeatures(Broker broker)
- {
- String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
- final List<String> features = new ArrayList<String>();
- if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
- {
- features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
- }
-
- return Collections.unmodifiableList(features);
- }
-
- private static Map<String, Object> createConnectionProperties(final Broker broker)
- {
- final Map<String,Object> map = new HashMap<String,Object>();
- // Federation tag is used by the client to identify the broker instance
- map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString());
- final List<String> features = getFeatures(broker);
- if (features != null && features.size() > 0)
- {
- map.put(ServerPropertyNames.QPID_FEATURES, features);
- }
-
- map.put(ServerPropertyNames.PRODUCT, QpidProperties.getProductName());
- map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion());
- map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion());
- map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
-
- return map;
- }
-
- private static List<Object> parseToList(String mechanisms)
- {
- List<Object> list = new ArrayList<Object>();
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
- while(tokenizer.hasMoreTokens())
- {
- list.add(tokenizer.nextToken());
- }
- return list;
- }
-
- public ServerSession getSession(Connection conn, SessionAttach atc)
- {
- SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
-
- ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
-
- return ssn;
- }
-
- protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
- {
- return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
-
- }
-
- protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
- {
- final ServerConnection sconn = (ServerConnection) conn;
- final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response);
-
- if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
- {
- tuneAuthorizedConnection(sconn);
- sconn.setAuthorizedSubject(authResult.getSubject());
- }
- else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
- {
- connectionAuthContinue(sconn, authResult.getChallenge());
- }
- else
- {
- connectionAuthFailed(sconn, authResult.getCause());
- }
- }
-
- @Override
- public void connectionClose(Connection conn, ConnectionClose close)
- {
- final ServerConnection sconn = (ServerConnection) conn;
- try
- {
- sconn.logClosed();
- }
- finally
- {
- sconn.closeCode(close);
- sconn.setState(CLOSE_RCVD);
- sendConnectionCloseOkAndCloseSender(conn);
- }
- }
-
- public void connectionOpen(Connection conn, ConnectionOpen open)
- {
- final ServerConnection sconn = (ServerConnection) conn;
-
- VirtualHost vhost;
- String vhostName;
- if(open.hasVirtualHost())
- {
- vhostName = open.getVirtualHost();
- }
- else
- {
- vhostName = "";
- }
- vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
-
- SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
-
- if(vhost != null)
- {
- sconn.setVirtualHost(vhost);
-
- if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress()))
- {
- sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
- }
- else if (vhost.getState() != State.ACTIVE)
- {
- sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active"));
- }
- else
- {
- sconn.setState(Connection.State.OPEN);
- sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
- }
- }
- else
- {
- sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
- }
-
- }
-
- @Override
- public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
- {
- ServerConnection sconn = (ServerConnection) conn;
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the server's offered limit (" + getChannelMax() +")");
-
- //Due to the error we must forcefully close the connection without negotiation
- sconn.getSender().close();
- return;
- }
-
- final NetworkConnection networkConnection = sconn.getNetworkConnection();
-
- if(ok.hasHeartbeat())
- {
- int heartbeat = ok.getHeartbeat();
- if(heartbeat < 0)
- {
- heartbeat = 0;
- }
-
- networkConnection.setMaxReadIdle(2 * heartbeat);
- networkConnection.setMaxWriteIdle(heartbeat);
-
- }
- else
- {
- networkConnection.setMaxReadIdle(0);
- networkConnection.setMaxWriteIdle(0);
- }
-
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
-
- @Override
- public int getChannelMax()
- {
- return _maxNoOfChannels;
- }
-
- protected void setChannelMax(int channelMax)
- {
- _maxNoOfChannels = channelMax;
- }
-
- @Override public void sessionDetach(Connection conn, SessionDetach dtc)
- {
- // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
- // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop
- // completes.
- stopAllSubscriptions(conn, dtc);
- Session ssn = conn.getSession(dtc.getChannel());
- ((ServerSession)ssn).setClose(true);
- super.sessionDetach(conn, dtc);
- }
-
- private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
- {
- final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
- {
- subscription_0_10.stop();
- }
- }
-
-
- @Override
- public void sessionAttach(final Connection conn, final SessionAttach atc)
- {
- final Session ssn;
-
- if(isSessionNameUnique(atc.getName(), conn))
- {
- super.sessionAttach(conn, atc);
- }
- else
- {
- ssn = getSession(conn, atc);
- ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
- ssn.closed();
- }
- }
-
- private boolean isSessionNameUnique(final byte[] name, final Connection conn)
- {
- final ServerConnection sconn = (ServerConnection) conn;
- final String userId = sconn.getUserName();
-
- final Iterator<AMQConnectionModel> connections =
- ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
- while(connections.hasNext())
- {
- final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
- if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void connectionStartOk(Connection conn, ConnectionStartOk ok)
- {
- _clientProperties = ok.getClientProperties();
- super.connectionStartOk(conn, ok);
- }
-
- public Map<String,Object> getClientProperties()
- {
- return _clientProperties;
- }
-
- public String getClientId()
- {
- return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10);
- }
-
- public String getClientVersion()
- {
- return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10);
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
deleted file mode 100644
index abe784cefa..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ /dev/null
@@ -1,1000 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.security.auth.Subject;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.TransactionTimeoutHelper;
-import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.txn.AlreadyKnownDtxException;
-import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
-import org.apache.qpid.server.txn.DistributedTransaction;
-import org.apache.qpid.server.txn.DtxNotSelectedException;
-import org.apache.qpid.server.txn.IncorrectDtxStateException;
-import org.apache.qpid.server.txn.JoinAndResumeDtxException;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.NotAssociatedDtxException;
-import org.apache.qpid.server.txn.RollbackOnlyDtxException;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.SuspendAndFailDtxException;
-import org.apache.qpid.server.txn.TimeoutDtxException;
-import org.apache.qpid.server.txn.UnknownDtxBranchException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
-
-public class ServerSession extends Session
- implements AuthorizationHolder,
- AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
-{
- private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
- private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
- private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
- private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
-
- private final UUID _id = UUID.randomUUID();
- private long _createTime = System.currentTimeMillis();
- private LogActor _actor = GenericActor.getInstance(this);
-
- private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
-
- private final AtomicBoolean _blocking = new AtomicBoolean(false);
- private ChannelLogSubject _logSubject;
- private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-
- public static interface MessageDispositionChangeListener
- {
- public void onAccept();
-
- public void onRelease(boolean setRedelivered);
-
- public void onReject();
-
- public boolean acquire();
-
-
- }
-
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
- private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
- new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
-
- private ServerTransaction _transaction;
-
- private final AtomicLong _txnStarts = new AtomicLong(0);
- private final AtomicLong _txnCommits = new AtomicLong(0);
- private final AtomicLong _txnRejects = new AtomicLong(0);
- private final AtomicLong _txnCount = new AtomicLong(0);
-
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
-
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
-
- private final TransactionTimeoutHelper _transactionTimeoutHelper;
-
- private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
-
- public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
- {
- super(connection, delegate, name, expiry);
- _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
- _logSubject = new ChannelLogSubject(this);
-
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
- {
- @Override
- public void doTimeoutAction(String reason) throws AMQException
- {
- getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
- }
- });
- }
-
- protected void setState(State state)
- {
- super.setState(state);
-
- if (state == State.OPEN)
- {
- _actor.message(ChannelMessages.CREATE());
- if(_blocking.get())
- {
- invokeBlock();
- }
- }
- }
-
- private void invokeBlock()
- {
- invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
- invoke(new MessageStop(""));
- }
-
- @Override
- protected boolean isFull(int id)
- {
- return isCommandsFull(id);
- }
-
- public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
- {
- if(_outstandingCredit.get() != UNLIMITED_CREDIT
- && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
- {
- _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
- invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
- }
- getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction);
- incrementOutstandingTxnsIfNecessary();
- }
-
-
- public void sendMessage(MessageTransfer xfr,
- Runnable postIdSettingAction)
- {
- getConnectionModel().registerMessageDelivered(xfr.getBodySize());
- invoke(xfr, postIdSettingAction);
- }
-
- public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
- {
- _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
- }
-
-
- private static interface MessageDispositionAction
- {
- void performAction(MessageDispositionChangeListener listener);
- }
-
- public void accept(RangeSet ranges)
- {
- dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onAccept();
- }
- });
- }
-
-
- public void release(RangeSet ranges, final boolean setRedelivered)
- {
- dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onRelease(setRedelivered);
- }
- });
- }
-
- public void reject(RangeSet ranges)
- {
- dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onReject();
- }
- });
- }
-
- public RangeSet acquire(RangeSet transfers)
- {
- RangeSet acquired = RangeSetFactory.createRangeSet();
-
- if(!_messageDispositionListenerMap.isEmpty())
- {
- Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
- Iterator<Range> rangeIter = transfers.iterator();
-
- if(rangeIter.hasNext())
- {
- Range range = rangeIter.next();
-
- while(range != null && unacceptedMessages.hasNext())
- {
- int next = unacceptedMessages.next();
- while(gt(next, range.getUpper()))
- {
- if(rangeIter.hasNext())
- {
- range = rangeIter.next();
- }
- else
- {
- range = null;
- break;
- }
- }
- if(range != null && range.includes(next))
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
- if(changeListener != null && changeListener.acquire())
- {
- acquired.add(next);
- }
- }
-
-
- }
-
- }
-
-
- }
-
- return acquired;
- }
-
- public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
- {
- if(ranges != null)
- {
-
- if(ranges.size() == 1)
- {
- Range r = ranges.getFirst();
- for(int i = r.getLower(); i <= r.getUpper(); i++)
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
- if(changeListener != null)
- {
- action.performAction(changeListener);
- }
- }
- }
- else if(!_messageDispositionListenerMap.isEmpty())
- {
- Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
- Iterator<Range> rangeIter = ranges.iterator();
-
- if(rangeIter.hasNext())
- {
- Range range = rangeIter.next();
-
- while(range != null && unacceptedMessages.hasNext())
- {
- int next = unacceptedMessages.next();
- while(gt(next, range.getUpper()))
- {
- if(rangeIter.hasNext())
- {
- range = rangeIter.next();
- }
- else
- {
- range = null;
- break;
- }
- }
- if(range != null && range.includes(next))
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
- action.performAction(changeListener);
- }
-
-
- }
-
- }
- }
- }
- }
-
- public void removeDispositionListener(Method method)
- {
- _messageDispositionListenerMap.remove(method.getId());
- }
-
- public void onClose()
- {
- if(_transaction instanceof LocalTransaction)
- {
- _transaction.rollback();
- }
- else if(_transaction instanceof DistributedTransaction)
- {
- getVirtualHost().getDtxRegistry().endAssociations(this);
- }
-
- for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
- {
- listener.onRelease(true);
- }
- _messageDispositionListenerMap.clear();
-
- for (Task task : _taskList)
- {
- task.doTask(this);
- }
-
- LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
- if (operationalLoggingMessage == null)
- {
- operationalLoggingMessage = ChannelMessages.CLOSE();
- }
- CurrentActor.get().message(getLogSubject(), operationalLoggingMessage);
- }
-
- @Override
- protected void awaitClose()
- {
- // Broker shouldn't block awaiting close - thus do override this method to do nothing
- }
-
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
- {
- _transaction.dequeue(entry.getQueue(), entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- sub.acknowledge(entry);
- }
-
- public void onRollback()
- {
- // The client has acknowledge the message and therefore have seen it.
- // In the event of rollback, the message must be marked as redelivered.
- entry.setRedelivered();
- entry.release();
- }
- });
- }
-
- public Collection<Subscription_0_10> getSubscriptions()
- {
- return _subscriptions.values();
- }
-
- public void register(String destination, Subscription_0_10 sub)
- {
- _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub);
- }
-
- public Subscription_0_10 getSubscription(String destination)
- {
- return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination);
- }
-
- public void unregister(Subscription_0_10 sub)
- {
- _subscriptions.remove(sub.getName());
- try
- {
- sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
-
- public boolean isTransactional()
- {
- return _transaction.isTransactional();
- }
-
- public void selectTx()
- {
- _transaction = new LocalTransaction(this.getMessageStore());
- _txnStarts.incrementAndGet();
- }
-
- public void selectDtx()
- {
- _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost());
-
- }
-
-
- public void startDtx(Xid xid, boolean join, boolean resume)
- throws JoinAndResumeDtxException,
- UnknownDtxBranchException,
- AlreadyKnownDtxException,
- DtxNotSelectedException
- {
- DistributedTransaction distributedTransaction = assertDtxTransaction();
- distributedTransaction.start(xid, join, resume);
- }
-
-
- public void endDtx(Xid xid, boolean fail, boolean suspend)
- throws NotAssociatedDtxException,
- UnknownDtxBranchException,
- DtxNotSelectedException,
- SuspendAndFailDtxException, TimeoutDtxException
- {
- DistributedTransaction distributedTransaction = assertDtxTransaction();
- distributedTransaction.end(xid, fail, suspend);
- }
-
-
- public long getTimeoutDtx(Xid xid)
- throws UnknownDtxBranchException
- {
- return getVirtualHost().getDtxRegistry().getTimeout(xid);
- }
-
-
- public void setTimeoutDtx(Xid xid, long timeout)
- throws UnknownDtxBranchException
- {
- getVirtualHost().getDtxRegistry().setTimeout(xid, timeout);
- }
-
-
- public void prepareDtx(Xid xid)
- throws UnknownDtxBranchException,
- IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
- {
- getVirtualHost().getDtxRegistry().prepare(xid);
- }
-
- public void commitDtx(Xid xid, boolean onePhase)
- throws UnknownDtxBranchException,
- IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
- {
- getVirtualHost().getDtxRegistry().commit(xid, onePhase);
- }
-
-
- public void rollbackDtx(Xid xid)
- throws UnknownDtxBranchException,
- IncorrectDtxStateException, AMQStoreException, TimeoutDtxException
- {
- getVirtualHost().getDtxRegistry().rollback(xid);
- }
-
-
- public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
- {
- getVirtualHost().getDtxRegistry().forget(xid);
- }
-
- public List<Xid> recoverDtx()
- {
- return getVirtualHost().getDtxRegistry().recover();
- }
-
- private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
- {
- if(_transaction instanceof DistributedTransaction)
- {
- return (DistributedTransaction) _transaction;
- }
- else
- {
- throw new DtxNotSelectedException();
- }
- }
-
-
- public void commit()
- {
- _transaction.commit();
-
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
- }
-
- public void rollback()
- {
- _transaction.rollback();
-
- _txnRejects.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
- }
-
-
- private void incrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 1 if 0.
- _txnCount.compareAndSet(0,1);
- }
- }
-
- private void decrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 0 if 1.
- _txnCount.compareAndSet(1,0);
- }
- }
-
- public Long getTxnCommits()
- {
- return _txnCommits.get();
- }
-
- public Long getTxnRejects()
- {
- return _txnRejects.get();
- }
-
- public int getChannelId()
- {
- return getChannel();
- }
-
- public Long getTxnCount()
- {
- return _txnCount.get();
- }
-
- public Long getTxnStart()
- {
- return _txnStarts.get();
- }
-
- public Principal getAuthorizedPrincipal()
- {
- return getConnection().getAuthorizedPrincipal();
- }
-
- public Subject getAuthorizedSubject()
- {
- return getConnection().getAuthorizedSubject();
- }
-
- public void addSessionCloseTask(Task task)
- {
- _taskList.add(task);
- }
-
- public void removeSessionCloseTask(Task task)
- {
- _taskList.remove(task);
- }
-
- public Object getReference()
- {
- return getConnection().getReference();
- }
-
- public MessageStore getMessageStore()
- {
- return getVirtualHost().getMessageStore();
- }
-
- public VirtualHost getVirtualHost()
- {
- return getConnection().getVirtualHost();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- public AMQConnectionModel getConnectionModel()
- {
- return getConnection();
- }
-
- public String getClientID()
- {
- return getConnection().getClientId();
- }
-
- @Override
- public ServerConnection getConnection()
- {
- return (ServerConnection) super.getConnection();
- }
-
- public LogActor getLogActor()
- {
- return _actor;
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
- {
- _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
- }
-
- public void block(AMQQueue queue)
- {
- block(queue, queue.getName());
- }
-
- public void block()
- {
- block(this, "** All Queues **");
- }
-
-
- private void block(Object queue, String name)
- {
- synchronized (_blockingEntities)
- {
- if(_blockingEntities.add(queue))
- {
-
- if(_blocking.compareAndSet(false,true))
- {
- if(getState() == State.OPEN)
- {
- invokeBlock();
- }
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
- }
-
-
- }
- }
- }
-
- public void unblock(AMQQueue queue)
- {
- unblock((Object)queue);
- }
-
- public void unblock()
- {
- unblock(this);
- }
-
- private void unblock(Object queue)
- {
- synchronized(_blockingEntities)
- {
- if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
- {
- if(_blocking.compareAndSet(true,false) && !isClosing())
- {
-
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
-
-
- }
- }
- }
- }
-
- public boolean onSameConnection(InboundMessage inbound)
- {
- return ((inbound instanceof MessageTransferMessage)
- && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
- || ((inbound instanceof MessageMetaData_0_10)
- && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
- }
-
-
- public String toLogString()
- {
- long connectionId = super.getConnection() instanceof ServerConnection
- ? getConnection().getConnectionId()
- : -1;
-
- String remoteAddress = String.valueOf(getConnection().getRemoteAddress());
- return "[" +
- MessageFormat.format(CHANNEL_FORMAT,
- connectionId,
- getClientID(),
- remoteAddress,
- getVirtualHost().getName(),
- getChannel())
- + "] ";
- }
-
- @Override
- public void close(AMQConstant cause, String message)
- {
- if (cause == null)
- {
- close();
- }
- else
- {
- close(cause.getCode(), message);
- }
- }
-
- void close(int cause, String message)
- {
- _forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message));
- close();
- }
-
- @Override
- public void close()
- {
- // unregister subscriptions in order to prevent sending of new messages
- // to subscriptions with closing session
- unregisterSubscriptions();
- super.close();
- }
-
- void unregisterSubscriptions()
- {
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
- {
- unregister(subscription_0_10);
- }
- }
-
- void stopSubscriptions()
- {
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
- {
- subscription_0_10.stop();
- }
- }
-
-
- public void receivedComplete()
- {
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
- {
- subscription_0_10.flushCreditState(false);
- }
- awaitCommandCompletion();
- }
-
- private class PostEnqueueAction implements ServerTransaction.Action
- {
-
- private List<? extends BaseQueue> _queues;
- private ServerMessage _message;
- private final boolean _transactional;
-
- public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
- {
- _transactional = transactional;
- setState(queues, message);
- }
-
- public void setState(List<? extends BaseQueue> queues, ServerMessage message)
- {
- _message = message;
- _queues = queues;
- }
-
- public void postCommit()
- {
- MessageReference<?> ref = _message.newReference();
- for(int i = 0; i < _queues.size(); i++)
- {
- try
- {
- BaseQueue queue = _queues.get(i);
- queue.enqueue(_message, _transactional, null);
- if(queue instanceof AMQQueue)
- {
- ((AMQQueue)queue).checkCapacity(ServerSession.this);
- }
-
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- ref.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- }
-
- public int getUnacknowledgedMessageCount()
- {
- return _messageDispositionListenerMap.size();
- }
-
- public boolean getBlocking()
- {
- return _blocking.get();
- }
-
- private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
-
- public void completeAsyncCommands()
- {
- AsyncCommand cmd;
- while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
- {
- cmd.complete();
- _unfinishedCommandsQueue.poll();
- }
- while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
- {
- cmd = _unfinishedCommandsQueue.poll();
- cmd.awaitReadyForCompletion();
- cmd.complete();
- }
- }
-
-
- public void awaitCommandCompletion()
- {
- AsyncCommand cmd;
- while((cmd = _unfinishedCommandsQueue.poll()) != null)
- {
- cmd.awaitReadyForCompletion();
- cmd.complete();
- }
- }
-
-
- public Object getAsyncCommandMark()
- {
- return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
- }
-
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
- {
- _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
- }
-
- private static class AsyncCommand
- {
- private final StoreFuture _future;
- private ServerTransaction.Action _action;
-
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
- {
- _future = future;
- _action = action;
- }
-
- void awaitReadyForCompletion()
- {
- _future.waitForCompletion();
- }
-
- void complete()
- {
- if(!_future.isComplete())
- {
- _future.waitForCompletion();
- }
- _action.postCommit();
- _action = null;
- }
-
- boolean isReadyForCompletion()
- {
- return _future.isComplete();
- }
- }
-
- protected void setClose(boolean close)
- {
- super.setClose(close);
- }
-
- @Override
- public int getConsumerCount()
- {
- return _subscriptions.values().size();
- }
-
- @Override
- public int compareTo(AMQSessionModel o)
- {
- return getId().compareTo(o.getId());
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
deleted file mode 100644
index 8e79813216..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ /dev/null
@@ -1,1582 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.HeadersExchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.AlreadyKnownDtxException;
-import org.apache.qpid.server.txn.DtxNotSelectedException;
-import org.apache.qpid.server.txn.IncorrectDtxStateException;
-import org.apache.qpid.server.txn.JoinAndResumeDtxException;
-import org.apache.qpid.server.txn.NotAssociatedDtxException;
-import org.apache.qpid.server.txn.RollbackOnlyDtxException;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.SuspendAndFailDtxException;
-import org.apache.qpid.server.txn.TimeoutDtxException;
-import org.apache.qpid.server.txn.UnknownDtxBranchException;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
-import org.apache.qpid.server.virtualhost.RequiredExchangeException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class ServerSessionDelegate extends SessionDelegate
-{
- private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
-
- /**
- * No-local queue argument is used to support the no-local feature of Durable Subscribers.
- */
- private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local";
-
- public ServerSessionDelegate()
- {
-
- }
-
- @Override
- public void command(Session session, Method method)
- {
- try
- {
- setThreadSubject(session);
-
- if(!session.isClosing())
- {
- Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
- super.command(session, method, false);
- Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
- if(newOutstanding == null || newOutstanding == asyncCommandMark)
- {
- session.processed(method);
- }
-
- if(newOutstanding != null)
- {
- ((ServerSession)session).completeAsyncCommands();
- }
-
- if (method.isSync())
- {
- ((ServerSession)session).awaitCommandCompletion();
- session.flushProcessed();
- }
- }
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Exception processing command", e);
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
- }
- }
-
- @Override
- public void messageAccept(Session session, MessageAccept method)
- {
- final ServerSession serverSession = (ServerSession) session;
- serverSession.accept(method.getTransfers());
- if(!serverSession.isTransactional())
- {
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
- new CommandProcessedAction(serverSession, method));
- }
- }
-
- @Override
- public void messageReject(Session session, MessageReject method)
- {
- ((ServerSession)session).reject(method.getTransfers());
- }
-
- @Override
- public void messageRelease(Session session, MessageRelease method)
- {
- ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
- }
-
- @Override
- public void messageAcquire(Session session, MessageAcquire method)
- {
- RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
-
- Acquired result = new Acquired(acquiredRanges);
-
-
- session.executionResult((int) method.getId(), result);
-
-
- }
-
- @Override
- public void messageResume(Session session, MessageResume method)
- {
- super.messageResume(session, method);
- }
-
- @Override
- public void messageSubscribe(Session session, MessageSubscribe method)
- {
- /*
- TODO - work around broken Python tests
- Correct code should read like
- if not hasAcceptMode() exception ILLEGAL_ARGUMENT "Accept-mode not supplied"
- else if not method.hasAcquireMode() exception ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"
- */
- if(!method.hasAcceptMode())
- {
- method.setAcceptMode(MessageAcceptMode.EXPLICIT);
- }
- if(!method.hasAcquireMode())
- {
- method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
-
- }
-
- if(!method.hasQueue())
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
- }
- else
- {
- String destination = method.getDestination();
-
- if(((ServerSession)session).getSubscription(destination)!=null)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '"+destination+"'");
- }
- else
- {
- String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
-
-
- final AMQQueue queue = queueRegistry.getQueue(queueName);
-
- if(queue == null)
- {
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
- }
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else
- {
- if(queue.isExclusive())
- {
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
- if(queue.getAuthorizationHolder() == null)
- {
- queue.setAuthorizationHolder(s);
- queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- if(queue.getAuthorizationHolder() == session)
- {
- queue.setAuthorizationHolder(null);
- queue.setExclusiveOwningSession(null);
- }
- }
- });
- }
- }
-
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
-
- FilterManager filterManager = null;
- try
- {
- filterManager = FilterManagerFactory.createManager(method.getArguments());
- }
- catch (AMQException amqe)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
- return;
- }
-
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
-
- ((ServerSession)session).register(destination, sub);
- try
- {
- queue.registerSubscription(sub, method.getExclusive());
- }
- catch (AMQQueue.ExistingExclusiveSubscription existing)
- {
- exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
- }
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
- {
- exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot subscribe to queue '" + queueName + "' with destination '" + destination);
- }
- }
- }
- }
- }
-
- @Override
- public void messageTransfer(Session ssn, final MessageTransfer xfr)
- {
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
-
- DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
- .hasExpiration())
- {
- delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
- }
-
- final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
- messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
-
- if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
- {
- ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
- String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
- exception(ssn, xfr, errorCode, description);
-
- return;
- }
-
- final Exchange exchangeInUse;
- List<? extends BaseQueue> queues = exchange.route(messageMetaData);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(messageMetaData);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
- {
- exchangeInUse = exchange;
- }
-
- final ServerSession serverSession = (ServerSession) ssn;
- if(!queues.isEmpty())
- {
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
- serverSession.enqueue(message, queues);
- storeMessage.flushToStore();
- }
- else
- {
- if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
- {
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
- }
- else
- {
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
- }
- }
-
-
- if(serverSession.isTransactional())
- {
- serverSession.processed(xfr);
- }
- else
- {
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
- }
- }
-
- private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
- final MessageMetaData_0_10 messageMetaData, final MessageStore store)
- {
- final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
- ByteBuffer body = xfr.getBody();
- if(body != null)
- {
- storeMessage.addContent(0, body);
- }
- return storeMessage;
- }
-
- @Override
- public void messageCancel(Session session, MessageCancel method)
- {
- String destination = method.getDestination();
-
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
-
- if(sub == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
- }
- else
- {
- AMQQueue queue = sub.getQueue();
- ((ServerSession)session).unregister(sub);
- if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
- {
- queue.setAuthorizationHolder(null);
- }
- }
- }
-
- @Override
- public void messageFlush(Session session, MessageFlush method)
- {
- String destination = method.getDestination();
-
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
-
- if(sub == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
- }
- else
- {
-
- try
- {
- sub.flush();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot flush subscription '" + destination);
- }
- }
- }
-
- @Override
- public void txSelect(Session session, TxSelect method)
- {
- // TODO - check current tx mode
- ((ServerSession)session).selectTx();
- }
-
- @Override
- public void txCommit(Session session, TxCommit method)
- {
- // TODO - check current tx mode
- ((ServerSession)session).commit();
- }
-
- @Override
- public void txRollback(Session session, TxRollback method)
- {
- // TODO - check current tx mode
- ((ServerSession)session).rollback();
- }
-
- @Override
- public void dtxSelect(Session session, DtxSelect method)
- {
- // TODO - check current tx mode
- ((ServerSession)session).selectDtx();
- }
-
- @Override
- public void dtxStart(Session session, DtxStart method)
- {
- XaResult result = new XaResult();
- result.setStatus(DtxXaStatus.XA_OK);
- try
- {
- ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume());
- session.executionResult(method.getId(), result);
- }
- catch(JoinAndResumeDtxException e)
- {
- exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Unknown xid " + method.getXid());
- }
- catch(AlreadyKnownDtxException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Xid already started an neither join nor " +
- "resume set" + method.getXid());
- }
- catch(DtxNotSelectedException e)
- {
- exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
- }
-
- }
-
- @Override
- public void dtxEnd(Session session, DtxEnd method)
- {
- XaResult result = new XaResult();
- result.setStatus(DtxXaStatus.XA_OK);
- try
- {
- try
- {
- ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
- }
- catch (TimeoutDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
- }
- session.executionResult(method.getId(), result);
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(NotAssociatedDtxException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(DtxNotSelectedException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(SuspendAndFailDtxException e)
- {
- exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
- }
-
- }
-
- @Override
- public void dtxCommit(Session session, DtxCommit method)
- {
- XaResult result = new XaResult();
- result.setStatus(DtxXaStatus.XA_OK);
- try
- {
- try
- {
- ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase());
- }
- catch (RollbackOnlyDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBROLLBACK);
- }
- catch (TimeoutDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
- }
- session.executionResult(method.getId(), result);
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- catch(IncorrectDtxStateException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(AMQStoreException e)
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
-
- @Override
- public void dtxForget(Session session, DtxForget method)
- {
- try
- {
- ((ServerSession)session).forgetDtx(method.getXid());
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- catch(IncorrectDtxStateException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
-
- }
-
- @Override
- public void dtxGetTimeout(Session session, DtxGetTimeout method)
- {
- GetTimeoutResult result = new GetTimeoutResult();
- try
- {
- result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid()));
- session.executionResult(method.getId(), result);
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- }
-
- @Override
- public void dtxPrepare(Session session, DtxPrepare method)
- {
- XaResult result = new XaResult();
- result.setStatus(DtxXaStatus.XA_OK);
- try
- {
- try
- {
- ((ServerSession)session).prepareDtx(method.getXid());
- }
- catch (RollbackOnlyDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBROLLBACK);
- }
- catch (TimeoutDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
- }
- session.executionResult((int) method.getId(), result);
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- catch(IncorrectDtxStateException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(AMQStoreException e)
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
-
- @Override
- public void dtxRecover(Session session, DtxRecover method)
- {
- RecoverResult result = new RecoverResult();
- List inDoubt = ((ServerSession)session).recoverDtx();
- result.setInDoubt(inDoubt);
- session.executionResult(method.getId(), result);
- }
-
- @Override
- public void dtxRollback(Session session, DtxRollback method)
- {
-
- XaResult result = new XaResult();
- result.setStatus(DtxXaStatus.XA_OK);
- try
- {
- try
- {
- ((ServerSession)session).rollbackDtx(method.getXid());
- }
- catch (TimeoutDtxException e)
- {
- result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
- }
- session.executionResult(method.getId(), result);
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- catch(IncorrectDtxStateException e)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
- }
- catch(AMQStoreException e)
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
-
- @Override
- public void dtxSetTimeout(Session session, DtxSetTimeout method)
- {
- try
- {
- ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout());
- }
- catch(UnknownDtxBranchException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
- }
- }
-
- @Override
- public void executionSync(final Session ssn, final ExecutionSync sync)
- {
- ((ServerSession)ssn).awaitCommandCompletion();
- super.executionSync(ssn, sync);
- }
-
- @Override
- public void exchangeDeclare(Session session, ExchangeDeclare method)
- {
- String exchangeName = method.getExchange();
- VirtualHost virtualHost = getVirtualHost(session);
-
- //we must check for any unsupported arguments present and throw not-implemented
- if(method.hasArguments())
- {
- Map<String,Object> args = method.getArguments();
- //QPID-3392: currently we don't support any!
- if(!args.isEmpty())
- {
- exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
- return;
- }
- }
-
- if(method.getPassive())
- {
- Exchange exchange = getExchange(session, exchangeName);
-
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
- }
- else
- {
- if (!exchange.getTypeShortString().toString().equals(method.getType())
- && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
- }
- }
- }
- else
- {
-
- try
- {
- virtualHost.createExchange(null,
- method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete(),
- method.getAlternateExchange());
- }
- catch(ReservedExchangeNameException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved name or prefix.");
- }
- catch(UnknownExchangeException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + e.getExchangeName());
- }
- catch(AMQUnknownExchangeType e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
- }
- catch(ExchangeExistsException e)
- {
- Exchange exchange = e.getExistingExchange();
- if(!exchange.getTypeShortString().toString().equals(method.getType()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeShortString()
- + " to " + method.getType() +".");
- }
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
- }
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
- }
-
-
- }
-
- }
-
- // TODO decouple AMQException and AMQConstant error codes
- private void exception(Session session, Method method, AMQException exception, String message)
- {
- ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
- if (exception.getErrorCode() != null)
- {
- try
- {
- errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // ignore, already set to INTERNAL_ERROR
- }
- }
- String description = message + "': " + exception.getMessage();
-
- exception(session, method, errorCode, description);
- }
-
- private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
- {
- ExecutionException ex = new ExecutionException();
- ex.setErrorCode(errorCode);
- ex.setCommandId(method.getId());
- ex.setDescription(description);
-
- session.invoke(ex);
-
- ((ServerSession)session).close(errorCode.getValue(), description);
- }
-
- private Exchange getExchange(Session session, String exchangeName)
- {
- return getVirtualHost(session).getExchange(exchangeName);
- }
-
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
- {
- VirtualHost virtualHost = getVirtualHost(ssn);
-
- Exchange exchange;
- if(xfr.hasDestination())
- {
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
- {
- exchange = virtualHost.getDefaultExchange();
- }
- }
- else
- {
- exchange = virtualHost.getDefaultExchange();
- }
- return exchange;
- }
-
- private VirtualHost getVirtualHost(Session session)
- {
- ServerConnection conn = getServerConnection(session);
- VirtualHost vhost = conn.getVirtualHost();
- return vhost;
- }
-
- private ServerConnection getServerConnection(Session session)
- {
- ServerConnection conn = (ServerConnection) session.getConnection();
- return conn;
- }
-
- @Override
- public void exchangeDelete(Session session, ExchangeDelete method)
- {
- VirtualHost virtualHost = getVirtualHost(session);
-
- try
- {
- if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange");
- return;
- }
-
- Exchange exchange = getExchange(session, method.getExchange());
-
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'");
- }
- else
- {
- virtualHost.removeExchange(exchange, !method.getIfUnused());
- }
- }
- catch (ExchangeInUseException e)
- {
- exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
- }
- catch (ExchangeIsAlternateException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
- }
- catch (RequiredExchangeException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete exchange '" + method.getExchange() );
- }
- }
-
- private boolean nameNullOrEmpty(String name)
- {
- if(name == null || name.length() == 0)
- {
- return true;
- }
-
- return false;
- }
-
- private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes)
- {
- for(ExchangeType type : registeredTypes)
- {
- if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
- {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void exchangeQuery(Session session, ExchangeQuery method)
- {
-
- ExchangeQueryResult result = new ExchangeQueryResult();
-
- Exchange exchange = getExchange(session, method.getName());
-
- if(exchange != null)
- {
- result.setDurable(exchange.isDurable());
- result.setType(exchange.getTypeShortString().toString());
- result.setNotFound(false);
- }
- else
- {
- result.setNotFound(true);
- }
-
- session.executionResult((int) method.getId(), result);
- }
-
- @Override
- public void exchangeBind(Session session, ExchangeBind method)
- {
-
- VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- if (!method.hasQueue())
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
- }
- else if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
- }
- else
- {
- //TODO - here because of non-compiant python tests
- // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
- if (!method.hasBindingKey())
- {
- method.setBindingKey(method.getQueue());
- }
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = virtualHost.getExchange(method.getExchange());
- if(queue == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
- }
- else if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
- }
- else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
- }
- else
- {
- if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
- {
- try
- {
- exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot add binding '" + method.getBindingKey());
- }
- }
- else
- {
- // todo
- }
- }
-
-
- }
-
-
-
- }
-
- @Override
- public void exchangeUnbind(Session session, ExchangeUnbind method)
- {
- VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- if (!method.hasQueue())
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
- }
- else if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange");
- }
- else if (!method.hasBindingKey())
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
- }
- else
- {
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = virtualHost.getExchange(method.getExchange());
- if(queue == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
- }
- else if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
- }
- else
- {
- try
- {
- exchange.removeBinding(method.getBindingKey(), queue, null);
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot remove binding '" + method.getBindingKey());
- }
- }
- }
- }
-
- @Override
- public void exchangeBound(Session session, ExchangeBound method)
- {
-
- ExchangeBoundResult result = new ExchangeBoundResult();
- VirtualHost virtualHost = getVirtualHost(session);
- Exchange exchange;
- AMQQueue queue;
- if(method.hasExchange())
- {
- exchange = virtualHost.getExchange(method.getExchange());
-
- if(exchange == null)
- {
- result.setExchangeNotFound(true);
- }
- }
- else
- {
- exchange = virtualHost.getDefaultExchange();
- }
-
-
- if(method.hasQueue())
- {
-
- queue = getQueue(session, method.getQueue());
- if(queue == null)
- {
- result.setQueueNotFound(true);
- }
-
-
- if(exchange != null && queue != null)
- {
-
- boolean queueMatched = exchange.isBound(queue);
-
- result.setQueueNotMatched(!queueMatched);
-
-
- if(method.hasBindingKey())
- {
-
- if(queueMatched)
- {
- final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue);
- result.setKeyNotMatched(!keyMatched);
- if(method.hasArguments())
- {
- if(keyMatched)
- {
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue));
- }
- else
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
- }
- }
- }
- else
- {
- boolean keyMatched = exchange.isBound(method.getBindingKey());
- result.setKeyNotMatched(!keyMatched);
- if(method.hasArguments())
- {
- if(keyMatched)
- {
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
- }
- else
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
- }
- }
-
- }
- else if (method.hasArguments())
- {
- if(queueMatched)
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
- }
- else
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
- }
-
- }
- else if(exchange != null && method.hasBindingKey())
- {
- final boolean keyMatched = exchange.isBound(method.getBindingKey());
- result.setKeyNotMatched(!keyMatched);
-
- if(method.hasArguments())
- {
- if(keyMatched)
- {
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
- }
- else
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
- }
-
-
- }
-
- }
- else if(exchange != null && method.hasBindingKey())
- {
- final boolean keyMatched = exchange.isBound(method.getBindingKey());
- result.setKeyNotMatched(!keyMatched);
-
- if(method.hasArguments())
- {
- if(keyMatched)
- {
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
- }
- else
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
- }
-
- }
- else if(exchange != null && method.hasArguments())
- {
- result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
-
-
- session.executionResult((int) method.getId(), result);
-
-
- }
-
- private AMQQueue getQueue(Session session, String queue)
- {
- QueueRegistry queueRegistry = getQueueRegistry(session);
- return queueRegistry.getQueue(queue);
- }
-
- private QueueRegistry getQueueRegistry(Session session)
- {
- return getVirtualHost(session).getQueueRegistry();
- }
-
- @Override
- public void queueDeclare(Session session, final QueueDeclare method)
- {
-
- VirtualHost virtualHost = getVirtualHost(session);
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-
- String queueName = method.getQueue();
- AMQQueue queue;
- QueueRegistry queueRegistry = getQueueRegistry(session);
- //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
-
- synchronized (queueRegistry)
- {
-
- if (((queue = queueRegistry.getQueue(queueName)) == null))
- {
-
- if (method.getPassive())
- {
- String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
-
- exception(session, method, errorCode, description);
-
- return;
- }
- else
- {
- try
- {
- queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
- if(!method.getExclusive() && method.getAutoDelete())
- {
- queue.setDeleteOnNoConsumers(true);
- }
-
- final String alternateExchangeName = method.getAlternateExchange();
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
- {
- Exchange alternate = getExchange(session, alternateExchangeName);
- queue.setAlternateExchange(alternate);
- }
-
- if(method.hasArguments() && method.getArguments() != null)
- {
- if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL))
- {
- Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL);
- queue.setNoLocal(convertBooleanValue(noLocal));
- }
- }
-
-
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- if(method.hasArguments() && method.getArguments() != null)
- {
- Map<String,Object> args = method.getArguments();
- FieldTable ftArgs = new FieldTable();
- for(Map.Entry<String, Object> entry : args.entrySet())
- {
- ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
- }
- DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs);
- }
- else
- {
- DurableConfigurationStoreHelper.createQueue(store, queue, null);
- }
- }
- queueRegistry.registerQueue(queue);
-
- if (method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
- {
- final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- try
- {
- q.delete();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete '" + method.getQueue());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
- }
- if (method.hasExclusive()
- && method.getExclusive())
- {
- final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
- }
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare queue '" + queueName);
- }
- }
- }
- else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
- {
- String description = "Cannot declare queue('" + queueName + "'),"
- + " as exclusive queue with same name "
- + "declared on another session";
- ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-
- exception(session, method, errorCode, description);
-
- return;
- }
- }
- }
-
- /**
- * Converts a queue argument into a boolean value. For compatibility with the C++
- * and the clients, accepts with Boolean, String, or Number types.
- * @param argValue argument value.
- *
- * @return true if set
- */
- private boolean convertBooleanValue(Object argValue)
- {
- if(argValue instanceof Boolean && ((Boolean)argValue))
- {
- return true;
- }
- else if (argValue instanceof String && Boolean.parseBoolean((String)argValue))
- {
- return true;
- }
- else if (argValue instanceof Number && ((Number)argValue).intValue() != 0)
- {
- return true;
- }
- return false;
- }
-
- protected AMQQueue createQueue(final String queueName,
- final QueueDeclare body,
- VirtualHost virtualHost,
- final ServerSession session)
- throws AMQException
- {
- String owner = body.getExclusive() ? session.getClientID() : null;
-
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner,
- body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
-
- return queue;
- }
-
- @Override
- public void queueDelete(Session session, QueueDelete method)
- {
- String queueName = method.getQueue();
- if(queueName == null || queueName.length()==0)
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
-
- }
- else
- {
- AMQQueue queue = getQueue(session, queueName);
-
-
- if (queue == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
- }
- else
- {
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if (method.getIfEmpty() && !queue.isEmpty())
- {
- exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
- }
- else if (method.getIfUnused() && !queue.isUnused())
- {
- // TODO - Error code
- exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use");
-
- }
- else
- {
- VirtualHost virtualHost = getVirtualHost(session);
-
- try
- {
- queue.delete();
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeQueue(store,queue);
- }
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete queue '" + queueName);
- }
- }
- }
- }
- }
-
- @Override
- public void queuePurge(Session session, QueuePurge method)
- {
- String queueName = method.getQueue();
- if(queueName == null || queueName.length()==0)
- {
- exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
- }
- else
- {
- AMQQueue queue = getQueue(session, queueName);
-
- if (queue == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
- }
- else
- {
- try
- {
- queue.clearQueue();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot purge queue '" + queueName);
- }
- }
- }
- }
-
- @Override
- public void queueQuery(Session session, QueueQuery method)
- {
- QueueQueryResult result = new QueueQueryResult();
-
- AMQQueue queue = getQueue(session, method.getQueue());
-
- if(queue != null)
- {
- result.setQueue(queue.getNameShortString().toString());
- result.setDurable(queue.isDurable());
- result.setExclusive(queue.isExclusive());
- result.setAutoDelete(queue.isAutoDelete());
- result.setArguments(queue.getArguments());
- result.setMessageCount(queue.getMessageCount());
- result.setSubscriberCount(queue.getConsumerCount());
-
- }
-
-
- session.executionResult((int) method.getId(), result);
-
- }
-
- @Override
- public void messageSetFlowMode(Session session, MessageSetFlowMode sfm)
- {
- String destination = sfm.getDestination();
-
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
-
- if(sub == null)
- {
- exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
- }
- else if(sub.isStopped())
- {
- sub.setFlowMode(sfm.getFlowMode());
- }
- }
-
- @Override
- public void messageStop(Session session, MessageStop stop)
- {
- String destination = stop.getDestination();
-
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
-
- if(sub == null)
- {
- exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
- }
- else
- {
- sub.stop();
- }
-
- }
-
- @Override
- public void messageFlow(Session session, MessageFlow flow)
- {
- String destination = flow.getDestination();
-
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
-
- if(sub == null)
- {
- exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
- }
- else
- {
- sub.addCredit(flow.getUnit(), flow.getValue());
- }
-
- }
-
- @Override
- public void closed(Session session)
- {
- setThreadSubject(session);
-
- ServerSession serverSession = (ServerSession)session;
-
- serverSession.stopSubscriptions();
- serverSession.onClose();
- serverSession.unregisterSubscriptions();
- }
-
- @Override
- public void detached(Session session)
- {
- closed(session);
- }
-
- private void setThreadSubject(Session session)
- {
- final ServerConnection scon = (ServerConnection) session.getConnection();
- SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
- }
-
- private static class CommandProcessedAction implements ServerTransaction.Action
- {
- private final ServerSession _serverSession;
- private final Method _method;
-
- public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
- {
- _serverSession = serverSession;
- _method = xfr;
- }
-
- public void postCommit()
- {
- _serverSession.processed(_method);
- }
-
- public void onRollback()
- {
- }
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
deleted file mode 100644
index c6bceb6ac7..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ /dev/null
@@ -1,952 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
-{
- private final long _subscriptionID;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private static final Option[] BATCHED = new Option[] { Option.BATCH };
-
- private final Lock _stateChangeLock = new ReentrantLock();
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
- private FlowCreditManager_0_10 _creditManager;
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
- }
- };
- private AMQQueue _queue;
- private final String _destination;
- private boolean _noLocal;
- private final FilterManager _filters;
- private final MessageAcceptMode _acceptMode;
- private final MessageAcquireMode _acquireMode;
- private MessageFlowMode _flowMode;
- private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
-
- private LogActor _logActor;
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private String _traceExclude;
- private String _trace;
- private final long _createTime = System.currentTimeMillis();
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private final Map<String, Object> _arguments;
- private int _deferredMessageCredit;
- private long _deferredSizeCredit;
-
-
- public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _session = session;
- _postIdSettingAction = new AddMessageDispositionListenerAction(session);
- _destination = destination;
- _acceptMode = acceptMode;
- _acquireMode = acquireMode;
- _creditManager = creditManager;
- _flowMode = flowMode;
- _filters = filters;
- _creditManager.addStateListener(this);
- _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
- Collections.<String, Object> unmodifiableMap(arguments);
- _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
-
- }
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- Map<String, Object> arguments = queue.getArguments();
- _traceExclude = (String) arguments.get("qpid.trace.exclude");
- _trace = (String) arguments.get("qpid.trace.id");
- String filterLogString = null;
-
- _logActor = GenericActor.getInstance(this);
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- filterLogString = getFilterLogString();
- CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
- filterLogString.length() > 0));
- }
- }
-
- public String getConsumerName()
- {
- return _destination;
- }
-
- public boolean isSuspended()
- {
- return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
-
-
-
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
-
- return false;
- }
-
- if (entry.getMessage() instanceof MessageTransferMessage)
- {
- if(_noLocal)
- {
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
- {
- return false;
- }
- }
- }
- else
- {
- // no interest in messages we can't convert
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null)
- {
- return false;
- }
- }
-
-
- return checkFilters(entry);
-
-
- }
-
- private boolean checkFilters(QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry);
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
- public boolean isBrowser()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean seesRequeues()
- {
- return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- finally
- {
- _stateChangeLock.unlock();
- }
-
-
-
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
-
- public static class AddMessageDispositionListenerAction implements Runnable
- {
- private MessageTransfer _xfr;
- private ServerSession.MessageDispositionChangeListener _action;
- private ServerSession _session;
-
- public AddMessageDispositionListenerAction(ServerSession session)
- {
- _session = session;
- }
-
- public void setXfr(MessageTransfer xfr)
- {
- _xfr = xfr;
- }
-
- public void setAction(ServerSession.MessageDispositionChangeListener action)
- {
- _action = action;
- }
-
- public void run()
- {
- if(_action != null)
- {
- _session.onMessageDispositionChange(_xfr, _action);
- }
- }
- }
-
- private final AddMessageDispositionListenerAction _postIdSettingAction;
-
- public void send(final QueueEntry entry, boolean batch) throws AMQException
- {
- ServerMessage serverMsg = entry.getMessage();
-
-
- MessageTransfer xfr;
-
- DeliveryProperties deliveryProps;
- MessageProperties messageProps = null;
-
- MessageTransferMessage msg;
-
- if(serverMsg instanceof MessageTransferMessage)
- {
-
- msg = (MessageTransferMessage) serverMsg;
-
- }
- else
- {
- MessageConverter converter =
- MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
-
-
- msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
- }
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
-
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
-
- boolean excludeDueToFederation = false;
-
- if(_trace != null)
- {
- if(!messageProps.hasApplicationHeaders())
- {
- messageProps.setApplicationHeaders(new HashMap<String,Object>());
- }
- Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
- String trace = (String) appHeaders.get("x-qpid.trace");
- if(trace == null)
- {
- trace = _trace;
- }
- else
- {
- if(_traceExclude != null)
- {
- excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
- }
- trace+=","+_trace;
- }
- appHeaders.put("x-qpid.trace",trace);
- }
-
- if(!excludeDueToFederation)
- {
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
- }
- else if(_flowMode == MessageFlowMode.WINDOW)
- {
- xfr.setCompletionListener(new Method.CompletionListener()
- {
- public void onComplete(Method method)
- {
- deferredAddCredit(1, entry.getSize());
- }
- });
- }
-
-
- _postIdSettingAction.setXfr(xfr);
- if(_acceptMode == MessageAcceptMode.EXPLICIT)
- {
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
- }
- else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
- }
- else
- {
- _postIdSettingAction.setAction(null);
- }
-
-
- _session.sendMessage(xfr, _postIdSettingAction);
- entry.incrementDeliveryCount();
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- forceDequeue(entry, false);
- }
- else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- recordUnacknowledged(entry);
- }
- }
- else
- {
- forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
-
- }
- }
-
- void recordUnacknowledged(QueueEntry entry)
- {
- _unacknowledgedCount.incrementAndGet();
- _unacknowledgedBytes.addAndGet(entry.getSize());
- }
-
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
-
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
- {
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if (restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.discard();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
- void reject(final QueueEntry entry)
- {
- entry.setRedelivered();
- entry.routeToAlternate();
- if(entry.isAcquiredBy(this))
- {
- entry.discard();
- }
- }
-
- void release(final QueueEntry entry, final boolean setRedelivered)
- {
- if (setRedelivered)
- {
- entry.setRedelivered();
- }
-
- if (getSessionModel().isClosing() || !setRedelivered)
- {
- entry.decrementDeliveryCount();
- }
-
- if (isMaxDeliveryLimitReached(entry))
- {
- sendToDLQOrDiscard(entry);
- }
- else
- {
- entry.release();
- }
- }
-
- protected void sendToDLQOrDiscard(QueueEntry entry)
- {
- final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
- final LogActor logActor = CurrentActor.get();
- final ServerMessage msg = entry.getMessage();
- if (alternateExchange != null)
- {
- final InboundMessage m = new InboundMessageAdapter(entry);
-
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
-
- if (destinationQueues == null || destinationQueues.isEmpty())
- {
- entry.discard();
-
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
- }
- else
- {
- entry.routeToAlternate();
-
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
- }
- }
- }
- else
- {
- entry.discard();
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
- }
- }
-
- private boolean isMaxDeliveryLimitReached(QueueEntry entry)
- {
- final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
- return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean wouldSuspend(QueueEntry entry)
- {
- return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void restoreCredit(QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void onDequeue(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void setStateListener(StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public FlowCreditManager_0_10 getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void stop()
- {
- try
- {
- getSendLock();
-
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
- }
-
- public void addCredit(MessageCreditUnit unit, long value)
- {
- FlowCreditManager_0_10 creditManager = getCreditManager();
-
- switch (unit)
- {
- case MESSAGE:
-
- creditManager.addCredit(value, 0L);
- break;
- case BYTE:
- creditManager.addCredit(0l, value);
- break;
- }
-
- _stopped.set(false);
-
- if(creditManager.hasCredit())
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- }
-
- }
-
- public void setFlowMode(MessageFlowMode flowMode)
- {
-
-
- _creditManager.removeListener(this);
-
- switch(flowMode)
- {
- case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
- break;
- case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
- break;
- default:
- throw new RuntimeException("Unknown message flow mode: " + flowMode);
- }
- _flowMode = flowMode;
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
-
- _creditManager.addStateListener(this);
-
- }
-
- public boolean isStopped()
- {
- return _stopped.get();
- }
-
- public boolean acquires()
- {
- return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
- }
-
- public void acknowledge(QueueEntry entry)
- {
- // TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(this))
- {
- _unacknowledgedBytes.addAndGet(-entry.getSize());
- _unacknowledgedCount.decrementAndGet();
- entry.discard();
- }
- }
-
- public void flush() throws AMQException
- {
- flushCreditState(true);
- _queue.flushSubscription(this);
- stop();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public ServerSession getSessionModel()
- {
- return _session;
- }
-
- public boolean isBrowsing()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
-
- public boolean isExplicitAcknowledge()
- {
- return _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public String getCreditMode()
- {
- return _flowMode.toString();
- }
-
- public String getName()
- {
- return _destination;
- }
-
- public Map<String, Object> getArguments()
- {
- return _arguments;
- }
-
- public boolean isSessionTransactional()
- {
- return _session.isTransactional();
- }
-
- public void queueEmpty()
- {
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public String toLogString()
- {
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getNameShortString());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
- return result;
- }
-
- private String getFilterLogString()
- {
- StringBuilder filterLogString = new StringBuilder();
- String delimiter = ", ";
- boolean hasEntries = false;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString.append(_filters.toString());
- hasEntries = true;
- }
-
- if (isBrowser())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Browser");
- hasEntries = true;
- }
-
- if (isDurable())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Durable");
- hasEntries = true;
- }
-
- return filterLogString.toString();
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
-
- public void flushBatched()
- {
- _session.getConnection().flush();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java
deleted file mode 100644
index 0c04f22232..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.server.message.MessageReference;
-
-public class TransferMessageReference extends MessageReference<MessageTransferMessage>
-{
- public TransferMessageReference(MessageTransferMessage message)
- {
- super(message);
- }
-
- protected void onReference(MessageTransferMessage message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(MessageTransferMessage message)
- {
- message.decrementReference();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
deleted file mode 100644
index 8e48741b91..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;
-
-public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
-{
- private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
-
- private volatile long _bytesCreditLimit;
- private volatile long _messageCreditLimit;
-
- private volatile long _bytesUsed;
- private volatile long _messageUsed;
-
- public WindowCreditManager()
- {
- this(0L, 0L);
- }
-
- public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
- setSuspended(!hasCredit());
-
- }
-
- public long getBytesCreditLimit()
- {
- return _bytesCreditLimit;
- }
-
- public long getMessageCreditLimit()
- {
- return _messageCreditLimit;
- }
-
- public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCreditLimit == -1L
- ? Long.MAX_VALUE
- : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L;
- }
-
- public long getBytesCredit()
- {
- return _bytesCreditLimit == -1L
- ? Long.MAX_VALUE
- : _bytesUsed < _bytesCreditLimit ? _bytesCreditLimit - _bytesUsed : 0L;
- }
-
- public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
- {
- _messageUsed -= messageCredit;
- if(_messageUsed < 0L)
- {
- LOGGER.error("Message credit used value was negative: "+ _messageUsed);
- _messageUsed = 0;
- }
-
- boolean notifyIncrease = true;
-
- if(_messageCreditLimit > 0L)
- {
- notifyIncrease = (_messageUsed != _messageCreditLimit);
- }
-
- _bytesUsed -= bytesCredit;
- if(_bytesUsed < 0L)
- {
- LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
- _bytesUsed = 0;
- }
-
- if(_bytesCreditLimit > 0L)
- {
- notifyIncrease = notifyIncrease && bytesCredit>0;
-
- if(notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
- }
-
- setSuspended(!hasCredit());
- }
-
-
-
- public synchronized boolean hasCredit()
- {
- return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
- && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
- }
-
- public synchronized boolean useCreditForMessage(final long msgSize)
- {
- if(_messageCreditLimit >= 0L)
- {
- if(_messageUsed < _messageCreditLimit)
- {
- if(_bytesCreditLimit < 0L)
- {
- _messageUsed++;
-
- return true;
- }
- else if(_bytesUsed + msgSize <= _bytesCreditLimit)
- {
- _messageUsed++;
- _bytesUsed += msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
- }
- else
- {
- setSuspended(true);
- return false;
- }
- }
- else if(_bytesCreditLimit >= 0L)
- {
- if(_bytesUsed + msgSize <= _bytesCreditLimit)
- {
- _bytesUsed += msgSize;
-
- return true;
- }
- else
- {
- return false;
- }
-
- }
- else
- {
- return true;
- }
-
- }
-
-
- public synchronized void addCredit(long count, long bytes)
- {
- if(bytes > 0)
- {
- _bytesCreditLimit += bytes;
- }
- else if(bytes == -1)
- {
- _bytesCreditLimit = -1;
- }
-
-
- if(count > 0)
- {
- _messageCreditLimit += count;
- }
- else if(count == -1)
- {
- _messageCreditLimit = -1;
- }
- }
-
- public void clearCredit()
- {
- _bytesCreditLimit = 0l;
- _messageCreditLimit = 0l;
- setSuspended(true);
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 3e1e8baddc..82b1f6a193 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -184,7 +184,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _lastReceivedTime;
private boolean _blocking;
- private final Lock _receivedLock;
+ private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker _broker;
private final Transport _transport;
@@ -973,16 +973,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
synchronized(this)
{
+
+ boolean lockHeld = _receivedLock.isHeldByCurrentThread();
+
while(!_closed)
{
try
{
+ if(lockHeld)
+ {
+ _receivedLock.unlock();
+ }
wait(1000);
}
catch (InterruptedException e)
{
}
+ finally
+ {
+ if(lockHeld)
+ {
+ _receivedLock.lock();
+ }
+ }
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
index f17b79d896..e66f0889dd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
@@ -23,20 +23,11 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
-
-import java.util.Map;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
deleted file mode 100644
index ab1546ed0a..0000000000
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10
-org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8
-org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
index e0c0aa5873..43ad3adf13 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
@@ -17,4 +17,3 @@
# under the License.
#
org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8
-org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
index b771e25328..57ca615a04 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
@@ -19,4 +19,3 @@
org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8
org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9
org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1
-org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10