summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/FailoverSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/FailoverSession.cpp')
-rw-r--r--cpp/src/qpid/client/FailoverSession.cpp592
1 files changed, 592 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp
new file mode 100644
index 0000000000..1e20edde4a
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverSession.cpp
@@ -0,0 +1,592 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <fstream>
+
+
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/client/FailoverConnection.h"
+#include "qpid/client/FailoverSession.h"
+
+
+using namespace std;
+
+
+namespace qpid {
+namespace client {
+
+FailoverSession::FailoverSession ( ) :
+ name("no_name")
+{
+ // The session is created by FailoverConnection::newSession
+ failoverSubscriptionManager = 0;
+}
+
+
+FailoverSession::~FailoverSession ( )
+{
+}
+
+
+framing::FrameSet::shared_ptr
+FailoverSession::get()
+{
+ return session.get();
+}
+
+
+SessionId
+FailoverSession::getId() const
+{
+ return session.getId();
+}
+
+
+void
+FailoverSession::close()
+{
+ session.close();
+}
+
+
+void
+FailoverSession::sync()
+{
+ session.sync();
+}
+
+
+uint32_t
+FailoverSession::timeout(uint32_t /*seconds*/ )
+{
+ // MICK WTF? return session.timeout ( seconds );
+ return 0;
+}
+
+
+Execution&
+FailoverSession::getExecution()
+{
+ return session.getExecution();
+}
+
+
+void
+FailoverSession::flush()
+{
+ session.flush();
+}
+
+
+void
+FailoverSession::markCompleted(const framing::SequenceNumber& id,
+ bool cumulative,
+ bool notifyPeer
+ )
+{
+ session.markCompleted ( id, cumulative, notifyPeer );
+}
+
+
+
+// Wrapped functions from Session ----------------------------
+
+void
+FailoverSession::executionSync()
+{
+ session.executionSync();
+}
+
+
+
+void
+FailoverSession::executionResult ( const SequenceNumber& commandId,
+ const string& value
+ )
+{
+ session.executionResult ( commandId,
+ value
+ );
+}
+
+
+
+void
+FailoverSession::executionException ( uint16_t errorCode,
+ const SequenceNumber& commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const string& description,
+ const FieldTable& errorInfo
+ )
+{
+ session.executionException ( errorCode,
+ commandId,
+ classCode,
+ commandCode,
+ fieldIndex,
+ description,
+ errorInfo
+ );
+}
+
+
+
+void
+FailoverSession::messageTransfer ( const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ const MethodContent& content
+ )
+{
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
+}
+
+
+
+void
+FailoverSession::messageAccept ( const SequenceSet& transfers )
+{
+ session.messageAccept ( transfers );
+}
+
+
+
+void
+FailoverSession::messageReject ( const SequenceSet& transfers,
+ uint16_t code,
+ const string& text
+ )
+{
+ session.messageReject ( transfers,
+ code,
+ text
+ );
+}
+
+
+
+void
+FailoverSession::messageRelease ( const SequenceSet& transfers,
+ bool setRedelivered
+ )
+{
+ session.messageRelease ( transfers,
+ setRedelivered
+ );
+}
+
+
+
+qpid::framing::MessageAcquireResult
+FailoverSession::messageAcquire ( const SequenceSet& transfers )
+{
+ return session.messageAcquire ( transfers );
+}
+
+
+
+qpid::framing::MessageResumeResult
+FailoverSession::messageResume ( const string& destination,
+ const string& resumeId
+ )
+{
+ return session.messageResume ( destination,
+ resumeId
+ );
+}
+
+
+
+void
+FailoverSession::messageSubscribe ( const string& queue,
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const FieldTable& arguments
+ )
+{
+ session.messageSubscribe ( queue,
+ destination,
+ acceptMode,
+ acquireMode,
+ exclusive,
+ resumeId,
+ resumeTtl,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::messageCancel ( const string& destinations )
+{
+ session.messageCancel ( destinations );
+}
+
+
+
+void
+FailoverSession::messageSetFlowMode ( const string& destination,
+ uint8_t flowMode
+ )
+{
+ session.messageSetFlowMode ( destination,
+ flowMode
+ );
+}
+
+
+
+void
+FailoverSession::messageFlow(const string& destination,
+ uint8_t unit,
+ uint32_t value)
+{
+ session.messageFlow ( destination,
+ unit,
+ value
+ );
+}
+
+
+
+void
+FailoverSession::messageFlush(const string& destination)
+{
+ session.messageFlush ( destination );
+}
+
+
+
+void
+FailoverSession::messageStop(const string& destination)
+{
+ session.messageStop ( destination );
+}
+
+
+
+void
+FailoverSession::txSelect()
+{
+ session.txSelect ( );
+}
+
+
+
+void
+FailoverSession::txCommit()
+{
+ session.txCommit ( );
+}
+
+
+
+void
+FailoverSession::txRollback()
+{
+ session.txRollback ( );
+}
+
+
+
+void
+FailoverSession::dtxSelect()
+{
+ session.dtxSelect ( );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxStart(const Xid& xid,
+ bool join,
+ bool resume)
+{
+ return session.dtxStart ( xid,
+ join,
+ resume
+ );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxEnd(const Xid& xid,
+ bool fail,
+ bool suspend)
+{
+ return session.dtxEnd ( xid,
+ fail,
+ suspend
+ );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxCommit(const Xid& xid,
+ bool onePhase)
+{
+ return session.dtxCommit ( xid,
+ onePhase
+ );
+}
+
+
+
+void
+FailoverSession::dtxForget(const Xid& xid)
+{
+ session.dtxForget ( xid );
+}
+
+
+
+qpid::framing::DtxGetTimeoutResult
+FailoverSession::dtxGetTimeout(const Xid& xid)
+{
+ return session.dtxGetTimeout ( xid );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxPrepare(const Xid& xid)
+{
+ return session.dtxPrepare ( xid );
+}
+
+
+
+qpid::framing::DtxRecoverResult
+FailoverSession::dtxRecover()
+{
+ return session.dtxRecover ( );
+}
+
+
+
+qpid::framing::XaResult
+FailoverSession::dtxRollback(const Xid& xid)
+{
+ return session.dtxRollback ( xid );
+}
+
+
+
+void
+FailoverSession::dtxSetTimeout(const Xid& xid,
+ uint32_t timeout)
+{
+ session.dtxSetTimeout ( xid,
+ timeout
+ );
+}
+
+
+
+void
+FailoverSession::exchangeDeclare(const string& exchange,
+ const string& type,
+ const string& alternateExchange,
+ bool passive,
+ bool durable,
+ bool autoDelete,
+ const FieldTable& arguments)
+{
+ session.exchangeDeclare ( exchange,
+ type,
+ alternateExchange,
+ passive,
+ durable,
+ autoDelete,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::exchangeDelete(const string& exchange,
+ bool ifUnused)
+{
+ session.exchangeDelete ( exchange,
+ ifUnused
+ );
+}
+
+
+
+qpid::framing::ExchangeQueryResult
+FailoverSession::exchangeQuery(const string& name)
+{
+ return session.exchangeQuery ( name );
+}
+
+
+
+void
+FailoverSession::exchangeBind(const string& queue,
+ const string& exchange,
+ const string& bindingKey,
+ const FieldTable& arguments)
+{
+ session.exchangeBind ( queue,
+ exchange,
+ bindingKey,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::exchangeUnbind(const string& queue,
+ const string& exchange,
+ const string& bindingKey)
+{
+ session.exchangeUnbind ( queue,
+ exchange,
+ bindingKey
+ );
+}
+
+
+
+qpid::framing::ExchangeBoundResult
+FailoverSession::exchangeBound(const string& exchange,
+ const string& queue,
+ const string& bindingKey,
+ const FieldTable& arguments)
+{
+ return session.exchangeBound ( exchange,
+ queue,
+ bindingKey,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::queueDeclare(const string& queue,
+ const string& alternateExchange,
+ bool passive,
+ bool durable,
+ bool exclusive,
+ bool autoDelete,
+ const FieldTable& arguments)
+{
+ session.queueDeclare ( queue,
+ alternateExchange,
+ passive,
+ durable,
+ exclusive,
+ autoDelete,
+ arguments
+ );
+}
+
+
+
+void
+FailoverSession::queueDelete(const string& queue,
+ bool ifUnused,
+ bool ifEmpty)
+{
+ session.queueDelete ( queue,
+ ifUnused,
+ ifEmpty
+ );
+}
+
+
+
+void
+FailoverSession::queuePurge(const string& queue)
+{
+ session.queuePurge ( queue) ;
+}
+
+
+
+qpid::framing::QueueQueryResult
+FailoverSession::queueQuery(const string& queue)
+{
+ return session.queueQuery ( queue );
+}
+
+
+// end Wrapped functions from Session ---------------------------
+
+
+// Get ready for a failover.
+void
+FailoverSession::prepareForFailover ( Connection newConnection )
+{
+ try
+ {
+ newSession = newConnection.newSession();
+ }
+ catch ( const std::exception& error )
+ {
+ throw Exception(QPID_MSG("Can't create failover session."));
+ }
+
+
+ if ( failoverSubscriptionManager )
+ {
+ failoverSubscriptionManager->prepareForFailover ( newSession );
+ }
+}
+
+
+
+void
+FailoverSession::failover ( )
+{
+ if ( failoverSubscriptionManager )
+ {
+ failoverSubscriptionManager->failover ( );
+ }
+
+ session = newSession;
+}
+
+
+
+
+}} // namespace qpid::client