From e32debe1df7d0a837e30cd937fb7a18fc5cfa203 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 24 Apr 2008 17:49:03 +0000 Subject: QPID-832 : Fix eol-style git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651325 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/FanoutExchange.java | 480 ++++----- .../qpid/server/handler/AccessRequestHandler.java | 130 +-- .../qpid/server/handler/BasicGetMethodHandler.java | 202 ++-- .../handler/BasicRecoverSyncMethodHandler.java | 150 +-- .../qpid/server/handler/QueuePurgeHandler.java | 242 ++--- .../server/handler/ServerMethodDispatcherImpl.java | 1132 ++++++++++---------- .../handler/ServerMethodDispatcherImpl_0_9.java | 328 +++--- .../handler/ServerMethodDispatcherImpl_8_0.java | 172 +-- .../server/handler/UnexpectedMethodException.java | 66 +- .../server/output/ProtocolOutputConverter.java | 114 +- .../output/ProtocolOutputConverterRegistry.java | 122 +-- .../amqp0_8/ProtocolOutputConverterImpl.java | 570 +++++----- .../amqp0_9/ProtocolOutputConverterImpl.java | 794 +++++++------- .../protocol/AMQNoMethodHandlerException.java | 92 +- .../protocol/UnknnownMessageTypeException.java | 92 +- .../qpid/server/queue/NotificationCheck.java | 276 ++--- .../server/queue/QueueNotificationListener.java | 54 +- .../server/virtualhost/ManagedVirtualHost.java | 88 +- .../server/virtualhost/VirtualHostRegistry.java | 140 +-- 19 files changed, 2622 insertions(+), 2622 deletions(-) (limited to 'qpid/java/broker/src') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index e7c887f306..f1b383eac9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,240 +1,240 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import java.util.List; -import java.util.Map; -import java.util.ArrayList; -import java.util.concurrent.CopyOnWriteArraySet; - -public class FanoutExchange extends AbstractExchange -{ - private static final Logger _logger = Logger.getLogger(FanoutExchange.class); - - /** - * Maps from queue name to queue instances - */ - private final CopyOnWriteArraySet _queues = new CopyOnWriteArraySet(); - - /** - * MBean class implementing the management interfaces. - */ - @MBeanDescription("Management Bean for Fanout Exchange") - private final class FanoutExchangeMBean extends ExchangeMBean - { - @MBeanConstructor("Creates an MBean for AMQ fanout exchange") - public FanoutExchangeMBean() throws JMException - { - super(); - _exchangeType = "fanout"; - init(); - } - - public TabularData bindings() throws OpenDataException - { - - _bindingList = new TabularDataSupport(_bindinglistDataType); - - for (AMQQueue queue : _queues) - { - String queueName = queue.getName().toString(); - - Object[] bindingItemValues = {queueName, new String[]{queueName}}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); - _bindingList.put(bindingData); - } - - return _bindingList; - } - - public void createNewBinding(String queueName, String binding) throws JMException - { - AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (queue == null) - { - throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); - } - - try - { - queue.bind(new AMQShortString(binding), null, FanoutExchange.this); - } - catch (AMQException ex) - { - throw new MBeanException(ex); - } - } - - } // End of MBean class - - protected ExchangeMBean createMBean() throws AMQException - { - try - { - return new FanoutExchange.FanoutExchangeMBean(); - } - catch (JMException ex) - { - _logger.error("Exception occured in creating the direct exchange mbean", ex); - throw new AMQException("Exception occured in creating the direct exchange mbean", ex); - } - } - - public static final ExchangeType TYPE = new ExchangeType() - { - - public AMQShortString getName() - { - return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; - } - - public Class getExchangeClass() - { - return FanoutExchange.class; - } - - public FanoutExchange newInstance(VirtualHost host, - AMQShortString name, - boolean durable, - int ticket, - boolean autoDelete) throws AMQException - { - FanoutExchange exch = new FanoutExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); - return exch; - } - - public AMQShortString getDefaultExchangeName() - { - return ExchangeDefaults.FANOUT_EXCHANGE_NAME; - } - }; - - public Map> getBindings() - { - return null; - } - - public AMQShortString getType() - { - return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; - } - - public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - assert queue != null; - - if (_queues.contains(queue)) - { - _logger.debug("Queue " + queue + " is already registered"); - } - else - { - _queues.add(queue); - _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); - } - } - - public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - assert queue != null; - - if (!_queues.remove(queue)) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". "); - } - } - - public void route(AMQMessage payload) throws AMQException - { - final MessagePublishInfo publishInfo = payload.getMessagePublishInfo(); - final AMQShortString routingKey = publishInfo.getRoutingKey(); - if ((_queues == null) || _queues.isEmpty()) - { - String msg = "No queues bound to " + this; - if (publishInfo.isMandatory() || publishInfo.isImmediate()) - { - throw new NoRouteException(msg, payload); - } - else - { - _logger.warn(msg); - } - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + _queues); - } - - payload.enqueue(new ArrayList(_queues)); - - } - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey, queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return _queues.contains(queue); - } - - public boolean isBound(AMQShortString routingKey) - { - - return (_queues != null) && !_queues.isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - - return _queues.contains(queue); - } - - public boolean hasBindings() - { - return !_queues.isEmpty(); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class FanoutExchange extends AbstractExchange +{ + private static final Logger _logger = Logger.getLogger(FanoutExchange.class); + + /** + * Maps from queue name to queue instances + */ + private final CopyOnWriteArraySet _queues = new CopyOnWriteArraySet(); + + /** + * MBean class implementing the management interfaces. + */ + @MBeanDescription("Management Bean for Fanout Exchange") + private final class FanoutExchangeMBean extends ExchangeMBean + { + @MBeanConstructor("Creates an MBean for AMQ fanout exchange") + public FanoutExchangeMBean() throws JMException + { + super(); + _exchangeType = "fanout"; + init(); + } + + public TabularData bindings() throws OpenDataException + { + + _bindingList = new TabularDataSupport(_bindinglistDataType); + + for (AMQQueue queue : _queues) + { + String queueName = queue.getName().toString(); + + Object[] bindingItemValues = {queueName, new String[]{queueName}}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + queue.bind(new AMQShortString(binding), null, FanoutExchange.this); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + } // End of MBean class + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new FanoutExchange.FanoutExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + } + } + + public static final ExchangeType TYPE = new ExchangeType() + { + + public AMQShortString getName() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public Class getExchangeClass() + { + return FanoutExchange.class; + } + + public FanoutExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + int ticket, + boolean autoDelete) throws AMQException + { + FanoutExchange exch = new FanoutExchange(); + exch.initialise(host, name, durable, ticket, autoDelete); + return exch; + } + + public AMQShortString getDefaultExchangeName() + { + return ExchangeDefaults.FANOUT_EXCHANGE_NAME; + } + }; + + public Map> getBindings() + { + return null; + } + + public AMQShortString getType() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (_queues.contains(queue)) + { + _logger.debug("Queue " + queue + " is already registered"); + } + else + { + _queues.add(queue); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + } + } + + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (!_queues.remove(queue)) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". "); + } + } + + public void route(AMQMessage payload) throws AMQException + { + final MessagePublishInfo publishInfo = payload.getMessagePublishInfo(); + final AMQShortString routingKey = publishInfo.getRoutingKey(); + if ((_queues == null) || _queues.isEmpty()) + { + String msg = "No queues bound to " + this; + if (publishInfo.isMandatory() || publishInfo.isImmediate()) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Publishing message to queue " + _queues); + } + + payload.enqueue(new ArrayList(_queues)); + + } + } + + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return isBound(routingKey, queue); + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return _queues.contains(queue); + } + + public boolean isBound(AMQShortString routingKey) + { + + return (_queues != null) && !_queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) + { + + return _queues.contains(queue); + } + + public boolean hasBindings() + { + return !_queues.isEmpty(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index dd712a404c..133c97a146 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -1,65 +1,65 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.handler; - -import org.apache.qpid.framing.*; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.AMQException; - -/** - * @author Apache Software Foundation - * - * - */ -public class AccessRequestHandler implements StateAwareMethodListener -{ - private static final AccessRequestHandler _instance = new AccessRequestHandler(); - - - public static AccessRequestHandler getInstance() - { - return _instance; - } - - private AccessRequestHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - - MethodRegistry methodRegistry = session.getMethodRegistry(); - - // We don't implement access control class, but to keep clients happy that expect it - // always use the "0" ticket. - AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); - - session.writeFrame(response.generateFrame(channelId)); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.AMQException; + +/** + * @author Apache Software Foundation + * + * + */ +public class AccessRequestHandler implements StateAwareMethodListener +{ + private static final AccessRequestHandler _instance = new AccessRequestHandler(); + + + public static AccessRequestHandler getInstance() + { + return _instance; + } + + private AccessRequestHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + MethodRegistry methodRegistry = session.getMethodRegistry(); + + // We don't implement access control class, but to keep clients happy that expect it + // always use the "0" ticket. + AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); + + session.writeFrame(response.generateFrame(channelId)); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index f8f9127809..3731116cba 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -1,101 +1,101 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.server.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicGetBody; -import org.apache.qpid.framing.BasicGetEmptyBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.security.access.Permission; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class BasicGetMethodHandler implements StateAwareMethodListener -{ - private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); - - private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); - - public static BasicGetMethodHandler getInstance() - { - return _instance; - } - - private BasicGetMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - - - VirtualHost vHost = session.getVirtualHost(); - - AMQChannel channel = session.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } - else - { - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); - - if (queue == null) - { - _log.info("No queue for '" + body.getQueue() + "'"); - if(body.getQueue()!=null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue()+ "'"); - } - else - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined."); - } - } - else - { - - //Perform ACLs - vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); - - if (!queue.performGet(session, channel, !body.getNoAck())) - { - MethodRegistry methodRegistry = session.getMethodRegistry(); - // TODO - set clusterId - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - - - session.writeFrame(responseBody.generateFrame(channelId)); - } - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicGetBody; +import org.apache.qpid.framing.BasicGetEmptyBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class BasicGetMethodHandler implements StateAwareMethodListener +{ + private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); + + private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); + + public static BasicGetMethodHandler getInstance() + { + return _instance; + } + + private BasicGetMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + + VirtualHost vHost = session.getVirtualHost(); + + AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + else + { + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); + + if (queue == null) + { + _log.info("No queue for '" + body.getQueue() + "'"); + if(body.getQueue()!=null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "No such queue, '" + body.getQueue()+ "'"); + } + else + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "No queue name provided, no default queue defined."); + } + } + else + { + + //Perform ACLs + vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); + + if (!queue.performGet(session, channel, !body.getNoAck())) + { + MethodRegistry methodRegistry = session.getMethodRegistry(); + // TODO - set clusterId + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + + + session.writeFrame(responseBody.generateFrame(channelId)); + } + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java index 3e2cab2e53..bca35be535 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java @@ -1,75 +1,75 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicRecoverSyncBody; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.AMQException; - -public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class); - - private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler(); - - public static BasicRecoverSyncMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - - _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); - AMQChannel channel = session.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } - - channel.resend(body.getRequeue()); - - // Qpid 0-8 hacks a synchronous -ok onto recover. - // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(session.getProtocolVersion().equals(ProtocolVersion.v0_9)) - { - MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - session.writeFrame(recoverOk.generateFrame(channelId)); - - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; + +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.AMQException; + +public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class); + + private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler(); + + public static BasicRecoverSyncMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); + AMQChannel channel = session.getChannel(channelId); + + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + channel.resend(body.getRequeue()); + + // Qpid 0-8 hacks a synchronous -ok onto recover. + // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant + if(session.getProtocolVersion().equals(ProtocolVersion.v0_9)) + { + MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); + session.writeFrame(recoverOk.generateFrame(channelId)); + + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index cce49f13c7..a854c97f60 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -1,121 +1,121 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.server.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.security.access.Permission; - -public class QueuePurgeHandler implements StateAwareMethodListener -{ - private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); - - public static QueuePurgeHandler getInstance() - { - return _instance; - } - - private final boolean _failIfNotFound; - - public QueuePurgeHandler() - { - this(true); - } - - public QueuePurgeHandler(boolean failIfNotFound) - { - _failIfNotFound = failIfNotFound; - } - - public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - AMQChannel channel = session.getChannel(channelId); - - - AMQQueue queue; - if(body.getQueue() == null) - { - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - - if(queue == null) - { - if(_failIfNotFound) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified."); - } - } - } - else - { - queue = queueRegistry.getQueue(body.getQueue()); - } - - if(queue == null) - { - if(_failIfNotFound) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); - } - } - else - { - - //Perform ACLs - virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue); - - long purged = queue.clearQueue(channel.getStoreContext()); - - - if(!body.getNowait()) - { - - MethodRegistry methodRegistry = session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - session.writeFrame(responseBody.generateFrame(channelId)); - - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.security.access.Permission; + +public class QueuePurgeHandler implements StateAwareMethodListener +{ + private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); + + public static QueuePurgeHandler getInstance() + { + return _instance; + } + + private final boolean _failIfNotFound; + + public QueuePurgeHandler() + { + this(true); + } + + public QueuePurgeHandler(boolean failIfNotFound) + { + _failIfNotFound = failIfNotFound; + } + + public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + AMQChannel channel = session.getChannel(channelId); + + + AMQQueue queue; + if(body.getQueue() == null) + { + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified."); + } + } + } + else + { + queue = queueRegistry.getQueue(body.getQueue()); + } + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + } + } + else + { + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue); + + long purged = queue.clearQueue(channel.getStoreContext()); + + + if(!body.getNowait()) + { + + MethodRegistry methodRegistry = session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); + session.writeFrame(responseBody.generateFrame(channelId)); + + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java index 9475b83c8f..d24e4f6ffa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java @@ -1,566 +1,566 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import java.util.Map; -import java.util.HashMap; - -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.framing.*; -import org.apache.qpid.AMQException; - -public class ServerMethodDispatcherImpl implements MethodDispatcher -{ - private final AMQStateManager _stateManager; - - private static interface DispatcherFactory - { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager); - } - - private static final Map _dispatcherFactories = - new HashMap(); - - - static - { - _dispatcherFactories.put(ProtocolVersion.v8_0, - new DispatcherFactory() - { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) - { - return new ServerMethodDispatcherImpl_8_0(stateManager); - } - }); - - _dispatcherFactories.put(ProtocolVersion.v0_9, - new DispatcherFactory() - { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) - { - return new ServerMethodDispatcherImpl_0_9(stateManager); - } - }); - - } - - - private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance(); - private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance(); - private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance(); - private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance(); - private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); - private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance(); - private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance(); - private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance(); - private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance(); - private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance(); - private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance(); - private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance(); - private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance(); - private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance(); - private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance(); - private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance(); - private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance(); - private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance(); - private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance(); - private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance(); - private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance(); - private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance(); - private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance(); - private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance(); - private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance(); - private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance(); - private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance(); - private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance(); - private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance(); - - - - public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion) - { - return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager); - } - - - public ServerMethodDispatcherImpl(AMQStateManager stateManager) - { - _stateManager = stateManager; - } - - - protected AMQStateManager getStateManager() - { - return _stateManager; - } - - - - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException - { - _accessRequestHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException - { - _basicAckMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException - { - _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException - { - _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException - { - _basicGetMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException - { - _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException - { - _basicQosHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException - { - _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException - { - _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException - { - _channelOpenHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException - { - _channelCloseHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException - { - _channelCloseOkHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException - { - _channelFlowHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException - { - _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException - { - _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException - { - _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException - { - _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException - { - _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException - { - _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException - { - _exchangeBoundHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException - { - _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException - { - _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException - { - _queueBindHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException - { - _queueDeclareHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException - { - _queueDeleteHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException - { - _queuePurgeHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException - { - _txCommitHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException - { - _txRollbackHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException - { - _txSelectHandler.methodReceived(_stateManager, body, channelId); - return true; - } - - - - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.framing.*; +import org.apache.qpid.AMQException; + +public class ServerMethodDispatcherImpl implements MethodDispatcher +{ + private final AMQStateManager _stateManager; + + private static interface DispatcherFactory + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager); + } + + private static final Map _dispatcherFactories = + new HashMap(); + + + static + { + _dispatcherFactories.put(ProtocolVersion.v8_0, + new DispatcherFactory() + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + { + return new ServerMethodDispatcherImpl_8_0(stateManager); + } + }); + + _dispatcherFactories.put(ProtocolVersion.v0_9, + new DispatcherFactory() + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + { + return new ServerMethodDispatcherImpl_0_9(stateManager); + } + }); + + } + + + private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance(); + private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance(); + private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance(); + private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance(); + private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance(); + private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance(); + private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance(); + private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance(); + private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance(); + private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance(); + private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance(); + private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance(); + private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance(); + private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance(); + private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance(); + private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance(); + private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance(); + private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance(); + private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance(); + private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance(); + private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance(); + private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance(); + private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance(); + private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance(); + private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance(); + private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance(); + private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance(); + + + + public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion) + { + return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager); + } + + + public ServerMethodDispatcherImpl(AMQStateManager stateManager) + { + _stateManager = stateManager; + } + + + protected AMQStateManager getStateManager() + { + return _stateManager; + } + + + + public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + { + _accessRequestHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException + { + _basicAckMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException + { + _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + { + _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + { + _basicGetMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException + { + _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + { + _basicQosHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + { + _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + { + _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + { + _channelOpenHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + { + _channelCloseHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + { + _channelCloseOkHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException + { + _channelFlowHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + + public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + { + _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + { + _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + { + _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + + public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + { + _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + { + _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + { + _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + { + _exchangeBoundHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException + { + _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + { + _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + { + _queueBindHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException + { + _queueDeclareHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException + { + _queueDeleteHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException + { + _queuePurgeHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException + { + _txCommitHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException + { + _txRollbackHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + { + _txSelectHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java index 8b1dca77ba..382a85347b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java @@ -1,164 +1,164 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - - -import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.AMQException; - - - -public class ServerMethodDispatcherImpl_0_9 - extends ServerMethodDispatcherImpl - implements MethodDispatcher_0_9 - -{ - - private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler = - BasicRecoverSyncMethodHandler.getInstance(); - private static final QueueUnbindHandler _queueUnbindHandler = - QueueUnbindHandler.getInstance(); - - - public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager) - { - super(stateManager); - } - - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId); - return true; - } - - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException - { - _queueUnbindHandler.methodReceived(getStateManager(),body,channelId); - return true; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + + +import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.AMQException; + + + +public class ServerMethodDispatcherImpl_0_9 + extends ServerMethodDispatcherImpl + implements MethodDispatcher_0_9 + +{ + + private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler = + BasicRecoverSyncMethodHandler.getInstance(); + private static final QueueUnbindHandler _queueUnbindHandler = + QueueUnbindHandler.getInstance(); + + + public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + { + _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId); + return true; + } + + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + { + _queueUnbindHandler.methodReceived(getStateManager(),body,channelId); + return true; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java index d599ca3d4e..22f64cf7d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java @@ -1,86 +1,86 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.AMQException; - -public class ServerMethodDispatcherImpl_8_0 - extends ServerMethodDispatcherImpl - implements MethodDispatcher_8_0 -{ - public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager) - { - super(stateManager); - } - - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException - { - return false; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.AMQException; + +public class ServerMethodDispatcherImpl_8_0 + extends ServerMethodDispatcherImpl + implements MethodDispatcher_8_0 +{ + public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + { + return false; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java index fb18519fe1..0abb3cdd7d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java @@ -1,33 +1,33 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.AMQException; - -public class UnexpectedMethodException extends AMQException -{ - public UnexpectedMethodException(AMQMethodBody body) - { - super("Unexpected method recevied: " + body.getClass().getName()); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; + +public class UnexpectedMethodException extends AMQException +{ + public UnexpectedMethodException(AMQMethodBody body) + { + super("Unexpected method recevied: " + body.getClass().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java index e01c5aabbf..576d577b40 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java @@ -1,57 +1,57 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.AMQException; - -public interface ProtocolOutputConverter -{ - void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag); - - interface Factory - { - ProtocolOutputConverter newInstance(AMQProtocolSession session); - } - - void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException; - - void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException; - - byte getProtocolMinorVersion(); - - byte getProtocolMajorVersion(); - - void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException; - - void writeFrame(AMQDataBlock block); -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.AMQException; + +public interface ProtocolOutputConverter +{ + void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag); + + interface Factory + { + ProtocolOutputConverter newInstance(AMQProtocolSession session); + } + + void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException; + + void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException; + + byte getProtocolMinorVersion(); + + byte getProtocolMajorVersion(); + + void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException; + + void writeFrame(AMQDataBlock block); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 36e7e88fd6..02fb1429c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -1,61 +1,61 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output; - -import org.apache.qpid.server.output.ProtocolOutputConverter.Factory; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.framing.ProtocolVersion; - -import java.util.Map; -import java.util.HashMap; - -public class ProtocolOutputConverterRegistry -{ - - private static final Map _registry = - new HashMap(); - - - static - { - register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory()); - register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory()); - - } - - private static void register(ProtocolVersion version, Factory converter) - { - - _registry.put(version,converter); - } - - - public static ProtocolOutputConverter getConverter(AMQProtocolSession session) - { - return _registry.get(session.getProtocolVersion()).newInstance(session); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output; + +import org.apache.qpid.server.output.ProtocolOutputConverter.Factory; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.framing.ProtocolVersion; + +import java.util.Map; +import java.util.HashMap; + +public class ProtocolOutputConverterRegistry +{ + + private static final Map _registry = + new HashMap(); + + + static + { + register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory()); + register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory()); + + } + + private static void register(ProtocolVersion version, Factory converter) + { + + _registry.put(version,converter); + } + + + public static ProtocolOutputConverter getConverter(AMQProtocolSession session) + { + return _registry.get(session.getProtocolVersion()).newInstance(session); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index d7a879180a..d4cb7c878f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -1,285 +1,285 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output.amqp0_8; - -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.AMQException; - -import org.apache.mina.common.ByteBuffer; - -import java.util.Iterator; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final Long messageId = message.getMessageId(); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - - if(bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext,messageId, i); - writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException - { - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final long messageId = message.getMessageId(); - - AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); - - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - if(bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext, messageId, i); - writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - - private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicDeliverBody deliverBody = - methodRegistry.createBasicDeliverBody(consumerTag, - deliveryTag, - messageHandle.isRedelivered(), - pb.getExchange(), - pb.getRoutingKey()); - AMQFrame deliverFrame = deliverBody.generateFrame(channelId); - - - return deliverFrame; - } - - private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicGetOkBody getOkBody = - methodRegistry.createBasicGetOkBody(deliveryTag, - messageHandle.isRedelivered(), - pb.getExchange(), - pb.getRoutingKey(), - queueSize); - AMQFrame getOkFrame = getOkBody.generateFrame(channelId); - - return getOkFrame; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicReturnBody basicReturnBody = - methodRegistry.createBasicReturnBody(replyCode, - replyText, - message.getMessagePublishInfo().getExchange(), - message.getMessagePublishInfo().getRoutingKey()); - AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); - - return returnFrame; - } - - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - Iterator bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); - - writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - writeFrame(bodyFrameIterator.next()); - } - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output.amqp0_8; + +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Iterator; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final Long messageId = message.getMessageId(); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext,messageId, i); + writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + + + } + + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final long messageId = message.getMessageId(); + + AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); + + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext, messageId, i); + writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + + + } + + + private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicDeliverBody deliverBody = + methodRegistry.createBasicDeliverBody(consumerTag, + deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey()); + AMQFrame deliverFrame = deliverBody.generateFrame(channelId); + + + return deliverFrame; + } + + private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicGetOkBody getOkBody = + methodRegistry.createBasicGetOkBody(deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey(), + queueSize); + AMQFrame getOkFrame = getOkBody.generateFrame(channelId); + + return getOkFrame; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicReturnBody basicReturnBody = + methodRegistry.createBasicReturnBody(replyCode, + replyText, + message.getMessagePublishInfo().getExchange(), + message.getMessagePublishInfo().getRoutingKey()); + AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); + + return returnFrame; + } + + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + Iterator bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); + + writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + writeFrame(bodyFrameIterator.next()); + } + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 48d2ca9bc9..f87d3bcae1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -1,397 +1,397 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.output.amqp0_9; - -import org.apache.mina.common.ByteBuffer; - -import java.util.Iterator; - -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); - - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); - final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody(); - - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final Long messageId = message.getMessageId(); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - - if(bodyCount == 0) - { - SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); - - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb); - - CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext,messageId, i); - writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); - } - - - } - - - } - - private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) - { - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - contentHeaderBody); - return contentHeader; - } - - - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException - { - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final long messageId = message.getMessageId(); - - AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); - - - AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - if(bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext, messageId, i); - writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); - } - - - } - - - } - - - private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag) - throws AMQException - { - - - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - - final AMQBody returnBlock = new AMQBody() - { - - - - private final boolean _isRedelivered = messageHandle.isRedelivered(); - private final AMQShortString _exchangeName = pb.getExchange(); - private final AMQShortString _routingKey = pb.getRoutingKey(); - - - public AMQBody _underlyingBody; - - public AMQBody createAMQBody() - { - return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, - deliveryTag, - _isRedelivered, - _exchangeName, - _routingKey); - - - - - - } - - public byte getFrameType() - { - return AMQMethodBody.TYPE; - } - - public int getSize() - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - return _underlyingBody.getSize(); - } - - public void writePayload(ByteBuffer buffer) - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } - - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - throw new AMQException("This block should never be dispatched!"); - } - }; - return returnBlock; - } - - private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - - BasicGetOkBody getOkBody = - METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, - messageHandle.isRedelivered(), - pb.getExchange(), - pb.getRoutingKey(), - queueSize); - AMQFrame getOkFrame = getOkBody.generateFrame(channelId); - - return getOkFrame; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - - BasicReturnBody basicReturnBody = - METHOD_REGISTRY.createBasicReturnBody(replyCode, - replyText, - message.getMessagePublishInfo().getExchange(), - message.getMessagePublishInfo().getRoutingKey()); - AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); - - return returnFrame; - } - - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); - - Iterator bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); - - writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - writeFrame(bodyFrameIterator.next()); - } - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - - BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } - - - public static final class CompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final AMQBody _contentBody; - private final int _channel; - - - public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - _contentBody = contentBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); - } - - public void writePayload(ByteBuffer buffer) - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); - } - } - - public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final int _channel; - - - public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; - } - - public void writePayload(ByteBuffer buffer) - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); - } - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.output.amqp0_9; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Iterator; + +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); + + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); + final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody(); + + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final Long messageId = message.getMessageId(); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + + if(bodyCount == 0) + { + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb); + + CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext,messageId, i); + writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); + } + + + } + + + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final long messageId = message.getMessageId(); + + AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); + + + AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext, messageId, i); + writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); + } + + + } + + + } + + + private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag) + throws AMQException + { + + + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + + final AMQBody returnBlock = new AMQBody() + { + + + + private final boolean _isRedelivered = messageHandle.isRedelivered(); + private final AMQShortString _exchangeName = pb.getExchange(); + private final AMQShortString _routingKey = pb.getRoutingKey(); + + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + _isRedelivered, + _exchangeName, + _routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey(), + queueSize); + AMQFrame getOkFrame = getOkBody.generateFrame(channelId); + + return getOkFrame; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + message.getMessagePublishInfo().getExchange(), + message.getMessagePublishInfo().getRoutingKey()); + AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); + + return returnFrame; + } + + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); + + Iterator bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); + + writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + writeFrame(bodyFrameIterator.next()); + } + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java index a7599a3e0d..92f951ce39 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -1,46 +1,46 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -/** - * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represents failure to handle an AMQP method. - *
- * - * @todo Not an AMQP exception as no status code. - * - * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a - * Runtime. - */ -public class AMQNoMethodHandlerException extends AMQException -{ - public AMQNoMethodHandlerException(AMQMethodEvent evt) - { - super("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to handle an AMQP method. + *
+ * + * @todo Not an AMQP exception as no status code. + * + * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a + * Runtime. + */ +public class AMQNoMethodHandlerException extends AMQException +{ + public AMQNoMethodHandlerException(AMQMethodEvent evt) + { + super("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java index 6e72aa062f..bb2db8d506 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -1,46 +1,46 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; - -/** - * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represents failure to cast a frame to its expected type. - *
- * - * @todo Not an AMQP exception as no status code. - * - * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would - * be better just to leave that as a ClassCastException. However, check the framing layer catches this error - * first. - */ -public class UnknnownMessageTypeException extends AMQException -{ - public UnknnownMessageTypeException(AMQDataBlock message) - { - super("Unknown message type: " + message.getClass().getName() + ": " + message); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +/** + * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to cast a frame to its expected type. + *
+ * + * @todo Not an AMQP exception as no status code. + * + * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would + * be better just to leave that as a ClassCastException. However, check the framing layer catches this error + * first. + */ +public class UnknnownMessageTypeException extends AMQException +{ + public UnknnownMessageTypeException(AMQDataBlock message) + { + super("Unknown message type: " + message.getClass().getName() + ": " + message); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6f9efd3200..65115e4103 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -1,138 +1,138 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; - -public enum NotificationCheck -{ - - MESSAGE_COUNT_ALERT - { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) - { - int msgCount; - final long maximumMessageCount = queue.getMaximumMessageCount(); - if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount) - { - listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); - return true; - } - return false; - } - }, - MESSAGE_SIZE_ALERT(true) - { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) - { - final long maximumMessageSize = queue.getMaximumMessageSize(); - if(maximumMessageSize != 0) - { - // Check for threshold message size - long messageSize; - try - { - messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; - } - catch (AMQException e) - { - messageSize = 0; - } - - - if (messageSize >= maximumMessageSize) - { - listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); - return true; - } - } - return false; - } - - }, - QUEUE_DEPTH_ALERT - { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) - { - // Check for threshold queue depth in bytes - final long maximumQueueDepth = queue.getMaximumQueueDepth(); - - if(maximumQueueDepth != 0) - { - final long queueDepth = queue.getQueueDepth(); - - if (queueDepth >= maximumQueueDepth) - { - listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached."); - return true; - } - } - return false; - } - - }, - MESSAGE_AGE_ALERT - { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) - { - - final long maxMessageAge = queue.getMaximumMessageAge(); - if(maxMessageAge != 0) - { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - maxMessageAge; - final long firstArrivalTime = queue.getOldestMessageArrivalTime(); - - if(firstArrivalTime < thresholdTime) - { - long oldestAge = currentTime - firstArrivalTime; - listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."); - - return true; - } - } - return false; - - } - - } - ; - - private final boolean _messageSpecific; - - NotificationCheck() - { - this(false); - } - - NotificationCheck(boolean messageSpecific) - { - _messageSpecific = messageSpecific; - } - - public boolean isMessageSpecific() - { - return _messageSpecific; - } - - abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +public enum NotificationCheck +{ + + MESSAGE_COUNT_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + int msgCount; + final long maximumMessageCount = queue.getMaximumMessageCount(); + if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount) + { + listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); + return true; + } + return false; + } + }, + MESSAGE_SIZE_ALERT(true) + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + final long maximumMessageSize = queue.getMaximumMessageSize(); + if(maximumMessageSize != 0) + { + // Check for threshold message size + long messageSize; + try + { + messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; + } + catch (AMQException e) + { + messageSize = 0; + } + + + if (messageSize >= maximumMessageSize) + { + listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); + return true; + } + } + return false; + } + + }, + QUEUE_DEPTH_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + // Check for threshold queue depth in bytes + final long maximumQueueDepth = queue.getMaximumQueueDepth(); + + if(maximumQueueDepth != 0) + { + final long queueDepth = queue.getQueueDepth(); + + if (queueDepth >= maximumQueueDepth) + { + listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached."); + return true; + } + } + return false; + } + + }, + MESSAGE_AGE_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + + final long maxMessageAge = queue.getMaximumMessageAge(); + if(maxMessageAge != 0) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - maxMessageAge; + final long firstArrivalTime = queue.getOldestMessageArrivalTime(); + + if(firstArrivalTime < thresholdTime) + { + long oldestAge = currentTime - firstArrivalTime; + listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."); + + return true; + } + } + return false; + + } + + } + ; + + private final boolean _messageSpecific; + + NotificationCheck() + { + this(false); + } + + NotificationCheck(boolean messageSpecific) + { + _messageSpecific = messageSpecific; + } + + public boolean isMessageSpecific() + { + return _messageSpecific; + } + + abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java index 959ca03c80..f1e7c98387 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java @@ -1,27 +1,27 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - - -public interface QueueNotificationListener -{ - void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + + +public interface QueueNotificationListener +{ + void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java index 85d804457e..70a76dd8c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java @@ -1,44 +1,44 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.io.IOException; - -import org.apache.qpid.server.management.MBeanAttribute; - -/** - * The management interface exposed to allow management of an Exchange. - * @version 0.1 - */ -public interface ManagedVirtualHost -{ - static final String TYPE = "VirtualHost"; - - /** - * Returns the name of the managed virtualHost. - * @return the name of the exchange. - * @throws java.io.IOException - */ - @MBeanAttribute(name="Name", description= TYPE + " Name") - String getName() throws IOException; - - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.io.IOException; + +import org.apache.qpid.server.management.MBeanAttribute; + +/** + * The management interface exposed to allow management of an Exchange. + * @version 0.1 + */ +public interface ManagedVirtualHost +{ + static final String TYPE = "VirtualHost"; + + /** + * Returns the name of the managed virtualHost. + * @return the name of the exchange. + * @throws java.io.IOException + */ + @MBeanAttribute(name="Name", description= TYPE + " Name") + String getName() throws IOException; + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 27917fac8a..9b1619c609 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -1,70 +1,70 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -public class VirtualHostRegistry -{ - private final Map _registry = new ConcurrentHashMap(); - - - private String _defaultVirtualHostName; - - public synchronized void registerVirtualHost(VirtualHost host) throws Exception - { - if(_registry.containsKey(host.getName())) - { - throw new Exception("Virtual Host with name " + host.getName() + " already registered."); - } - _registry.put(host.getName(),host); - } - - public VirtualHost getVirtualHost(String name) - { - if(name == null || name.trim().length() == 0 ) - { - name = getDefaultVirtualHostName(); - } - - return _registry.get(name); - } - - private String getDefaultVirtualHostName() - { - return _defaultVirtualHostName; - } - - public void setDefaultVirtualHostName(String defaultVirtualHostName) - { - _defaultVirtualHostName = defaultVirtualHostName; - } - - - public Collection getVirtualHosts() - { - return new ArrayList(_registry.values()); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +public class VirtualHostRegistry +{ + private final Map _registry = new ConcurrentHashMap(); + + + private String _defaultVirtualHostName; + + public synchronized void registerVirtualHost(VirtualHost host) throws Exception + { + if(_registry.containsKey(host.getName())) + { + throw new Exception("Virtual Host with name " + host.getName() + " already registered."); + } + _registry.put(host.getName(),host); + } + + public VirtualHost getVirtualHost(String name) + { + if(name == null || name.trim().length() == 0 ) + { + name = getDefaultVirtualHostName(); + } + + return _registry.get(name); + } + + private String getDefaultVirtualHostName() + { + return _defaultVirtualHostName; + } + + public void setDefaultVirtualHostName(String defaultVirtualHostName) + { + _defaultVirtualHostName = defaultVirtualHostName; + } + + + public Collection getVirtualHosts() + { + return new ArrayList(_registry.values()); + } +} -- cgit v1.2.1