/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
*
* CLUSTER IMPLEMENTATION OVERVIEW
*
* The cluster works on the principle that if all members of the
* cluster receive identical input, they will all produce identical
* results. cluster::Connections intercept data received from clients
* and multicast it via CPG. The data is processed (passed to the
* broker::Connection) only when it is received from CPG in cluster
* order. Each cluster member has Connection objects for directly
* connected clients and "shadow" Connection objects for connections
* to other members.
*
* This assumes that all broker actions occur deterministically in
* response to data arriving on client connections. There are two
* situations where this assumption fails:
* - sending data in response to polling local connections for writability.
* - taking actions based on a timer or timestamp comparison.
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
*
*
* USE OF TIMESTAMPS IN THE BROKER
*
* The following are the current areas where broker uses timers or timestamps:
*
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput, a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState.
*
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
*
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
*
* - LinkRegistry: only cluster elder is ever active for links.
*
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
*
* cluster::ExpiryPolicy uses cluster time.
*
* ClusterTimer implements periodic timed events in the cluster context.
* Used for:
* - periodic management events.
* - DTX transaction timeouts.
*
* CLUSTER PROTOCOL OVERVIEW
*
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
*
* Events are either
* - Connection events: have a non-0 connection number and are associated with a connection.
* - Cluster events: have connection number 0 and are not associated with a connection.
*
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
*
* The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml
* which defines two classes:
* - cluster: cluster control information.
* - cluster.connection: control information for a specific connection.
*
* The following combinations are legal:
* - Data frames carrying connection data.
* - Cluster control events carrying cluster commands.
* - Connection control events carrying cluster.connection commands.
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
*
* CLUSTER INITIALIZATION OVERVIEW
*
* @see InitialStatusMap
*
* When a new member joins the CPG group, all members (including the
* new one) multicast their "initial status." The new member is in
* PRE_INIT mode until it gets a complete set of initial status
* messages from all cluster members. In a newly-forming cluster it is
* then in INIT mode until the configured cluster-size members have
* joined.
*
* The newcomer uses initial status to determine
* - The cluster UUID
* - Am I speaking the correct version of the cluster protocol?
* - Do I need to get an update from an existing active member?
* - Can I recover from my own store?
*
* Pre-initialization happens in the Cluster constructor (plugin
* early-init phase) because it needs to set the recovery flag before
* the store initializes. This phase lasts until initial-status is
* received for all active members. The PollableQueues and Multicaster
* are in "bypass" mode during this phase since the poller has not
* started so there are no threads to serve pollable queues.
*
* The remaining initialization happens in Cluster::initialize() or,
* if cluster-size=N is specified, in the deliver thread when an
* initial-status control is delivered that brings the total to N.
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/cluster/ClusterSettings.h"
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/ClusterDeliverToQueueBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
#include
#include
#include
#include
#include
#include
#include