diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.h')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 7ee85bf1aa..fe66b77238 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,11 +24,12 @@ #include "types.h" #include "OutputInterceptor.h" -#include "EventFrame.h" #include "McastFrameHandler.h" #include "UpdateReceiver.h" +#include "qpid/RefCounted.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" @@ -47,7 +48,7 @@ namespace framing { class AMQFrame; } namespace broker { class SemanticState; -class QueuedMessage; +struct QueuedMessage; class TxBuffer; class TxAccept; } @@ -55,6 +56,7 @@ class TxAccept; namespace cluster { class Cluster; class Event; +struct EventFrame; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : @@ -62,7 +64,7 @@ class Connection : public sys::ConnectionInputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler, private broker::Connection::ErrorListener - + { public: @@ -73,7 +75,7 @@ class Connection : Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external); ~Connection(); - + ConnectionId getId() const { return self; } broker::Connection* getBrokerConnection() { return connection.get(); } const broker::Connection* getBrokerConnection() const { return connection.get(); } @@ -108,9 +110,9 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - + // ==== Used in catch-up mode to build initial state. - // + // // State update methods. void shadowPrepare(const std::string&); @@ -122,10 +124,11 @@ class Connection : const framing::SequenceNumber& expected, const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); - + const SequenceSet& receivedIncomplete, + bool dtxSelected); + void outputTask(uint16_t channel, const std::string& name); - + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& managementId, @@ -153,7 +156,7 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); - void expiryId(uint64_t); + void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void txStart(); void txAccept(const framing::SequenceSet&); @@ -163,8 +166,18 @@ class Connection : void txEnd(); void accumulatedAck(const framing::SequenceSet&); - // Encoded queue/exchange replication. - void queue(const std::string& encoded); + // Dtx state + void dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired); + void dtxEnd(); + void dtxAck(); + void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended); + void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); + + // Encoded exchange replication. void exchange(const std::string& encoded); void giveReadCredit(int credit); @@ -189,6 +202,12 @@ class Connection : void setSecureConnection ( broker::SecureConnection * sc ); + void doCatchupIoCallbacks(); + + void clock(uint64_t time); + + void queueDequeueSincePurgeState(const std::string&, uint32_t); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -233,7 +252,7 @@ class Connection : // Error listener functions void connectionError(const std::string&); void sessionError(uint16_t channel, const std::string&); - + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverDoOutput(uint32_t limit); @@ -245,10 +264,11 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); void closeUpdated(); - + void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &); Cluster& cluster; ConnectionId self; bool catchUp; + bool announced; OutputInterceptor output; framing::FrameDecoder localDecoder; ConnectionCtor connectionCtor; @@ -256,6 +276,9 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; + boost::shared_ptr<broker::DtxBuffer> dtxBuffer; + broker::DeliveryRecords dtxAckRecords; + broker::DtxWorkRecord* dtxCurrent; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; |