diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.h')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 276 |
1 files changed, 0 insertions, 276 deletions
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h deleted file mode 100644 index a0da9efbb8..0000000000 --- a/cpp/src/qpid/cluster/Connection.h +++ /dev/null @@ -1,276 +0,0 @@ -#ifndef QPID_CLUSTER_CONNECTION_H -#define QPID_CLUSTER_CONNECTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include "OutputInterceptor.h" -#include "McastFrameHandler.h" -#include "UpdateReceiver.h" - -#include "qpid/RefCounted.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/SecureConnection.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/amqp_0_10/Connection.h" -#include "qpid/sys/AtomicValue.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/SecuritySettings.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/FrameDecoder.h" - -#include <iosfwd> - -namespace qpid { - -namespace framing { class AMQFrame; } - -namespace broker { -class SemanticState; -struct QueuedMessage; -class TxBuffer; -class TxAccept; -} - -namespace cluster { -class Cluster; -class Event; -struct EventFrame; - -/** Intercept broker::Connection calls for shadow and local cluster connections. */ -class Connection : - public RefCounted, - public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler, - private broker::Connection::ErrorListener - -{ - public: - - /** Local connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, - const qpid::sys::SecuritySettings& external); - /** Shadow connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, - const qpid::sys::SecuritySettings& external); - ~Connection(); - - ConnectionId getId() const { return self; } - broker::Connection* getBrokerConnection() { return connection.get(); } - const broker::Connection* getBrokerConnection() const { return connection.get(); } - - /** Local connections may be clients or catch-up connections */ - bool isLocal() const; - - bool isLocalClient() const { return isLocal() && !isCatchUp(); } - - /** True for connections that are shadowing remote broker connections */ - bool isShadow() const; - - /** True if the connection is in "catch-up" mode: building initial broker state. */ - bool isCatchUp() const { return catchUp; } - - /** True if the connection is a completed shared update connection */ - bool isUpdated() const; - - Cluster& getCluster() { return cluster; } - - // ConnectionInputHandler methods - void received(framing::AMQFrame&); - void closed(); - bool doOutput(); - void idleOut() { if (connection.get()) connection->idleOut(); } - void idleIn() { if (connection.get()) connection->idleIn(); } - - // ConnectionCodec methods - called by IO layer with a read buffer. - size_t decode(const char* buffer, size_t size); - - // Called for data delivered from the cluster. - void deliveredFrame(const EventFrame&); - - void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - - // ==== Used in catch-up mode to build initial state. - // - // State update methods. - void shadowPrepare(const std::string&); - - void shadowSetUser(const std::string&); - - void sessionState(const framing::SequenceNumber& replayStart, - const framing::SequenceNumber& sendCommandPoint, - const framing::SequenceSet& sentIncomplete, - const framing::SequenceNumber& expected, - const framing::SequenceNumber& received, - const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); - - void outputTask(uint16_t channel, const std::string& name); - - void shadowReady(uint64_t memberId, - uint64_t connectionId, - const std::string& managementId, - const std::string& username, - const std::string& fragment, - uint32_t sendMax); - - void membership(const framing::FieldTable&, const framing::FieldTable&, - const framing::SequenceNumber& frameSeq); - - void retractOffer(); - - void deliveryRecord(const std::string& queue, - const framing::SequenceNumber& position, - const std::string& tag, - const framing::SequenceNumber& id, - bool acquired, - bool accepted, - bool cancelled, - bool completed, - bool ended, - bool windowing, - bool enqueued, - uint32_t credit); - - void queuePosition(const std::string&, const framing::SequenceNumber&); - void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); - void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); - void expiryId(uint64_t); - - void txStart(); - void txAccept(const framing::SequenceSet&); - void txDequeue(const std::string&); - void txEnqueue(const std::string&); - void txPublish(const framing::Array&, bool); - void txEnd(); - void accumulatedAck(const framing::SequenceSet&); - - // Encoded exchange replication. - void exchange(const std::string& encoded); - - void giveReadCredit(int credit); - void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, - bool nodict, const std::string& username, - const std::string& initFrames); - void close(); - void abort(); - void deliverClose(); - - OutputInterceptor& getOutput() { return output; } - - void addQueueListener(const std::string& queue, uint32_t listener); - void managementSetupState(uint64_t objectNum, - uint16_t bootSequence, - const framing::Uuid&, - const std::string& vendor, - const std::string& product, - const std::string& instance); - - void config(const std::string& encoded); - - void setSecureConnection ( broker::SecureConnection * sc ); - - void doCatchupIoCallbacks(); - - private: - struct NullFrameHandler : public framing::FrameHandler { - void handle(framing::AMQFrame&) {} - }; - - // Arguments to construct a broker::Connection - struct ConnectionCtor { - sys::ConnectionOutputHandler* out; - broker::Broker& broker; - std::string mgmtId; - qpid::sys::SecuritySettings external; - bool isLink; - uint64_t objectId; - bool shadow; - bool delayManagement; - - ConnectionCtor( - sys::ConnectionOutputHandler* out_, - broker::Broker& broker_, - const std::string& mgmtId_, - const qpid::sys::SecuritySettings& external_, - bool isLink_=false, - uint64_t objectId_=0, - bool shadow_=false, - bool delayManagement_=false - ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), - isLink(isLink_), objectId(objectId_), shadow(shadow_), - delayManagement(delayManagement_) - {} - - std::auto_ptr<broker::Connection> construct() { - return std::auto_ptr<broker::Connection>( - new broker::Connection( - out, broker, mgmtId, external, isLink, objectId, - shadow, delayManagement) - ); - } - }; - - static NullFrameHandler nullFrameHandler; - - // Error listener functions - void connectionError(const std::string&); - void sessionError(uint16_t channel, const std::string&); - - void init(); - bool checkUnsupported(const framing::AMQBody& body); - void deliverDoOutput(uint32_t limit); - - bool checkProtocolHeader(const char*& data, size_t size); - void processInitialFrames(const char*& data, size_t size); - boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); - broker::SessionState& sessionState(); - broker::SemanticState& semanticState(); - broker::QueuedMessage getUpdateMessage(); - void closeUpdated(); - - Cluster& cluster; - ConnectionId self; - bool catchUp; - bool announced; - OutputInterceptor output; - framing::FrameDecoder localDecoder; - ConnectionCtor connectionCtor; - std::auto_ptr<broker::Connection> connection; - framing::SequenceNumber deliverSeq; - framing::ChannelId currentChannel; - boost::shared_ptr<broker::TxBuffer> txBuffer; - bool expectProtocolHeader; - McastFrameHandler mcastFrameHandler; - UpdateReceiver& updateIn; - qpid::broker::SecureConnection* secureConnection; - std::string initialFrames; - - static qpid::sys::AtomicValue<uint64_t> catchUpId; - - friend std::ostream& operator<<(std::ostream&, const Connection&); -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNECTION_H*/ |