diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 20:09:37 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 20:09:37 +0000 |
| commit | 91782b4b801528ca5fb4a3f9e3c2879d1f02c3a1 (patch) | |
| tree | 5327b5868831224b95cbf84cad54397e5dea6c7a /qpid/java/broker/src/main | |
| parent | 43d9b8b9928c56ee200ae1a3323796f393dec0f7 (diff) | |
| download | qpid-python-91782b4b801528ca5fb4a3f9e3c2879d1f02c3a1.tar.gz | |
QPID-4659 : [Java Broker] move amqp 0-10 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
26 files changed, 15 insertions, 6758 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java deleted file mode 100644 index c14896079f..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ /dev/null @@ -1,271 +0,0 @@ -package org.apache.qpid.server.protocol.converter.v0_8_v0_10; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.ReplyTo; - -public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage> -{ - private static final int BASIC_CLASS_ID = 60; - - public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage, - VirtualHost vhost) - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - - Header header = messageTransferMessage.getHeader(); - DeliveryProperties deliveryProps = header.getDeliveryProperties(); - MessageProperties messageProps = header.getMessageProperties(); - - if(deliveryProps != null) - { - if(deliveryProps.hasDeliveryMode()) - { - props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT - ? BasicContentHeaderProperties.PERSISTENT - : BasicContentHeaderProperties.NON_PERSISTENT)); - } - if(deliveryProps.hasExpiration()) - { - props.setExpiration(deliveryProps.getExpiration()); - } - if(deliveryProps.hasPriority()) - { - props.setPriority((byte) deliveryProps.getPriority().getValue()); - } - if(deliveryProps.hasTimestamp()) - { - props.setTimestamp(deliveryProps.getTimestamp()); - } - } - if(messageProps != null) - { - if(messageProps.hasAppId()) - { - props.setAppId(new AMQShortString(messageProps.getAppId())); - } - if(messageProps.hasContentType()) - { - props.setContentType(messageProps.getContentType()); - } - if(messageProps.hasCorrelationId()) - { - props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId())); - } - if(messageProps.hasContentEncoding()) - { - props.setEncoding(messageProps.getContentEncoding()); - } - if(messageProps.hasMessageId()) - { - props.setMessageId("ID:" + messageProps.getMessageId().toString()); - } - if(messageProps.hasReplyTo()) - { - ReplyTo replyTo = messageProps.getReplyTo(); - String exchangeName = replyTo.getExchange(); - String routingKey = replyTo.getRoutingKey(); - if(exchangeName == null) - { - exchangeName = ""; - } - - Exchange exchange = vhost.getExchange(exchangeName); - String exchangeClass = exchange == null - ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() - : exchange.getType().getName().asString(); - props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='" + (routingKey == null - ? "" - : routingKey + "'")); - - } - if(messageProps.hasUserId()) - { - props.setUserId(new AMQShortString(messageProps.getUserId())); - } - - if(messageProps.hasApplicationHeaders()) - { - Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); - if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) - { - props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); - } - - FieldTable ft = new FieldTable(); - for(Map.Entry<String, Object> entry : appHeaders.entrySet()) - { - try - { - ft.put(new AMQShortString(entry.getKey()), entry.getValue()); - } - catch (AMQPInvalidClassException e) - { - // TODO - // log here, but ignore - just can;t convert - } - } - props.setHeaders(ft); - - } - } - - return props; - } - - @Override - public Class<MessageTransferMessage> getInputClass() - { - return MessageTransferMessage.class; - } - - @Override - public Class<AMQMessage> getOutputClass() - { - return AMQMessage.class; - } - - @Override - public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost) - { - return new AMQMessage(convertToStoredMessage(message, vhost)); - } - - private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, - VirtualHost vhost) - { - final MessageMetaData metaData = convertMetaData(message, vhost); - return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>() - { - @Override - public MessageMetaData getMetaData() - { - return metaData; - } - - @Override - public long getMessageNumber() - { - return message.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return message.getContent(dst, offsetInMessage); - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - return message.getContent(offsetInMessage, size); - } - - @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - - private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost) - { - return new MessageMetaData(convertPublishBody(message), - convertContentHeaderBody(message, vhost), - 1, - message.getArrivalTime()); - } - - private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost) - { - BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost); - ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); - chb.setBodySize(message.getSize()); - return chb; - } - - private MessagePublishInfo convertPublishBody(MessageTransferMessage message) - { - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - final AMQShortString exchangeName = (delvProps == null || delvProps.getExchange() == null) - ? null - : new AMQShortString(delvProps.getExchange()); - final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null) - ? null - : new AMQShortString(delvProps.getRoutingKey()); - final boolean immediate = delvProps != null && delvProps.getImmediate(); - final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable(); - - return new MessagePublishInfo() - { - @Override - public AMQShortString getExchange() - { - return exchangeName; - } - - @Override - public void setExchange(AMQShortString exchange) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isImmediate() - { - return immediate; - } - - @Override - public boolean isMandatory() - { - return mandatory; - } - - @Override - public AMQShortString getRoutingKey() - { - return routingKey; - } - }; - } - - @Override - public String getType() - { - return "v0-10 to v0-8"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java deleted file mode 100644 index e1e8fbd9d3..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.converter.v0_8_v0_10; - -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.ReplyTo; -import org.apache.qpid.url.AMQBindingURL; - -public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessage, MessageTransferMessage> -{ - @Override - public Class<AMQMessage> getInputClass() - { - return AMQMessage.class; - } - - @Override - public Class<MessageTransferMessage> getOutputClass() - { - return MessageTransferMessage.class; - } - - @Override - public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost) - { - return new MessageTransferMessage(convertToStoredMessage(message_0_8), null); - } - - private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8) - { - final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(message_0_8); - return new StoredMessage<MessageMetaData_0_10>() - { - @Override - public MessageMetaData_0_10 getMetaData() - { - return messageMetaData_0_10; - } - - @Override - public long getMessageNumber() - { - return message_0_8.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return message_0_8.getContent(dst, offsetInMessage); - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - return message_0_8.getContent(offsetInMessage, size); - } - - @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - - private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8) - { - DeliveryProperties deliveryProps = new DeliveryProperties(); - MessageProperties messageProps = new MessageProperties(); - - int size = (int) message_0_8.getSize(); - ByteBuffer body = ByteBuffer.allocate(size); - message_0_8.getContent(body, 0); - body.flip(); - - BasicContentHeaderProperties properties = - (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); - - final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); - if(exchange != null) - { - deliveryProps.setExchange(exchange.toString()); - } - - deliveryProps.setExpiration(message_0_8.getExpiration()); - deliveryProps.setImmediate(message_0_8.isImmediate()); - deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority())); - deliveryProps.setRoutingKey(message_0_8.getRoutingKey()); - deliveryProps.setTimestamp(properties.getTimestamp()); - - messageProps.setContentEncoding(properties.getEncodingAsString()); - messageProps.setContentLength(size); - if(properties.getAppId() != null) - { - messageProps.setAppId(properties.getAppId().getBytes()); - } - messageProps.setContentType(properties.getContentTypeAsString()); - if(properties.getCorrelationId() != null) - { - messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); - } - - if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) - { - String origReplyToString = properties.getReplyTo().asString(); - ReplyTo replyTo = new ReplyTo(); - // if the string looks like a binding URL, then attempt to parse it... - try - { - AMQBindingURL burl = new AMQBindingURL(origReplyToString); - AMQShortString routingKey = burl.getRoutingKey(); - if(routingKey != null) - { - replyTo.setRoutingKey(routingKey.asString()); - } - - AMQShortString exchangeName = burl.getExchangeName(); - if(exchangeName != null) - { - replyTo.setExchange(exchangeName.asString()); - } - } - catch (URISyntaxException e) - { - replyTo.setRoutingKey(origReplyToString); - } - messageProps.setReplyTo(replyTo); - - } - - if(properties.getMessageId() != null) - { - try - { - String messageIdAsString = properties.getMessageIdAsString(); - if(messageIdAsString.startsWith("ID:")) - { - messageIdAsString = messageIdAsString.substring(3); - } - UUID uuid = UUID.fromString(messageIdAsString); - messageProps.setMessageId(uuid); - } - catch(IllegalArgumentException e) - { - // ignore - can't parse - } - } - - - - if(properties.getUserId() != null) - { - messageProps.setUserId(properties.getUserId().getBytes()); - } - - FieldTable fieldTable = properties.getHeaders(); - - Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); - - if(properties.getType() != null) - { - appHeaders.put("x-jms-type", properties.getTypeAsString()); - } - - - messageProps.setApplicationHeaders(appHeaders); - - Header header = new Header(deliveryProps, messageProps, null); - - - return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime()); - } - - @Override - public String getType() - { - return "v0-8 to v0-10"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java deleted file mode 100644 index cee1a04b17..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - - -import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 -{ - private volatile long _bytesCredit; - private volatile long _messageCredit; - - public CreditCreditManager(long bytesCredit, long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - setSuspended(!hasCredit()); - - } - - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) - { - } - - - public synchronized void addCredit(final long messageCredit, final long bytesCredit) - { - boolean notifyIncrease = true; - if(_messageCredit >= 0L && messageCredit > 0L) - { - notifyIncrease = _messageCredit != 0L; - _messageCredit += messageCredit; - } - - - - if(_bytesCredit >= 0L && bytesCredit > 0L) - { - notifyIncrease = notifyIncrease && bytesCredit>0; - _bytesCredit += bytesCredit; - - - - if(notifyIncrease) - { - notifyIncreaseBytesCredit(); - } - } - - - - setSuspended(!hasCredit()); - - } - - public void clearCredit() - { - _bytesCredit = 0l; - _messageCredit = 0l; - setSuspended(true); - } - - - public synchronized boolean hasCredit() - { - // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); - } - - public synchronized boolean useCreditForMessage(long msgSize) - { - if(_messageCredit >= 0L) - { - if(_messageCredit > 0) - { - if(_bytesCredit < 0L) - { - _messageCredit--; - - return true; - } - else if(msgSize <= _bytesCredit) - { - _messageCredit--; - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - } - else - { - setSuspended(true); - return false; - } - } - else if(_bytesCredit >= 0L) - { - if(msgSize <= _bytesCredit) - { - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - - } - else - { - return true; - } - - } - - public synchronized void stop() - { - if(_bytesCredit > 0) - { - _bytesCredit = 0; - } - if(_messageCredit > 0) - { - _messageCredit = 0; - } - - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java deleted file mode 100755 index 4b38b8a1a3..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.queue.QueueEntry; - - -class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener -{ - private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class); - - - private final QueueEntry _entry; - private final Subscription_0_10 _sub; - - public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) - { - _entry = entry; - _sub = subscription_0_10; - } - - public void onAccept() - { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) - { - subscription.getSessionModel().acknowledge(subscription, _entry); - } - else - { - _logger.warn("MessageAccept received for message which has not been acquired (likely client error)"); - } - - } - - public void onRelease(boolean setRedelivered) - { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) - { - subscription.release(_entry, setRedelivered); - } - else - { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); - } - } - - public void onReject() - { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) - { - subscription.reject(_entry); - } - else - { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); - } - - } - - public boolean acquire() - { - return _entry.acquire(getSubscription()); - } - - - private Subscription_0_10 getSubscription() - { - return _sub; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java deleted file mode 100755 index 7f092814da..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.server.flow.FlowCreditManager; - -public interface FlowCreditManager_0_10 extends FlowCreditManager -{ - public void addCredit(long count, long bytes); - - void clearCredit(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java deleted file mode 100755 index ce0155b789..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.queue.QueueEntry; - -class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener -{ - private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class); - - - private final QueueEntry _entry; - private Subscription_0_10 _sub; - - public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) - { - _entry = entry; - _sub = subscription_0_10; - } - - public void onAccept() - { - _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)"); - } - - public void onRelease(boolean setRedelivered) - { - if(_entry.isAcquiredBy(_sub)) - { - getSubscription().release(_entry, setRedelivered); - } - else - { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); - } - } - - public void onReject() - { - if(_entry.isAcquiredBy(_sub)) - { - getSubscription().reject(_entry); - } - else - { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); - } - - } - - public boolean acquire() - { - boolean acquired = _entry.acquire(getSubscription()); - if(acquired) - { - getSubscription().recordUnacknowledged(_entry); - } - return acquired; - - } - - public Subscription_0_10 getSubscription() - { - return _sub; - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java deleted file mode 100755 index f5f2a8d43f..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.transport.Method; - -public class MessageAcceptCompletionListener implements Method.CompletionListener -{ - private final Subscription_0_10 _sub; - private final QueueEntry _entry; - private final ServerSession _session; - private boolean _restoreCredit; - - public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) - { - super(); - _sub = sub; - _entry = entry; - _session = session; - _restoreCredit = restoreCredit; - } - - public void onComplete(Method method) - { - if(_restoreCredit) - { - _sub.restoreCredit(_entry); - } - if(_entry.isAcquiredBy(_sub)) - { - _session.acknowledge(_sub, _entry); - } - - _session.removeDispositionListener(method); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java deleted file mode 100644 index c6ae0c6e47..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.nio.ByteBuffer; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; - -public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage> -{ - @Override - public Class<ServerMessage> getInputClass() - { - return ServerMessage.class; - } - - @Override - public Class<MessageTransferMessage> getOutputClass() - { - return MessageTransferMessage.class; - } - - @Override - public MessageTransferMessage convert(ServerMessage serverMsg, VirtualHost vhost) - { - return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); - } - - private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final ServerMessage serverMsg) - { - final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg); - - return new StoredMessage<MessageMetaData_0_10>() - { - @Override - public MessageMetaData_0_10 getMetaData() - { - return messageMetaData_0_10; - } - - @Override - public long getMessageNumber() - { - return serverMsg.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return serverMsg.getContent(dst, offsetInMessage); - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - return serverMsg.getContent(offsetInMessage, size); - } - - @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - - private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg) - { - DeliveryProperties deliveryProps = new DeliveryProperties(); - MessageProperties messageProps = new MessageProperties(); - - int size = (int) serverMsg.getSize(); - ByteBuffer body = ByteBuffer.allocate(size); - serverMsg.getContent(body, 0); - body.flip(); - - - deliveryProps.setExpiration(serverMsg.getExpiration()); - deliveryProps.setImmediate(serverMsg.isImmediate()); - deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); - deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); - - messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); - messageProps.setContentLength(size); - messageProps.setContentType(serverMsg.getMessageHeader().getMimeType()); - if(serverMsg.getMessageHeader().getCorrelationId() != null) - { - messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); - } - - Header header = new Header(deliveryProps, messageProps, null); - return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); - } - - @Override - public String getType() - { - return "Unknown to v0-10"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java deleted file mode 100644 index 90fb443f5b..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.nio.ByteBuffer; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.store.StoredMessage; - -public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10> -{ - - public static final int TYPE = 1; - - @Override - public int ordinal() - { - return TYPE; - } - - @Override - public MessageMetaData_0_10 createMetaData(ByteBuffer buf) - { - return MessageMetaData_0_10.FACTORY.createMetaData(buf); - } - - @Override - public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg) - { - return new MessageTransferMessage(msg, null); - } - - public int hashCode() - { - return ordinal(); - } - - public boolean equals(Object o) - { - return o != null && o.getClass() == getClass(); - } - - @Override - public String getType() - { - return AmqpProtocolVersion.v0_10.toString(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java deleted file mode 100755 index ee2a40a5b2..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ /dev/null @@ -1,281 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.BBEncoder; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage -{ - private Header _header; - private DeliveryProperties _deliveryProps; - private MessageProperties _messageProps; - private MessageTransferHeader _messageHeader; - private long _arrivalTime; - private int _bodySize; - - private static final int ENCODER_SIZE = 1 << 10; - - public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); - - private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10(); - - private volatile ByteBuffer _encoded; - private Object _connectionReference; - - - public MessageMetaData_0_10(MessageTransfer xfr) - { - this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis()); - } - - public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) - { - _header = header; - if(_header != null) - { - _deliveryProps = _header.getDeliveryProperties(); - _messageProps = _header.getMessageProperties(); - } - else - { - _deliveryProps = null; - _messageProps = null; - } - _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); - _arrivalTime = arrivalTime; - _bodySize = bodySize; - - } - - - - public MessageMetaDataType getType() - { - return TYPE; - } - - public int getStorableSize() - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - - //TODO -- need to add stuff - return buf.limit(); - } - - private ByteBuffer encodeAsBuffer() - { - BBEncoder encoder = new BBEncoder(ENCODER_SIZE); - - encoder.writeInt64(_arrivalTime); - encoder.writeInt32(_bodySize); - int headersLength = 0; - if(_header.getDeliveryProperties() != null) - { - headersLength++; - } - if(_header.getMessageProperties() != null) - { - headersLength++; - } - if(_header.getNonStandardProperties() != null) - { - headersLength += _header.getNonStandardProperties().size(); - } - - encoder.writeInt32(headersLength); - - if(_header.getDeliveryProperties() != null) - { - encoder.writeStruct32(_header.getDeliveryProperties()); - } - if(_header.getMessageProperties() != null) - { - encoder.writeStruct32(_header.getMessageProperties()); - } - if(_header.getNonStandardProperties() != null) - { - - for(Struct header : _header.getNonStandardProperties()) - { - encoder.writeStruct32(header); - } - - } - ByteBuffer buf = encoder.buffer(); - return buf; - } - - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - - buf = buf.duplicate(); - - buf.position(offsetInMetaData); - - if(dest.remaining() < buf.limit()) - { - buf.limit(dest.remaining()); - } - dest.put(buf); - return buf.limit(); - } - - public int getContentSize() - { - return _bodySize; - } - - public boolean isPersistent() - { - return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT; - } - - public String getRoutingKey() - { - return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); - } - - public AMQShortString getRoutingKeyShortString() - { - return AMQShortString.valueOf(getRoutingKey()); - } - - public AMQMessageHeader getMessageHeader() - { - return _messageHeader; - } - - public long getSize() - { - - return _bodySize; - } - - public boolean isImmediate() - { - return _deliveryProps != null && _deliveryProps.getImmediate(); - } - - public long getExpiration() - { - return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); - } - - public boolean isRedelivered() - { - // The *Message* is never redelivered, only queue entries are... - return false; - } - - public long getArrivalTime() - { - return _arrivalTime; - } - - public Header getHeader() - { - return _header; - } - - public void setConnectionReference(Object connectionReference) - { - _connectionReference = connectionReference; - } - - public Object getConnectionReference() - { - return _connectionReference; - } - - private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> - { - public MessageMetaData_0_10 createMetaData(ByteBuffer buf) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(buf); - - long arrivalTime = decoder.readInt64(); - int bodySize = decoder.readInt32(); - int headerCount = decoder.readInt32(); - - DeliveryProperties deliveryProperties = null; - MessageProperties messageProperties = null; - List<Struct> otherProps = null; - - for(int i = 0 ; i < headerCount; i++) - { - Struct struct = decoder.readStruct32(); - if(struct instanceof DeliveryProperties && deliveryProperties == null) - { - deliveryProperties = (DeliveryProperties) struct; - } - else if(struct instanceof MessageProperties && messageProperties == null) - { - messageProperties = (MessageProperties) struct; - } - else - { - if(otherProps == null) - { - otherProps = new ArrayList<Struct>(); - - } - otherProps.add(struct); - } - } - Header header = new Header(deliveryProperties,messageProperties,otherProps); - - return new MessageMetaData_0_10(header, bodySize, arrivalTime); - - } - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java deleted file mode 100644 index 1b506d9bf8..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.util.*; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; - -class MessageTransferHeader implements AMQMessageHeader -{ - - - public static final String JMS_TYPE = "x-jms-type"; - - private final DeliveryProperties _deliveryProps; - private final MessageProperties _messageProps; - - public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps) - { - _deliveryProps = deliveryProps; - _messageProps = messageProps; - } - - public String getCorrelationId() - { - if (_messageProps != null && _messageProps.getCorrelationId() != null) - { - return new String(_messageProps.getCorrelationId()); - } - else - { - return null; - } - } - - public long getExpiration() - { - return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); - } - - public String getUserId() - { - byte[] userIdBytes = _messageProps == null ? null : _messageProps.getUserId(); - return userIdBytes == null ? null : new String(userIdBytes); - } - - public String getAppId() - { - byte[] appIdBytes = _messageProps == null ? null : _messageProps.getAppId(); - return appIdBytes == null ? null : new String(appIdBytes); - } - - public String getMessageId() - { - UUID id = _messageProps == null ? null : _messageProps.getMessageId(); - - return id == null ? null : String.valueOf(id); - } - - public String getMimeType() - { - return _messageProps == null ? null : _messageProps.getContentType(); - } - - public String getEncoding() - { - return _messageProps == null ? null : _messageProps.getContentEncoding(); - } - - public byte getPriority() - { - MessageDeliveryPriority priority = _deliveryProps == null || !_deliveryProps.hasPriority() - ? MessageDeliveryPriority.MEDIUM - : _deliveryProps.getPriority(); - return (byte) priority.getValue(); - } - - public long getTimestamp() - { - return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp(); - } - - public String getType() - { - Object type = getHeader(JMS_TYPE); - return type instanceof String ? (String) type : null; - } - - public String getReplyTo() - { - if (_messageProps != null && _messageProps.getReplyTo() != null) - { - return _messageProps.getReplyTo().toString(); - } - else - { - return null; - } - } - - public String getReplyToExchange() - { - if (_messageProps != null && _messageProps.getReplyTo() != null) - { - return _messageProps.getReplyTo().getExchange(); - } - else - { - return null; - } - } - - public String getReplyToRoutingKey() - { - if (_messageProps != null && _messageProps.getReplyTo() != null) - { - return _messageProps.getReplyTo().getRoutingKey(); - } - else - { - return null; - } - } - - public Object getHeader(String name) - { - Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); - return appHeaders == null ? null : appHeaders.get(name); - } - - public boolean containsHeaders(Set<String> names) - { - Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); - return appHeaders != null && appHeaders.keySet().containsAll(names); - - } - - @Override - public Collection<String> getHeaderNames() - { - Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); - return appHeaders != null ? Collections.unmodifiableCollection(appHeaders.keySet()) : Collections.EMPTY_SET ; - - } - - public boolean containsHeader(String name) - { - Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); - return appHeaders != null && appHeaders.containsKey(name); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java deleted file mode 100644 index 4e8bfcb652..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.transport.Header; - -import java.nio.ByteBuffer; - - -public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage -{ - - private Object _connectionRef; - - public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef) - { - super(storeMessage); - _connectionRef = connectionRef; - } - - private MessageMetaData_0_10 getMetaData() - { - return getStoredMessage().getMetaData(); - } - - public String getRoutingKey() - { - return getMetaData().getRoutingKey(); - } - - public AMQShortString getRoutingKeyShortString() - { - return AMQShortString.valueOf(getRoutingKey()); - } - - public AMQMessageHeader getMessageHeader() - { - return getMetaData().getMessageHeader(); - } - - public boolean isPersistent() - { - return getMetaData().isPersistent(); - } - - - public boolean isRedelivered() - { - // The *Message* is never redelivered, only queue entries are... this is here so that filters - // can run against the message on entry to an exchange - return false; - } - - public long getSize() - { - - return getMetaData().getSize(); - } - - public boolean isImmediate() - { - return getMetaData().isImmediate(); - } - - public long getExpiration() - { - return getMetaData().getExpiration(); - } - - public MessageReference newReference() - { - return new TransferMessageReference(this); - } - - public long getMessageNumber() - { - return getStoredMessage().getMessageNumber(); - } - - public long getArrivalTime() - { - return getMetaData().getArrivalTime(); - } - - public int getContent(ByteBuffer buf, int offset) - { - return getStoredMessage().getContent(offset, buf); - } - - - public ByteBuffer getContent(int offset, int size) - { - return getStoredMessage().getContent(offset,size); - } - - public Header getHeader() - { - return getMetaData().getHeader(); - } - - public ByteBuffer getBody() - { - - return getContent(0, (int)getSize()); - } - - public Object getConnectionReference() - { - return _connectionRef; - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java deleted file mode 100644 index ab50a33b9b..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.network.NetworkConnection; - -public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator -{ - - private static final byte[] AMQP_0_10_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 0, - (byte) 10 - }; - - - public ProtocolEngineCreator_0_10() - { - } - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v0_10; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_0_10_HEADER; - } - - public ServerProtocolEngine newProtocolEngine(Broker broker, - NetworkConnection network, - Port port, - Transport transport, - long id) - { - String fqdn = null; - SocketAddress address = network.getLocalAddress(); - if (address instanceof InetSocketAddress) - { - fqdn = ((InetSocketAddress) address).getHostName(); - } - final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, - fqdn, broker.getSubjectCreator(address)); - - ServerConnection conn = new ServerConnection(id,broker); - - conn.setConnectionDelegate(connDelegate); - conn.setRemoteAddress(network.getRemoteAddress()); - conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network, port, transport); - } - - - private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_10(); - - public static ProtocolEngineCreator getInstance() - { - return INSTANCE; - } - - @Override - public String getType() - { - return getVersion().toString(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java deleted file mode 100755 index 73708d9841..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.logging.messages.ConnectionMessages; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; - - -public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine -{ - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private NetworkConnection _network; - private long _readBytes; - private long _writtenBytes; - private ServerConnection _connection; - - private long _createTime = System.currentTimeMillis(); - private long _lastReadTime; - private long _lastWriteTime; - - public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, - Port port, - Transport transport) - { - super(new Assembler(conn)); - _connection = conn; - _connection.setPort(port); - _connection.setTransport(transport); - - if(network != null) - { - setNetworkConnection(network); - } - - - } - - public void setNetworkConnection(NetworkConnection network) - { - setNetworkConnection(network, network.getSender()); - } - - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - _network = network; - - _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); - _connection.setPeerPrincipal(_network.getPeerPrincipal()); - // FIXME Two log messages to maintain compatibility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false)); - } - - private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) - { - return new Sender<ByteBuffer>() - { - @Override - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - - } - - @Override - public void send(ByteBuffer msg) - { - _lastWriteTime = System.currentTimeMillis(); - sender.send(msg); - - } - - @Override - public void flush() - { - sender.flush(); - - } - - @Override - public void close() - { - sender.close(); - - } - }; - } - - @Override - public long getLastReadTime() - { - return _lastReadTime; - } - - @Override - public long getLastWriteTime() - { - return _lastWriteTime; - } - - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - public void received(final ByteBuffer buf) - { - _lastReadTime = System.currentTimeMillis(); - super.received(buf); - _connection.receivedComplete(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - _connection.doHeartbeat(); - } - - public void readerIdle() - { - //Todo - } - - public String getAddress() - { - return getRemoteAddress().toString(); - } - - public String getAuthId() - { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); - } - - public boolean isDurable() - { - return false; - } - - @Override - public void closed() - { - super.closed(); - } - - public long getCreateTime() - { - return _createTime; - } - - public long getConnectionId() - { - return _connection.getConnectionId(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java deleted file mode 100644 index 0015988ab7..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ /dev/null @@ -1,560 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.net.SocketAddress; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import javax.security.auth.Subject; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ConnectionMessages; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.ProtocolEvent; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.network.NetworkConnection; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; - -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder -{ - private Runnable _onOpenTask; - private AtomicBoolean _logClosed = new AtomicBoolean(false); - private LogActor _actor; - - private Subject _authorizedSubject = null; - private Principal _authorizedPrincipal = null; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - private final long _connectionId; - private final Object _reference = new Object(); - private VirtualHost _virtualHost; - private Port _port; - private AtomicLong _lastIoTime = new AtomicLong(); - private boolean _blocking; - private Principal _peerPrincipal; - private NetworkConnection _networkConnection; - private Transport _transport; - private volatile boolean _stopped; - - public ServerConnection(final long connectionId, Broker broker) - { - _connectionId = connectionId; - _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); - } - - public Object getReference() - { - return _reference; - } - - @Override - protected void invoke(Method method) - { - super.invoke(method); - } - - @Override - protected void setState(State state) - { - super.setState(state); - - if (state == State.OPEN) - { - if (_onOpenTask != null) - { - _onOpenTask.run(); - } - _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true)); - - getVirtualHost().getConnectionRegistry().registerConnection(this); - } - - if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) - { - if(_virtualHost != null) - { - _virtualHost.getConnectionRegistry().deregisterConnection(this); - } - } - - if (state == State.CLOSED) - { - logClosed(); - } - } - - protected void logClosed() - { - if(_logClosed.compareAndSet(false, true)) - { - CurrentActor.get().message(this, ConnectionMessages.CLOSE()); - } - } - - @Override - public ServerConnectionDelegate getConnectionDelegate() - { - return (ServerConnectionDelegate) super.getConnectionDelegate(); - } - - public void setConnectionDelegate(ServerConnectionDelegate delegate) - { - super.setConnectionDelegate(delegate); - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - - initialiseStatistics(); - } - - @Override - public String getVirtualHostName() - { - return _virtualHost == null ? null : _virtualHost.getName(); - } - - @Override - public Port getPort() - { - return _port; - } - - public void setPort(Port port) - { - _port = port; - } - - @Override - public Transport getTransport() - { - return _transport; - } - - @Override - public void stop() - { - _stopped = true; - } - - @Override - public boolean isStopped() - { - return _stopped; - } - - public void setTransport(Transport transport) - { - _transport = transport; - } - - public void onOpen(final Runnable task) - { - _onOpenTask = task; - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException - { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try - { - code = ExecutionErrorCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) - { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - ((ServerSession)session).invoke(ex); - - ((ServerSession)session).close(cause, message); - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - @Override - public void received(ProtocolEvent event) - { - _lastIoTime.set(System.currentTimeMillis()); - if (event.isConnectionControl()) - { - CurrentActor.set(_actor); - } - else - { - ServerSession channel = (ServerSession) getSession(event.getChannel()); - LogActor channelActor = null; - - if (channel != null) - { - channelActor = channel.getLogActor(); - } - - CurrentActor.set(channelActor == null ? _actor : channelActor); - } - - try - { - super.received(event); - } - finally - { - CurrentActor.remove(); - } - } - - public String toLogString() - { - boolean hasVirtualHost = (null != this.getVirtualHost()); - boolean hasClientId = (null != getClientId()); - - if (hasClientId && hasVirtualHost) - { - return "[" + - MessageFormat.format(CONNECTION_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString(), - getVirtualHost().getName()) - + "] "; - } - else if (hasClientId) - { - return "[" + - MessageFormat.format(USER_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString()) - + "] "; - - } - else - { - return "[" + - MessageFormat.format(SOCKET_FORMAT, - getConnectionId(), - getRemoteAddressString()) - + "] "; - } - } - - public LogActor getLogActor() - { - return _actor; - } - - public void close(AMQConstant cause, String message) throws AMQException - { - closeSubscriptions(); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) - { - // Ignore - } - close(replyCode, message); - } - - public synchronized void block() - { - if(!_blocking) - { - _blocking = true; - for(AMQSessionModel ssn : getSessionModels()) - { - ssn.block(); - } - } - } - - public synchronized void unblock() - { - if(_blocking) - { - _blocking = false; - for(AMQSessionModel ssn : getSessionModels()) - { - ssn.unblock(); - } - } - } - - @Override - public synchronized void registerSession(final Session ssn) - { - super.registerSession(ssn); - if(_blocking) - { - ((ServerSession)ssn).block(); - } - } - - @Override - public synchronized void removeSession(final Session ssn) - { - super.removeSession(ssn); - } - - public List<AMQSessionModel> getSessionModels() - { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); - for (Session ssn : getChannels()) - { - sessions.add((AMQSessionModel) ssn); - } - return sessions; - } - - public void registerMessageDelivered(long messageSize) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - _virtualHost.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - _virtualHost.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - } - - public void initialiseStatistics() - { - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); - _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); - _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); - } - - /** - * @return authorizedSubject - */ - public Subject getAuthorizedSubject() - { - return _authorizedSubject; - } - - /** - * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject - * and caches it for optimisation purposes. - * - * @param authorizedSubject - */ - public void setAuthorizedSubject(final Subject authorizedSubject) - { - if (authorizedSubject == null) - { - _authorizedSubject = null; - _authorizedPrincipal = null; - } - else - { - _authorizedSubject = authorizedSubject; - _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject); - } - } - - public Principal getAuthorizedPrincipal() - { - return _authorizedPrincipal; - } - - public long getConnectionId() - { - return _connectionId; - } - - public boolean isSessionNameUnique(byte[] name) - { - return !super.hasSessionWithName(name); - } - - public String getRemoteAddressString() - { - return String.valueOf(getRemoteAddress()); - } - - public String getUserName() - { - return _authorizedPrincipal.getName(); - } - - @Override - public void closed() - { - closeSubscriptions(); - super.closed(); - } - - private void closeSubscriptions() - { - for (Session ssn : getChannels()) - { - ((ServerSession)ssn).unregisterSubscriptions(); - } - } - - public void receivedComplete() - { - for (Session ssn : getChannels()) - { - ((ServerSession)ssn).receivedComplete(); - } - } - - @Override - public void send(ProtocolEvent event) - { - _lastIoTime.set(System.currentTimeMillis()); - super.send(event); - } - - public long getLastIoTime() - { - return _lastIoTime.longValue(); - } - - - public String getClientId() - { - return getConnectionDelegate().getClientId(); - } - - public String getClientVersion() - { - return getConnectionDelegate().getClientVersion(); - } - - public String getPrincipalAsString() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - - public long getSessionCountLimit() - { - return getChannelMax(); - } - - public Principal getPeerPrincipal() - { - return _peerPrincipal; - } - - public void setPeerPrincipal(Principal peerPrincipal) - { - _peerPrincipal = peerPrincipal; - } - - @Override - public void setRemoteAddress(SocketAddress remoteAddress) - { - super.setRemoteAddress(remoteAddress); - } - - @Override - public void setLocalAddress(SocketAddress localAddress) - { - super.setLocalAddress(localAddress); - } - - public void setNetworkConnection(NetworkConnection network) - { - _networkConnection = network; - } - - public NetworkConnection getNetworkConnection() - { - return _networkConnection; - } - - public void doHeartbeat() - { - super.doHeartBeat(); - - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java deleted file mode 100644 index 6634627805..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.State; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.NetworkConnection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD; - -public class ServerConnectionDelegate extends ServerDelegate -{ - private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class); - - private final Broker _broker; - private final String _localFQDN; - private int _maxNoOfChannels; - private Map<String,Object> _clientProperties; - private final SubjectCreator _subjectCreator; - - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) - { - this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); - } - - private ServerConnectionDelegate(Map<String, Object> properties, - List<Object> locales, - Broker broker, - String localFQDN, - SubjectCreator subjectCreator) - { - super(properties, parseToList(subjectCreator.getMechanisms()), locales); - - _broker = broker; - _localFQDN = localFQDN; - _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT); - _subjectCreator = subjectCreator; - } - - private static List<String> getFeatures(Broker broker) - { - String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); - final List<String> features = new ArrayList<String>(); - if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR)) - { - features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); - } - - return Collections.unmodifiableList(features); - } - - private static Map<String, Object> createConnectionProperties(final Broker broker) - { - final Map<String,Object> map = new HashMap<String,Object>(); - // Federation tag is used by the client to identify the broker instance - map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString()); - final List<String> features = getFeatures(broker); - if (features != null && features.size() > 0) - { - map.put(ServerPropertyNames.QPID_FEATURES, features); - } - - map.put(ServerPropertyNames.PRODUCT, QpidProperties.getProductName()); - map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion()); - map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion()); - map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName()); - - return map; - } - - private static List<Object> parseToList(String mechanisms) - { - List<Object> list = new ArrayList<Object>(); - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); - while(tokenizer.hasMoreTokens()) - { - list.add(tokenizer.nextToken()); - } - return list; - } - - public ServerSession getSession(Connection conn, SessionAttach atc) - { - SessionDelegate serverSessionDelegate = new ServerSessionDelegate(); - - ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); - - return ssn; - } - - protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException - { - return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal()); - - } - - protected void secure(final SaslServer ss, final Connection conn, final byte[] response) - { - final ServerConnection sconn = (ServerConnection) conn; - final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response); - - if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus())) - { - tuneAuthorizedConnection(sconn); - sconn.setAuthorizedSubject(authResult.getSubject()); - } - else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus())) - { - connectionAuthContinue(sconn, authResult.getChallenge()); - } - else - { - connectionAuthFailed(sconn, authResult.getCause()); - } - } - - @Override - public void connectionClose(Connection conn, ConnectionClose close) - { - final ServerConnection sconn = (ServerConnection) conn; - try - { - sconn.logClosed(); - } - finally - { - sconn.closeCode(close); - sconn.setState(CLOSE_RCVD); - sendConnectionCloseOkAndCloseSender(conn); - } - } - - public void connectionOpen(Connection conn, ConnectionOpen open) - { - final ServerConnection sconn = (ServerConnection) conn; - - VirtualHost vhost; - String vhostName; - if(open.hasVirtualHost()) - { - vhostName = open.getVirtualHost(); - } - else - { - vhostName = ""; - } - vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName); - - SecurityManager.setThreadSubject(sconn.getAuthorizedSubject()); - - if(vhost != null) - { - sconn.setVirtualHost(vhost); - - if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress())) - { - sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); - } - else if (vhost.getState() != State.ACTIVE) - { - sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); - } - else - { - sconn.setState(Connection.State.OPEN); - sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); - } - } - else - { - sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); - } - - } - - @Override - public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok) - { - ServerConnection sconn = (ServerConnection) conn; - int okChannelMax = ok.getChannelMax(); - - if (okChannelMax > getChannelMax()) - { - LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + - "client connectionTuneOk returned a channelMax (" + okChannelMax + - ") above the server's offered limit (" + getChannelMax() +")"); - - //Due to the error we must forcefully close the connection without negotiation - sconn.getSender().close(); - return; - } - - final NetworkConnection networkConnection = sconn.getNetworkConnection(); - - if(ok.hasHeartbeat()) - { - int heartbeat = ok.getHeartbeat(); - if(heartbeat < 0) - { - heartbeat = 0; - } - - networkConnection.setMaxReadIdle(2 * heartbeat); - networkConnection.setMaxWriteIdle(heartbeat); - - } - else - { - networkConnection.setMaxReadIdle(0); - networkConnection.setMaxWriteIdle(0); - } - - setConnectionTuneOkChannelMax(sconn, okChannelMax); - } - - @Override - public int getChannelMax() - { - return _maxNoOfChannels; - } - - protected void setChannelMax(int channelMax) - { - _maxNoOfChannels = channelMax; - } - - @Override public void sessionDetach(Connection conn, SessionDetach dtc) - { - // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures - // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop - // completes. - stopAllSubscriptions(conn, dtc); - Session ssn = conn.getSession(dtc.getChannel()); - ((ServerSession)ssn).setClose(true); - super.sessionDetach(conn, dtc); - } - - private void stopAllSubscriptions(Connection conn, SessionDetach dtc) - { - final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); - final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subs) - { - subscription_0_10.stop(); - } - } - - - @Override - public void sessionAttach(final Connection conn, final SessionAttach atc) - { - final Session ssn; - - if(isSessionNameUnique(atc.getName(), conn)) - { - super.sessionAttach(conn, atc); - } - else - { - ssn = getSession(conn, atc); - ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY)); - ssn.closed(); - } - } - - private boolean isSessionNameUnique(final byte[] name, final Connection conn) - { - final ServerConnection sconn = (ServerConnection) conn; - final String userId = sconn.getUserName(); - - final Iterator<AMQConnectionModel> connections = - ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator(); - while(connections.hasNext()) - { - final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next(); - if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name)) - { - return false; - } - } - return true; - } - - @Override - public void connectionStartOk(Connection conn, ConnectionStartOk ok) - { - _clientProperties = ok.getClientProperties(); - super.connectionStartOk(conn, ok); - } - - public Map<String,Object> getClientProperties() - { - return _clientProperties; - } - - public String getClientId() - { - return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10); - } - - public String getClientVersion() - { - return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java deleted file mode 100644 index abe784cefa..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ /dev/null @@ -1,1000 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import java.security.Principal; -import java.text.MessageFormat; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import javax.security.auth.Subject; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.TransactionTimeoutHelper; -import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.txn.AlreadyKnownDtxException; -import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; -import org.apache.qpid.server.txn.DistributedTransaction; -import org.apache.qpid.server.txn.DtxNotSelectedException; -import org.apache.qpid.server.txn.IncorrectDtxStateException; -import org.apache.qpid.server.txn.JoinAndResumeDtxException; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.NotAssociatedDtxException; -import org.apache.qpid.server.txn.RollbackOnlyDtxException; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.txn.SuspendAndFailDtxException; -import org.apache.qpid.server.txn.TimeoutDtxException; -import org.apache.qpid.server.txn.UnknownDtxBranchException; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; - -public class ServerSession extends Session - implements AuthorizationHolder, - AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder -{ - private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - - private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); - private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; - private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; - - private final UUID _id = UUID.randomUUID(); - private long _createTime = System.currentTimeMillis(); - private LogActor _actor = GenericActor.getInstance(this); - - private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); - - private final AtomicBoolean _blocking = new AtomicBoolean(false); - private ChannelLogSubject _logSubject; - private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - - public static interface MessageDispositionChangeListener - { - public void onAccept(); - - public void onRelease(boolean setRedelivered); - - public void onReject(); - - public boolean acquire(); - - - } - - public static interface Task - { - public void doTask(ServerSession session); - } - - - private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = - new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); - - private ServerTransaction _transaction; - - private final AtomicLong _txnStarts = new AtomicLong(0); - private final AtomicLong _txnCommits = new AtomicLong(0); - private final AtomicLong _txnRejects = new AtomicLong(0); - private final AtomicLong _txnCount = new AtomicLong(0); - - private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); - - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - - private final TransactionTimeoutHelper _transactionTimeoutHelper; - - private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>(); - - public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) - { - super(connection, delegate, name, expiry); - _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); - _logSubject = new ChannelLogSubject(this); - - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() - { - @Override - public void doTimeoutAction(String reason) throws AMQException - { - getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); - } - }); - } - - protected void setState(State state) - { - super.setState(state); - - if (state == State.OPEN) - { - _actor.message(ChannelMessages.CREATE()); - if(_blocking.get()) - { - invokeBlock(); - } - } - } - - private void invokeBlock() - { - invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); - invoke(new MessageStop("")); - } - - @Override - protected boolean isFull(int id) - { - return isCommandsFull(id); - } - - public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) - { - if(_outstandingCredit.get() != UNLIMITED_CREDIT - && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) - { - _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); - invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); - } - getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction); - incrementOutstandingTxnsIfNecessary(); - } - - - public void sendMessage(MessageTransfer xfr, - Runnable postIdSettingAction) - { - getConnectionModel().registerMessageDelivered(xfr.getBodySize()); - invoke(xfr, postIdSettingAction); - } - - public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) - { - _messageDispositionListenerMap.put(xfr.getId(), acceptListener); - } - - - private static interface MessageDispositionAction - { - void performAction(MessageDispositionChangeListener listener); - } - - public void accept(RangeSet ranges) - { - dispositionChange(ranges, new MessageDispositionAction() - { - public void performAction(MessageDispositionChangeListener listener) - { - listener.onAccept(); - } - }); - } - - - public void release(RangeSet ranges, final boolean setRedelivered) - { - dispositionChange(ranges, new MessageDispositionAction() - { - public void performAction(MessageDispositionChangeListener listener) - { - listener.onRelease(setRedelivered); - } - }); - } - - public void reject(RangeSet ranges) - { - dispositionChange(ranges, new MessageDispositionAction() - { - public void performAction(MessageDispositionChangeListener listener) - { - listener.onReject(); - } - }); - } - - public RangeSet acquire(RangeSet transfers) - { - RangeSet acquired = RangeSetFactory.createRangeSet(); - - if(!_messageDispositionListenerMap.isEmpty()) - { - Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); - Iterator<Range> rangeIter = transfers.iterator(); - - if(rangeIter.hasNext()) - { - Range range = rangeIter.next(); - - while(range != null && unacceptedMessages.hasNext()) - { - int next = unacceptedMessages.next(); - while(gt(next, range.getUpper())) - { - if(rangeIter.hasNext()) - { - range = rangeIter.next(); - } - else - { - range = null; - break; - } - } - if(range != null && range.includes(next)) - { - MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next); - if(changeListener != null && changeListener.acquire()) - { - acquired.add(next); - } - } - - - } - - } - - - } - - return acquired; - } - - public void dispositionChange(RangeSet ranges, MessageDispositionAction action) - { - if(ranges != null) - { - - if(ranges.size() == 1) - { - Range r = ranges.getFirst(); - for(int i = r.getLower(); i <= r.getUpper(); i++) - { - MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i); - if(changeListener != null) - { - action.performAction(changeListener); - } - } - } - else if(!_messageDispositionListenerMap.isEmpty()) - { - Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); - Iterator<Range> rangeIter = ranges.iterator(); - - if(rangeIter.hasNext()) - { - Range range = rangeIter.next(); - - while(range != null && unacceptedMessages.hasNext()) - { - int next = unacceptedMessages.next(); - while(gt(next, range.getUpper())) - { - if(rangeIter.hasNext()) - { - range = rangeIter.next(); - } - else - { - range = null; - break; - } - } - if(range != null && range.includes(next)) - { - MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); - action.performAction(changeListener); - } - - - } - - } - } - } - } - - public void removeDispositionListener(Method method) - { - _messageDispositionListenerMap.remove(method.getId()); - } - - public void onClose() - { - if(_transaction instanceof LocalTransaction) - { - _transaction.rollback(); - } - else if(_transaction instanceof DistributedTransaction) - { - getVirtualHost().getDtxRegistry().endAssociations(this); - } - - for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) - { - listener.onRelease(true); - } - _messageDispositionListenerMap.clear(); - - for (Task task : _taskList) - { - task.doTask(this); - } - - LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); - if (operationalLoggingMessage == null) - { - operationalLoggingMessage = ChannelMessages.CLOSE(); - } - CurrentActor.get().message(getLogSubject(), operationalLoggingMessage); - } - - @Override - protected void awaitClose() - { - // Broker shouldn't block awaiting close - thus do override this method to do nothing - } - - public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) - { - _transaction.dequeue(entry.getQueue(), entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - sub.acknowledge(entry); - } - - public void onRollback() - { - // The client has acknowledge the message and therefore have seen it. - // In the event of rollback, the message must be marked as redelivered. - entry.setRedelivered(); - entry.release(); - } - }); - } - - public Collection<Subscription_0_10> getSubscriptions() - { - return _subscriptions.values(); - } - - public void register(String destination, Subscription_0_10 sub) - { - _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub); - } - - public Subscription_0_10 getSubscription(String destination) - { - return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination); - } - - public void unregister(Subscription_0_10 sub) - { - _subscriptions.remove(sub.getName()); - try - { - sub.getSendLock(); - AMQQueue queue = sub.getQueue(); - if(queue != null) - { - queue.unregisterSubscription(sub); - } - } - catch (AMQException e) - { - // TODO - _logger.error("Failed to unregister subscription :" + e.getMessage(), e); - } - finally - { - sub.releaseSendLock(); - } - } - - public boolean isTransactional() - { - return _transaction.isTransactional(); - } - - public void selectTx() - { - _transaction = new LocalTransaction(this.getMessageStore()); - _txnStarts.incrementAndGet(); - } - - public void selectDtx() - { - _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); - - } - - - public void startDtx(Xid xid, boolean join, boolean resume) - throws JoinAndResumeDtxException, - UnknownDtxBranchException, - AlreadyKnownDtxException, - DtxNotSelectedException - { - DistributedTransaction distributedTransaction = assertDtxTransaction(); - distributedTransaction.start(xid, join, resume); - } - - - public void endDtx(Xid xid, boolean fail, boolean suspend) - throws NotAssociatedDtxException, - UnknownDtxBranchException, - DtxNotSelectedException, - SuspendAndFailDtxException, TimeoutDtxException - { - DistributedTransaction distributedTransaction = assertDtxTransaction(); - distributedTransaction.end(xid, fail, suspend); - } - - - public long getTimeoutDtx(Xid xid) - throws UnknownDtxBranchException - { - return getVirtualHost().getDtxRegistry().getTimeout(xid); - } - - - public void setTimeoutDtx(Xid xid, long timeout) - throws UnknownDtxBranchException - { - getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); - } - - - public void prepareDtx(Xid xid) - throws UnknownDtxBranchException, - IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException - { - getVirtualHost().getDtxRegistry().prepare(xid); - } - - public void commitDtx(Xid xid, boolean onePhase) - throws UnknownDtxBranchException, - IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException - { - getVirtualHost().getDtxRegistry().commit(xid, onePhase); - } - - - public void rollbackDtx(Xid xid) - throws UnknownDtxBranchException, - IncorrectDtxStateException, AMQStoreException, TimeoutDtxException - { - getVirtualHost().getDtxRegistry().rollback(xid); - } - - - public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException - { - getVirtualHost().getDtxRegistry().forget(xid); - } - - public List<Xid> recoverDtx() - { - return getVirtualHost().getDtxRegistry().recover(); - } - - private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException - { - if(_transaction instanceof DistributedTransaction) - { - return (DistributedTransaction) _transaction; - } - else - { - throw new DtxNotSelectedException(); - } - } - - - public void commit() - { - _transaction.commit(); - - _txnCommits.incrementAndGet(); - _txnStarts.incrementAndGet(); - decrementOutstandingTxnsIfNecessary(); - } - - public void rollback() - { - _transaction.rollback(); - - _txnRejects.incrementAndGet(); - _txnStarts.incrementAndGet(); - decrementOutstandingTxnsIfNecessary(); - } - - - private void incrementOutstandingTxnsIfNecessary() - { - if(isTransactional()) - { - //There can currently only be at most one outstanding transaction - //due to only having LocalTransaction support. Set value to 1 if 0. - _txnCount.compareAndSet(0,1); - } - } - - private void decrementOutstandingTxnsIfNecessary() - { - if(isTransactional()) - { - //There can currently only be at most one outstanding transaction - //due to only having LocalTransaction support. Set value to 0 if 1. - _txnCount.compareAndSet(1,0); - } - } - - public Long getTxnCommits() - { - return _txnCommits.get(); - } - - public Long getTxnRejects() - { - return _txnRejects.get(); - } - - public int getChannelId() - { - return getChannel(); - } - - public Long getTxnCount() - { - return _txnCount.get(); - } - - public Long getTxnStart() - { - return _txnStarts.get(); - } - - public Principal getAuthorizedPrincipal() - { - return getConnection().getAuthorizedPrincipal(); - } - - public Subject getAuthorizedSubject() - { - return getConnection().getAuthorizedSubject(); - } - - public void addSessionCloseTask(Task task) - { - _taskList.add(task); - } - - public void removeSessionCloseTask(Task task) - { - _taskList.remove(task); - } - - public Object getReference() - { - return getConnection().getReference(); - } - - public MessageStore getMessageStore() - { - return getVirtualHost().getMessageStore(); - } - - public VirtualHost getVirtualHost() - { - return getConnection().getVirtualHost(); - } - - public boolean isDurable() - { - return false; - } - - - public long getCreateTime() - { - return _createTime; - } - - @Override - public UUID getId() - { - return _id; - } - - public AMQConnectionModel getConnectionModel() - { - return getConnection(); - } - - public String getClientID() - { - return getConnection().getClientId(); - } - - @Override - public ServerConnection getConnection() - { - return (ServerConnection) super.getConnection(); - } - - public LogActor getLogActor() - { - return _actor; - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); - } - - public void block(AMQQueue queue) - { - block(queue, queue.getName()); - } - - public void block() - { - block(this, "** All Queues **"); - } - - - private void block(Object queue, String name) - { - synchronized (_blockingEntities) - { - if(_blockingEntities.add(queue)) - { - - if(_blocking.compareAndSet(false,true)) - { - if(getState() == State.OPEN) - { - invokeBlock(); - } - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); - } - - - } - } - } - - public void unblock(AMQQueue queue) - { - unblock((Object)queue); - } - - public void unblock() - { - unblock(this); - } - - private void unblock(Object queue) - { - synchronized(_blockingEntities) - { - if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) - { - if(_blocking.compareAndSet(true,false) && !isClosing()) - { - - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); - - - } - } - } - } - - public boolean onSameConnection(InboundMessage inbound) - { - return ((inbound instanceof MessageTransferMessage) - && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) - || ((inbound instanceof MessageMetaData_0_10) - && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); - } - - - public String toLogString() - { - long connectionId = super.getConnection() instanceof ServerConnection - ? getConnection().getConnectionId() - : -1; - - String remoteAddress = String.valueOf(getConnection().getRemoteAddress()); - return "[" + - MessageFormat.format(CHANNEL_FORMAT, - connectionId, - getClientID(), - remoteAddress, - getVirtualHost().getName(), - getChannel()) - + "] "; - } - - @Override - public void close(AMQConstant cause, String message) - { - if (cause == null) - { - close(); - } - else - { - close(cause.getCode(), message); - } - } - - void close(int cause, String message) - { - _forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message)); - close(); - } - - @Override - public void close() - { - // unregister subscriptions in order to prevent sending of new messages - // to subscriptions with closing session - unregisterSubscriptions(); - super.close(); - } - - void unregisterSubscriptions() - { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) - { - unregister(subscription_0_10); - } - } - - void stopSubscriptions() - { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) - { - subscription_0_10.stop(); - } - } - - - public void receivedComplete() - { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) - { - subscription_0_10.flushCreditState(false); - } - awaitCommandCompletion(); - } - - private class PostEnqueueAction implements ServerTransaction.Action - { - - private List<? extends BaseQueue> _queues; - private ServerMessage _message; - private final boolean _transactional; - - public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) - { - _transactional = transactional; - setState(queues, message); - } - - public void setState(List<? extends BaseQueue> queues, ServerMessage message) - { - _message = message; - _queues = queues; - } - - public void postCommit() - { - MessageReference<?> ref = _message.newReference(); - for(int i = 0; i < _queues.size(); i++) - { - try - { - BaseQueue queue = _queues.get(i); - queue.enqueue(_message, _transactional, null); - if(queue instanceof AMQQueue) - { - ((AMQQueue)queue).checkCapacity(ServerSession.this); - } - - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - ref.release(); - } - - public void onRollback() - { - // NO-OP - } - } - - public int getUnacknowledgedMessageCount() - { - return _messageDispositionListenerMap.size(); - } - - public boolean getBlocking() - { - return _blocking.get(); - } - - private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); - - public void completeAsyncCommands() - { - AsyncCommand cmd; - while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) - { - cmd.complete(); - _unfinishedCommandsQueue.poll(); - } - while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) - { - cmd = _unfinishedCommandsQueue.poll(); - cmd.awaitReadyForCompletion(); - cmd.complete(); - } - } - - - public void awaitCommandCompletion() - { - AsyncCommand cmd; - while((cmd = _unfinishedCommandsQueue.poll()) != null) - { - cmd.awaitReadyForCompletion(); - cmd.complete(); - } - } - - - public Object getAsyncCommandMark() - { - return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); - } - - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) - { - _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); - } - - private static class AsyncCommand - { - private final StoreFuture _future; - private ServerTransaction.Action _action; - - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) - { - _future = future; - _action = action; - } - - void awaitReadyForCompletion() - { - _future.waitForCompletion(); - } - - void complete() - { - if(!_future.isComplete()) - { - _future.waitForCompletion(); - } - _action.postCommit(); - _action = null; - } - - boolean isReadyForCompletion() - { - return _future.isComplete(); - } - } - - protected void setClose(boolean close) - { - super.setClose(close); - } - - @Override - public int getConsumerCount() - { - return _subscriptions.values().size(); - } - - @Override - public int compareTo(AMQSessionModel o) - { - return getId().compareTo(o.getId()); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java deleted file mode 100644 index 8e79813216..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ /dev/null @@ -1,1582 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.exchange.HeadersExchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.logging.messages.ExchangeMessages; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.txn.AlreadyKnownDtxException; -import org.apache.qpid.server.txn.DtxNotSelectedException; -import org.apache.qpid.server.txn.IncorrectDtxStateException; -import org.apache.qpid.server.txn.JoinAndResumeDtxException; -import org.apache.qpid.server.txn.NotAssociatedDtxException; -import org.apache.qpid.server.txn.RollbackOnlyDtxException; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.txn.SuspendAndFailDtxException; -import org.apache.qpid.server.txn.TimeoutDtxException; -import org.apache.qpid.server.txn.UnknownDtxBranchException; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.*; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -public class ServerSessionDelegate extends SessionDelegate -{ - private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); - - /** - * No-local queue argument is used to support the no-local feature of Durable Subscribers. - */ - private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local"; - - public ServerSessionDelegate() - { - - } - - @Override - public void command(Session session, Method method) - { - try - { - setThreadSubject(session); - - if(!session.isClosing()) - { - Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark(); - super.command(session, method, false); - Object newOutstanding = ((ServerSession)session).getAsyncCommandMark(); - if(newOutstanding == null || newOutstanding == asyncCommandMark) - { - session.processed(method); - } - - if(newOutstanding != null) - { - ((ServerSession)session).completeAsyncCommands(); - } - - if (method.isSync()) - { - ((ServerSession)session).awaitCommandCompletion(); - session.flushProcessed(); - } - } - } - catch(RuntimeException e) - { - LOGGER.error("Exception processing command", e); - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e); - } - } - - @Override - public void messageAccept(Session session, MessageAccept method) - { - final ServerSession serverSession = (ServerSession) session; - serverSession.accept(method.getTransfers()); - if(!serverSession.isTransactional()) - { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, - new CommandProcessedAction(serverSession, method)); - } - } - - @Override - public void messageReject(Session session, MessageReject method) - { - ((ServerSession)session).reject(method.getTransfers()); - } - - @Override - public void messageRelease(Session session, MessageRelease method) - { - ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered()); - } - - @Override - public void messageAcquire(Session session, MessageAcquire method) - { - RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers()); - - Acquired result = new Acquired(acquiredRanges); - - - session.executionResult((int) method.getId(), result); - - - } - - @Override - public void messageResume(Session session, MessageResume method) - { - super.messageResume(session, method); - } - - @Override - public void messageSubscribe(Session session, MessageSubscribe method) - { - /* - TODO - work around broken Python tests - Correct code should read like - if not hasAcceptMode() exception ILLEGAL_ARGUMENT "Accept-mode not supplied" - else if not method.hasAcquireMode() exception ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied" - */ - if(!method.hasAcceptMode()) - { - method.setAcceptMode(MessageAcceptMode.EXPLICIT); - } - if(!method.hasAcquireMode()) - { - method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED); - - } - - if(!method.hasQueue()) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); - } - else - { - String destination = method.getDestination(); - - if(((ServerSession)session).getSubscription(destination)!=null) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '"+destination+"'"); - } - else - { - String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); - - - final AMQQueue queue = queueRegistry.getQueue(queueName); - - if(queue == null) - { - exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); - } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else - { - if(queue.isExclusive()) - { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - if(queue.getAuthorizationHolder() == null) - { - queue.setAuthorizationHolder(s); - queue.setExclusiveOwningSession(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() - { - public void doTask(ServerSession session) - { - if(queue.getAuthorizationHolder() == session) - { - queue.setAuthorizationHolder(null); - queue.setExclusiveOwningSession(null); - } - } - }); - } - } - - FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); - - FilterManager filterManager = null; - try - { - filterManager = FilterManagerFactory.createManager(method.getArguments()); - } - catch (AMQException amqe) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager"); - return; - } - - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, - destination, - method.getAcceptMode(), - method.getAcquireMode(), - MessageFlowMode.WINDOW, - creditManager, - filterManager, - method.getArguments()); - - ((ServerSession)session).register(destination, sub); - try - { - queue.registerSubscription(sub, method.getExclusive()); - } - catch (AMQQueue.ExistingExclusiveSubscription existing) - { - exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); - } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) - { - exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot subscribe to queue '" + queueName + "' with destination '" + destination); - } - } - } - } - } - - @Override - public void messageTransfer(Session ssn, final MessageTransfer xfr) - { - final Exchange exchange = getExchangeForMessage(ssn, xfr); - - DeliveryProperties delvProps = null; - if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps - .hasExpiration()) - { - delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); - } - - final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); - messageMetaData.setConnectionReference(((ServerSession)ssn).getReference()); - - if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; - String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; - exception(ssn, xfr, errorCode, description); - - return; - } - - final Exchange exchangeInUse; - List<? extends BaseQueue> queues = exchange.route(messageMetaData); - if(queues.isEmpty() && exchange.getAlternateExchange() != null) - { - final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(messageMetaData); - if (!queues.isEmpty()) - { - exchangeInUse = alternateExchange; - } - else - { - exchangeInUse = exchange; - } - } - else - { - exchangeInUse = exchange; - } - - final ServerSession serverSession = (ServerSession) ssn; - if(!queues.isEmpty()) - { - final MessageStore store = getVirtualHost(ssn).getMessageStore(); - final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); - serverSession.enqueue(message, queues); - storeMessage.flushToStore(); - } - else - { - if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) - { - RangeSet rejects = RangeSetFactory.createRangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); - } - else - { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); - } - } - - - if(serverSession.isTransactional()) - { - serverSession.processed(xfr); - } - else - { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); - } - } - - private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, - final MessageMetaData_0_10 messageMetaData, final MessageStore store) - { - final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); - ByteBuffer body = xfr.getBody(); - if(body != null) - { - storeMessage.addContent(0, body); - } - return storeMessage; - } - - @Override - public void messageCancel(Session session, MessageCancel method) - { - String destination = method.getDestination(); - - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - - if(sub == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); - } - else - { - AMQQueue queue = sub.getQueue(); - ((ServerSession)session).unregister(sub); - if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) - { - queue.setAuthorizationHolder(null); - } - } - } - - @Override - public void messageFlush(Session session, MessageFlush method) - { - String destination = method.getDestination(); - - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - - if(sub == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); - } - else - { - - try - { - sub.flush(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot flush subscription '" + destination); - } - } - } - - @Override - public void txSelect(Session session, TxSelect method) - { - // TODO - check current tx mode - ((ServerSession)session).selectTx(); - } - - @Override - public void txCommit(Session session, TxCommit method) - { - // TODO - check current tx mode - ((ServerSession)session).commit(); - } - - @Override - public void txRollback(Session session, TxRollback method) - { - // TODO - check current tx mode - ((ServerSession)session).rollback(); - } - - @Override - public void dtxSelect(Session session, DtxSelect method) - { - // TODO - check current tx mode - ((ServerSession)session).selectDtx(); - } - - @Override - public void dtxStart(Session session, DtxStart method) - { - XaResult result = new XaResult(); - result.setStatus(DtxXaStatus.XA_OK); - try - { - ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume()); - session.executionResult(method.getId(), result); - } - catch(JoinAndResumeDtxException e) - { - exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Unknown xid " + method.getXid()); - } - catch(AlreadyKnownDtxException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Xid already started an neither join nor " + - "resume set" + method.getXid()); - } - catch(DtxNotSelectedException e) - { - exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); - } - - } - - @Override - public void dtxEnd(Session session, DtxEnd method) - { - XaResult result = new XaResult(); - result.setStatus(DtxXaStatus.XA_OK); - try - { - try - { - ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); - } - catch (TimeoutDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBTIMEOUT); - } - session.executionResult(method.getId(), result); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(NotAssociatedDtxException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(DtxNotSelectedException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(SuspendAndFailDtxException e) - { - exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); - } - - } - - @Override - public void dtxCommit(Session session, DtxCommit method) - { - XaResult result = new XaResult(); - result.setStatus(DtxXaStatus.XA_OK); - try - { - try - { - ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase()); - } - catch (RollbackOnlyDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBROLLBACK); - } - catch (TimeoutDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBTIMEOUT); - } - session.executionResult(method.getId(), result); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - catch(IncorrectDtxStateException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(AMQStoreException e) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - - @Override - public void dtxForget(Session session, DtxForget method) - { - try - { - ((ServerSession)session).forgetDtx(method.getXid()); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - catch(IncorrectDtxStateException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - - } - - @Override - public void dtxGetTimeout(Session session, DtxGetTimeout method) - { - GetTimeoutResult result = new GetTimeoutResult(); - try - { - result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid())); - session.executionResult(method.getId(), result); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - } - - @Override - public void dtxPrepare(Session session, DtxPrepare method) - { - XaResult result = new XaResult(); - result.setStatus(DtxXaStatus.XA_OK); - try - { - try - { - ((ServerSession)session).prepareDtx(method.getXid()); - } - catch (RollbackOnlyDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBROLLBACK); - } - catch (TimeoutDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBTIMEOUT); - } - session.executionResult((int) method.getId(), result); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - catch(IncorrectDtxStateException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(AMQStoreException e) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - - @Override - public void dtxRecover(Session session, DtxRecover method) - { - RecoverResult result = new RecoverResult(); - List inDoubt = ((ServerSession)session).recoverDtx(); - result.setInDoubt(inDoubt); - session.executionResult(method.getId(), result); - } - - @Override - public void dtxRollback(Session session, DtxRollback method) - { - - XaResult result = new XaResult(); - result.setStatus(DtxXaStatus.XA_OK); - try - { - try - { - ((ServerSession)session).rollbackDtx(method.getXid()); - } - catch (TimeoutDtxException e) - { - result.setStatus(DtxXaStatus.XA_RBTIMEOUT); - } - session.executionResult(method.getId(), result); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - catch(IncorrectDtxStateException e) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); - } - catch(AMQStoreException e) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - - @Override - public void dtxSetTimeout(Session session, DtxSetTimeout method) - { - try - { - ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout()); - } - catch(UnknownDtxBranchException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); - } - } - - @Override - public void executionSync(final Session ssn, final ExecutionSync sync) - { - ((ServerSession)ssn).awaitCommandCompletion(); - super.executionSync(ssn, sync); - } - - @Override - public void exchangeDeclare(Session session, ExchangeDeclare method) - { - String exchangeName = method.getExchange(); - VirtualHost virtualHost = getVirtualHost(session); - - //we must check for any unsupported arguments present and throw not-implemented - if(method.hasArguments()) - { - Map<String,Object> args = method.getArguments(); - //QPID-3392: currently we don't support any! - if(!args.isEmpty()) - { - exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString()); - return; - } - } - - if(method.getPassive()) - { - Exchange exchange = getExchange(session, exchangeName); - - if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); - } - else - { - if (!exchange.getTypeShortString().toString().equals(method.getType()) - && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + "."); - } - } - } - else - { - - try - { - virtualHost.createExchange(null, - method.getExchange(), - method.getType(), - method.getDurable(), - method.getAutoDelete(), - method.getAlternateExchange()); - } - catch(ReservedExchangeNameException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " - + exchangeName + " which begins with reserved name or prefix."); - } - catch(UnknownExchangeException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + e.getExchangeName()); - } - catch(AMQUnknownExchangeType e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); - } - catch(ExchangeExistsException e) - { - Exchange exchange = e.getExistingExchange(); - if(!exchange.getTypeShortString().toString().equals(method.getType())) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeShortString() - + " to " + method.getType() +"."); - } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); - } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare exchange '" + exchangeName); - } - - - } - - } - - // TODO decouple AMQException and AMQConstant error codes - private void exception(Session session, Method method, AMQException exception, String message) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; - if (exception.getErrorCode() != null) - { - try - { - errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode()); - } - catch (IllegalArgumentException iae) - { - // ignore, already set to INTERNAL_ERROR - } - } - String description = message + "': " + exception.getMessage(); - - exception(session, method, errorCode, description); - } - - private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) - { - ExecutionException ex = new ExecutionException(); - ex.setErrorCode(errorCode); - ex.setCommandId(method.getId()); - ex.setDescription(description); - - session.invoke(ex); - - ((ServerSession)session).close(errorCode.getValue(), description); - } - - private Exchange getExchange(Session session, String exchangeName) - { - return getVirtualHost(session).getExchange(exchangeName); - } - - private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) - { - VirtualHost virtualHost = getVirtualHost(ssn); - - Exchange exchange; - if(xfr.hasDestination()) - { - exchange = virtualHost.getExchange(xfr.getDestination()); - if(exchange == null) - { - exchange = virtualHost.getDefaultExchange(); - } - } - else - { - exchange = virtualHost.getDefaultExchange(); - } - return exchange; - } - - private VirtualHost getVirtualHost(Session session) - { - ServerConnection conn = getServerConnection(session); - VirtualHost vhost = conn.getVirtualHost(); - return vhost; - } - - private ServerConnection getServerConnection(Session session) - { - ServerConnection conn = (ServerConnection) session.getConnection(); - return conn; - } - - @Override - public void exchangeDelete(Session session, ExchangeDelete method) - { - VirtualHost virtualHost = getVirtualHost(session); - - try - { - if (nameNullOrEmpty(method.getExchange())) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange"); - return; - } - - Exchange exchange = getExchange(session, method.getExchange()); - - if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'"); - } - else - { - virtualHost.removeExchange(exchange, !method.getIfUnused()); - } - } - catch (ExchangeInUseException e) - { - exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); - } - catch (ExchangeIsAlternateException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); - } - catch (RequiredExchangeException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete exchange '" + method.getExchange() ); - } - } - - private boolean nameNullOrEmpty(String name) - { - if(name == null || name.length() == 0) - { - return true; - } - - return false; - } - - private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes) - { - for(ExchangeType type : registeredTypes) - { - if(type.getDefaultExchangeName().toString().equals( exchange.getName() )) - { - return true; - } - } - return false; - } - - @Override - public void exchangeQuery(Session session, ExchangeQuery method) - { - - ExchangeQueryResult result = new ExchangeQueryResult(); - - Exchange exchange = getExchange(session, method.getName()); - - if(exchange != null) - { - result.setDurable(exchange.isDurable()); - result.setType(exchange.getTypeShortString().toString()); - result.setNotFound(false); - } - else - { - result.setNotFound(true); - } - - session.executionResult((int) method.getId(), result); - } - - @Override - public void exchangeBind(Session session, ExchangeBind method) - { - - VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - if (!method.hasQueue()) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); - } - else if (nameNullOrEmpty(method.getExchange())) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); - } - else - { - //TODO - here because of non-compiant python tests - // should raise exception ILLEGAL_ARGUMENT "binding-key not set" - if (!method.hasBindingKey()) - { - method.setBindingKey(method.getQueue()); - } - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); - Exchange exchange = virtualHost.getExchange(method.getExchange()); - if(queue == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); - } - else if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); - } - else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); - } - else - { - if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)) - { - try - { - exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot add binding '" + method.getBindingKey()); - } - } - else - { - // todo - } - } - - - } - - - - } - - @Override - public void exchangeUnbind(Session session, ExchangeUnbind method) - { - VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - if (!method.hasQueue()) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); - } - else if (nameNullOrEmpty(method.getExchange())) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange"); - } - else if (!method.hasBindingKey()) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); - } - else - { - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); - Exchange exchange = virtualHost.getExchange(method.getExchange()); - if(queue == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); - } - else if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); - } - else - { - try - { - exchange.removeBinding(method.getBindingKey(), queue, null); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot remove binding '" + method.getBindingKey()); - } - } - } - } - - @Override - public void exchangeBound(Session session, ExchangeBound method) - { - - ExchangeBoundResult result = new ExchangeBoundResult(); - VirtualHost virtualHost = getVirtualHost(session); - Exchange exchange; - AMQQueue queue; - if(method.hasExchange()) - { - exchange = virtualHost.getExchange(method.getExchange()); - - if(exchange == null) - { - result.setExchangeNotFound(true); - } - } - else - { - exchange = virtualHost.getDefaultExchange(); - } - - - if(method.hasQueue()) - { - - queue = getQueue(session, method.getQueue()); - if(queue == null) - { - result.setQueueNotFound(true); - } - - - if(exchange != null && queue != null) - { - - boolean queueMatched = exchange.isBound(queue); - - result.setQueueNotMatched(!queueMatched); - - - if(method.hasBindingKey()) - { - - if(queueMatched) - { - final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue); - result.setKeyNotMatched(!keyMatched); - if(method.hasArguments()) - { - if(keyMatched) - { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)); - } - else - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); - } - } - } - else - { - boolean keyMatched = exchange.isBound(method.getBindingKey()); - result.setKeyNotMatched(!keyMatched); - if(method.hasArguments()) - { - if(keyMatched) - { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); - } - else - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments())); - } - } - } - - } - else if (method.hasArguments()) - { - if(queueMatched) - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); - } - else - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments())); - } - } - - } - else if(exchange != null && method.hasBindingKey()) - { - final boolean keyMatched = exchange.isBound(method.getBindingKey()); - result.setKeyNotMatched(!keyMatched); - - if(method.hasArguments()) - { - if(keyMatched) - { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); - } - else - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments())); - } - } - - - } - - } - else if(exchange != null && method.hasBindingKey()) - { - final boolean keyMatched = exchange.isBound(method.getBindingKey()); - result.setKeyNotMatched(!keyMatched); - - if(method.hasArguments()) - { - if(keyMatched) - { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); - } - else - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments())); - } - } - - } - else if(exchange != null && method.hasArguments()) - { - result.setArgsNotMatched(!exchange.isBound(method.getArguments())); - } - - - session.executionResult((int) method.getId(), result); - - - } - - private AMQQueue getQueue(Session session, String queue) - { - QueueRegistry queueRegistry = getQueueRegistry(session); - return queueRegistry.getQueue(queue); - } - - private QueueRegistry getQueueRegistry(Session session) - { - return getVirtualHost(session).getQueueRegistry(); - } - - @Override - public void queueDeclare(Session session, final QueueDeclare method) - { - - VirtualHost virtualHost = getVirtualHost(session); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - - String queueName = method.getQueue(); - AMQQueue queue; - QueueRegistry queueRegistry = getQueueRegistry(session); - //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - - synchronized (queueRegistry) - { - - if (((queue = queueRegistry.getQueue(queueName)) == null)) - { - - if (method.getPassive()) - { - String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; - - exception(session, method, errorCode, description); - - return; - } - else - { - try - { - queue = createQueue(queueName, method, virtualHost, (ServerSession)session); - if(!method.getExclusive() && method.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } - - final String alternateExchangeName = method.getAlternateExchange(); - if(alternateExchangeName != null && alternateExchangeName.length() != 0) - { - Exchange alternate = getExchange(session, alternateExchangeName); - queue.setAlternateExchange(alternate); - } - - if(method.hasArguments() && method.getArguments() != null) - { - if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL)) - { - Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL); - queue.setNoLocal(convertBooleanValue(noLocal)); - } - } - - - if (queue.isDurable() && !queue.isAutoDelete()) - { - if(method.hasArguments() && method.getArguments() != null) - { - Map<String,Object> args = method.getArguments(); - FieldTable ftArgs = new FieldTable(); - for(Map.Entry<String, Object> entry : args.entrySet()) - { - ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); - } - DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs); - } - else - { - DurableConfigurationStoreHelper.createQueue(store, queue, null); - } - } - queueRegistry.registerQueue(queue); - - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) - { - final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - try - { - q.delete(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete '" + method.getQueue()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); - } - if (method.hasExclusive() - && method.getExclusive()) - { - final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(removeExclusive); - } - }); - } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare queue '" + queueName); - } - } - } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) - { - String description = "Cannot declare queue('" + queueName + "')," - + " as exclusive queue with same name " - + "declared on another session"; - ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - - exception(session, method, errorCode, description); - - return; - } - } - } - - /** - * Converts a queue argument into a boolean value. For compatibility with the C++ - * and the clients, accepts with Boolean, String, or Number types. - * @param argValue argument value. - * - * @return true if set - */ - private boolean convertBooleanValue(Object argValue) - { - if(argValue instanceof Boolean && ((Boolean)argValue)) - { - return true; - } - else if (argValue instanceof String && Boolean.parseBoolean((String)argValue)) - { - return true; - } - else if (argValue instanceof Number && ((Number)argValue).intValue() != 0) - { - return true; - } - return false; - } - - protected AMQQueue createQueue(final String queueName, - final QueueDeclare body, - VirtualHost virtualHost, - final ServerSession session) - throws AMQException - { - String owner = body.getExclusive() ? session.getClientID() : null; - - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner, - body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - - return queue; - } - - @Override - public void queueDelete(Session session, QueueDelete method) - { - String queueName = method.getQueue(); - if(queueName == null || queueName.length()==0) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); - - } - else - { - AMQQueue queue = getQueue(session, queueName); - - - if (queue == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); - } - else - { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if (method.getIfEmpty() && !queue.isEmpty()) - { - exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty"); - } - else if (method.getIfUnused() && !queue.isUnused()) - { - // TODO - Error code - exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use"); - - } - else - { - VirtualHost virtualHost = getVirtualHost(session); - - try - { - queue.delete(); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store,queue); - } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete queue '" + queueName); - } - } - } - } - } - - @Override - public void queuePurge(Session session, QueuePurge method) - { - String queueName = method.getQueue(); - if(queueName == null || queueName.length()==0) - { - exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied"); - } - else - { - AMQQueue queue = getQueue(session, queueName); - - if (queue == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); - } - else - { - try - { - queue.clearQueue(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot purge queue '" + queueName); - } - } - } - } - - @Override - public void queueQuery(Session session, QueueQuery method) - { - QueueQueryResult result = new QueueQueryResult(); - - AMQQueue queue = getQueue(session, method.getQueue()); - - if(queue != null) - { - result.setQueue(queue.getNameShortString().toString()); - result.setDurable(queue.isDurable()); - result.setExclusive(queue.isExclusive()); - result.setAutoDelete(queue.isAutoDelete()); - result.setArguments(queue.getArguments()); - result.setMessageCount(queue.getMessageCount()); - result.setSubscriberCount(queue.getConsumerCount()); - - } - - - session.executionResult((int) method.getId(), result); - - } - - @Override - public void messageSetFlowMode(Session session, MessageSetFlowMode sfm) - { - String destination = sfm.getDestination(); - - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - - if(sub == null) - { - exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); - } - else if(sub.isStopped()) - { - sub.setFlowMode(sfm.getFlowMode()); - } - } - - @Override - public void messageStop(Session session, MessageStop stop) - { - String destination = stop.getDestination(); - - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - - if(sub == null) - { - exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); - } - else - { - sub.stop(); - } - - } - - @Override - public void messageFlow(Session session, MessageFlow flow) - { - String destination = flow.getDestination(); - - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - - if(sub == null) - { - exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); - } - else - { - sub.addCredit(flow.getUnit(), flow.getValue()); - } - - } - - @Override - public void closed(Session session) - { - setThreadSubject(session); - - ServerSession serverSession = (ServerSession)session; - - serverSession.stopSubscriptions(); - serverSession.onClose(); - serverSession.unregisterSubscriptions(); - } - - @Override - public void detached(Session session) - { - closed(session); - } - - private void setThreadSubject(Session session) - { - final ServerConnection scon = (ServerConnection) session.getConnection(); - SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); - } - - private static class CommandProcessedAction implements ServerTransaction.Action - { - private final ServerSession _serverSession; - private final Method _method; - - public CommandProcessedAction(final ServerSession serverSession, final Method xfr) - { - _serverSession = serverSession; - _method = xfr; - } - - public void postCommit() - { - _serverSession.processed(_method); - } - - public void onRollback() - { - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java deleted file mode 100644 index c6bceb6ac7..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ /dev/null @@ -1,952 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; - -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject -{ - private final long _subscriptionID; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private static final Option[] BATCHED = new Option[] { Option.BATCH }; - - private final Lock _stateChangeLock = new ReentrantLock(); - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - private FlowCreditManager_0_10 _creditManager; - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private AMQQueue _queue; - private final String _destination; - private boolean _noLocal; - private final FilterManager _filters; - private final MessageAcceptMode _acceptMode; - private final MessageAcquireMode _acquireMode; - private MessageFlowMode _flowMode; - private final ServerSession _session; - private final AtomicBoolean _stopped = new AtomicBoolean(true); - private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; - - private LogActor _logActor; - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private String _traceExclude; - private String _trace; - private final long _createTime = System.currentTimeMillis(); - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private final Map<String, Object> _arguments; - private int _deferredMessageCredit; - private long _deferredSizeCredit; - - - public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, - MessageFlowMode flowMode, - FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments) - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _session = session; - _postIdSettingAction = new AddMessageDispositionListenerAction(session); - _destination = destination; - _acceptMode = acceptMode; - _acquireMode = acquireMode; - _creditManager = creditManager; - _flowMode = flowMode; - _filters = filters; - _creditManager.addStateListener(this); - _arguments = arguments == null ? Collections.<String, Object> emptyMap() : - Collections.<String, Object> unmodifiableMap(arguments); - _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); - - } - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - Map<String, Object> arguments = queue.getArguments(); - _traceExclude = (String) arguments.get("qpid.trace.exclude"); - _trace = (String) arguments.get("qpid.trace.id"); - String filterLogString = null; - - _logActor = GenericActor.getInstance(this); - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - filterLogString = getFilterLogString(); - CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - public String getConsumerName() - { - return _destination; - } - - public boolean isSuspended() - { - return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension - } - - public boolean hasInterest(QueueEntry entry) - { - - - - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - - return false; - } - - if (entry.getMessage() instanceof MessageTransferMessage) - { - if(_noLocal) - { - Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); - if (connectionRef != null && connectionRef == _session.getReference()) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null) - { - return false; - } - } - - - return checkFilters(entry); - - - } - - private boolean checkFilters(QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - public boolean isBrowser() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean seesRequeues() - { - return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - finally - { - _stateChangeLock.unlock(); - } - - - - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - - public static class AddMessageDispositionListenerAction implements Runnable - { - private MessageTransfer _xfr; - private ServerSession.MessageDispositionChangeListener _action; - private ServerSession _session; - - public AddMessageDispositionListenerAction(ServerSession session) - { - _session = session; - } - - public void setXfr(MessageTransfer xfr) - { - _xfr = xfr; - } - - public void setAction(ServerSession.MessageDispositionChangeListener action) - { - _action = action; - } - - public void run() - { - if(_action != null) - { - _session.onMessageDispositionChange(_xfr, _action); - } - } - } - - private final AddMessageDispositionListenerAction _postIdSettingAction; - - public void send(final QueueEntry entry, boolean batch) throws AMQException - { - ServerMessage serverMsg = entry.getMessage(); - - - MessageTransfer xfr; - - DeliveryProperties deliveryProps; - MessageProperties messageProps = null; - - MessageTransferMessage msg; - - if(serverMsg instanceof MessageTransferMessage) - { - - msg = (MessageTransferMessage) serverMsg; - - } - else - { - MessageConverter converter = - MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); - - - msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost()); - } - DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); - messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - - deliveryProps = new DeliveryProperties(); - if(origDeliveryProps != null) - { - if(origDeliveryProps.hasDeliveryMode()) - { - deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); - } - if(origDeliveryProps.hasExchange()) - { - deliveryProps.setExchange(origDeliveryProps.getExchange()); - } - if(origDeliveryProps.hasExpiration()) - { - deliveryProps.setExpiration(origDeliveryProps.getExpiration()); - } - if(origDeliveryProps.hasPriority()) - { - deliveryProps.setPriority(origDeliveryProps.getPriority()); - } - if(origDeliveryProps.hasRoutingKey()) - { - deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); - } - if(origDeliveryProps.hasTimestamp()) - { - deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); - } - if(origDeliveryProps.hasTtl()) - { - deliveryProps.setTtl(origDeliveryProps.getTtl()); - } - - - } - - deliveryProps.setRedelivered(entry.isRedelivered()); - - if(_trace != null && messageProps == null) - { - messageProps = new MessageProperties(); - } - - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - - - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); - - boolean excludeDueToFederation = false; - - if(_trace != null) - { - if(!messageProps.hasApplicationHeaders()) - { - messageProps.setApplicationHeaders(new HashMap<String,Object>()); - } - Map<String,Object> appHeaders = messageProps.getApplicationHeaders(); - String trace = (String) appHeaders.get("x-qpid.trace"); - if(trace == null) - { - trace = _trace; - } - else - { - if(_traceExclude != null) - { - excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude); - } - trace+=","+_trace; - } - appHeaders.put("x-qpid.trace",trace); - } - - if(!excludeDueToFederation) - { - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); - } - else if(_flowMode == MessageFlowMode.WINDOW) - { - xfr.setCompletionListener(new Method.CompletionListener() - { - public void onComplete(Method method) - { - deferredAddCredit(1, entry.getSize()); - } - }); - } - - - _postIdSettingAction.setXfr(xfr); - if(_acceptMode == MessageAcceptMode.EXPLICIT) - { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); - } - else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); - } - else - { - _postIdSettingAction.setAction(null); - } - - - _session.sendMessage(xfr, _postIdSettingAction); - entry.incrementDeliveryCount(); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - forceDequeue(entry, false); - } - else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - recordUnacknowledged(entry); - } - } - else - { - forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW); - - } - } - - void recordUnacknowledged(QueueEntry entry) - { - _unacknowledgedCount.incrementAndGet(); - _unacknowledgedBytes.addAndGet(entry.getSize()); - } - - private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) - { - _deferredMessageCredit += deferredMessageCredit; - _deferredSizeCredit += deferredSizeCredit; - - } - - public void flushCreditState(boolean strict) - { - if(strict || !isSuspended() || _deferredMessageCredit >= 200 - || !(_creditManager instanceof WindowCreditManager) - || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) - { - _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); - _deferredMessageCredit = 0; - _deferredSizeCredit = 0l; - } - } - - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) - { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - if (restoreCredit) - { - restoreCredit(entry); - } - entry.discard(); - } - - public void onRollback() - { - - } - }); - } - - void reject(final QueueEntry entry) - { - entry.setRedelivered(); - entry.routeToAlternate(); - if(entry.isAcquiredBy(this)) - { - entry.discard(); - } - } - - void release(final QueueEntry entry, final boolean setRedelivered) - { - if (setRedelivered) - { - entry.setRedelivered(); - } - - if (getSessionModel().isClosing() || !setRedelivered) - { - entry.decrementDeliveryCount(); - } - - if (isMaxDeliveryLimitReached(entry)) - { - sendToDLQOrDiscard(entry); - } - else - { - entry.release(); - } - } - - protected void sendToDLQOrDiscard(QueueEntry entry) - { - final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); - final LogActor logActor = CurrentActor.get(); - final ServerMessage msg = entry.getMessage(); - if (alternateExchange != null) - { - final InboundMessage m = new InboundMessageAdapter(entry); - - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m); - - if (destinationQueues == null || destinationQueues.isEmpty()) - { - entry.discard(); - - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); - } - else - { - entry.routeToAlternate(); - - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); - } - } - } - else - { - entry.discard(); - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); - } - } - - private boolean isMaxDeliveryLimitReached(QueueEntry entry) - { - final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); - return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); - } - - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean wouldSuspend(QueueEntry entry) - { - return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void restoreCredit(QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void onDequeue(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void releaseQueueEntry(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void setStateListener(StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public FlowCreditManager_0_10 getCreditManager() - { - return _creditManager; - } - - - public void stop() - { - try - { - getSendLock(); - - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); - } - finally - { - releaseSendLock(); - } - } - - public void addCredit(MessageCreditUnit unit, long value) - { - FlowCreditManager_0_10 creditManager = getCreditManager(); - - switch (unit) - { - case MESSAGE: - - creditManager.addCredit(value, 0L); - break; - case BYTE: - creditManager.addCredit(0l, value); - break; - } - - _stopped.set(false); - - if(creditManager.hasCredit()) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - } - - } - - public void setFlowMode(MessageFlowMode flowMode) - { - - - _creditManager.removeListener(this); - - switch(flowMode) - { - case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); - break; - case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); - break; - default: - throw new RuntimeException("Unknown message flow mode: " + flowMode); - } - _flowMode = flowMode; - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - - _creditManager.addStateListener(this); - - } - - public boolean isStopped() - { - return _stopped.get(); - } - - public boolean acquires() - { - return _acquireMode == MessageAcquireMode.PRE_ACQUIRED; - } - - public void acknowledge(QueueEntry entry) - { - // TODO Fix Store Context / cleanup - if(entry.isAcquiredBy(this)) - { - _unacknowledgedBytes.addAndGet(-entry.getSize()); - _unacknowledgedCount.decrementAndGet(); - entry.discard(); - } - } - - public void flush() throws AMQException - { - flushCreditState(true); - _queue.flushSubscription(this); - stop(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public LogActor getLogActor() - { - return _logActor; - } - - public boolean isTransient() - { - return false; - } - - public ServerSession getSessionModel() - { - return _session; - } - - public boolean isBrowsing() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public boolean isDurable() - { - return false; - } - - - public boolean isExplicitAcknowledge() - { - return _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public String getCreditMode() - { - return _flowMode.toString(); - } - - public String getName() - { - return _destination; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isSessionTransactional() - { - return _session.isTransactional(); - } - - public void queueEmpty() - { - } - - public long getCreateTime() - { - return _createTime; - } - - public String toLogString() - { - String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getNameShortString()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; - return result; - } - - private String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } - - if (isBrowser()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - hasEntries = true; - } - - if (isDurable()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Durable"); - hasEntries = true; - } - - return filterLogString.toString(); - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - - public void flushBatched() - { - _session.getConnection().flush(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java deleted file mode 100644 index 0c04f22232..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.server.message.MessageReference; - -public class TransferMessageReference extends MessageReference<MessageTransferMessage> -{ - public TransferMessageReference(MessageTransferMessage message) - { - super(message); - } - - protected void onReference(MessageTransferMessage message) - { - message.incrementReference(); - } - - protected void onRelease(MessageTransferMessage message) - { - message.decrementReference(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java deleted file mode 100644 index 8e48741b91..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.flow.AbstractFlowCreditManager; - -public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 -{ - private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); - - private volatile long _bytesCreditLimit; - private volatile long _messageCreditLimit; - - private volatile long _bytesUsed; - private volatile long _messageUsed; - - public WindowCreditManager() - { - this(0L, 0L); - } - - public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - setSuspended(!hasCredit()); - - } - - public long getBytesCreditLimit() - { - return _bytesCreditLimit; - } - - public long getMessageCreditLimit() - { - return _messageCreditLimit; - } - - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCreditLimit == -1L - ? Long.MAX_VALUE - : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L; - } - - public long getBytesCredit() - { - return _bytesCreditLimit == -1L - ? Long.MAX_VALUE - : _bytesUsed < _bytesCreditLimit ? _bytesCreditLimit - _bytesUsed : 0L; - } - - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) - { - _messageUsed -= messageCredit; - if(_messageUsed < 0L) - { - LOGGER.error("Message credit used value was negative: "+ _messageUsed); - _messageUsed = 0; - } - - boolean notifyIncrease = true; - - if(_messageCreditLimit > 0L) - { - notifyIncrease = (_messageUsed != _messageCreditLimit); - } - - _bytesUsed -= bytesCredit; - if(_bytesUsed < 0L) - { - LOGGER.error("Bytes credit used value was negative: "+ _messageUsed); - _bytesUsed = 0; - } - - if(_bytesCreditLimit > 0L) - { - notifyIncrease = notifyIncrease && bytesCredit>0; - - if(notifyIncrease) - { - notifyIncreaseBytesCredit(); - } - } - - setSuspended(!hasCredit()); - } - - - - public synchronized boolean hasCredit() - { - return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed) - && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCreditLimit >= 0L) - { - if(_messageUsed < _messageCreditLimit) - { - if(_bytesCreditLimit < 0L) - { - _messageUsed++; - - return true; - } - else if(_bytesUsed + msgSize <= _bytesCreditLimit) - { - _messageUsed++; - _bytesUsed += msgSize; - - return true; - } - else - { - return false; - } - } - else - { - setSuspended(true); - return false; - } - } - else if(_bytesCreditLimit >= 0L) - { - if(_bytesUsed + msgSize <= _bytesCreditLimit) - { - _bytesUsed += msgSize; - - return true; - } - else - { - return false; - } - - } - else - { - return true; - } - - } - - - public synchronized void addCredit(long count, long bytes) - { - if(bytes > 0) - { - _bytesCreditLimit += bytes; - } - else if(bytes == -1) - { - _bytesCreditLimit = -1; - } - - - if(count > 0) - { - _messageCreditLimit += count; - } - else if(count == -1) - { - _messageCreditLimit = -1; - } - } - - public void clearCredit() - { - _bytesCreditLimit = 0l; - _messageCreditLimit = 0l; - setSuspended(true); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 3e1e8baddc..82b1f6a193 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -184,7 +184,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastReceivedTime; private boolean _blocking; - private final Lock _receivedLock; + private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker _broker; private final Transport _transport; @@ -973,16 +973,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { synchronized(this) { + + boolean lockHeld = _receivedLock.isHeldByCurrentThread(); + while(!_closed) { try { + if(lockHeld) + { + _receivedLock.unlock(); + } wait(1000); } catch (InterruptedException e) { } + finally + { + if(lockHeld) + { + _receivedLock.lock(); + } + } } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java index f17b79d896..e66f0889dd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java @@ -23,20 +23,11 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_10.Subscription_0_10; -import org.apache.qpid.server.protocol.v0_10.ServerSession; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageFlowMode; - -import java.util.Map; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter deleted file mode 100644 index ab1546ed0a..0000000000 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter +++ /dev/null @@ -1,21 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10 -org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8 -org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10 diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType index e0c0aa5873..43ad3adf13 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType @@ -17,4 +17,3 @@ # under the License. # org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8 -org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10 diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator index b771e25328..57ca615a04 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator @@ -19,4 +19,3 @@ org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8 org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9 org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1 -org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10 |
