summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
committerGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
commitb33a63b36c659a894143382d0a61efe6a598fcc6 (patch)
tree0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src/qpid/broker
parent748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff)
downloadqpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.cpp146
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.h72
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxAck.h4
-rw-r--r--cpp/src/qpid/broker/Session.h4
-rw-r--r--cpp/src/qpid/broker/TxAck.cpp1
-rw-r--r--cpp/src/qpid/broker/TxAck.h6
10 files changed, 12 insertions, 230 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp
deleted file mode 100644
index 5603f39410..0000000000
--- a/cpp/src/qpid/broker/AccumulatedAck.cpp
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "AccumulatedAck.h"
-
-#include <assert.h>
-#include <iostream>
-
-using std::list;
-using std::max;
-using std::min;
-using namespace qpid::broker;
-
-void AccumulatedAck::update(DeliveryId first, DeliveryId last){
- assert(first <= last);
- if (last < mark) return;
-
-
- Range r(first, last);
- bool handled = false;
- bool markMerged = false;
- list<Range>::iterator merged = ranges.end();
- if (r.mergeable(mark)) {
- mark = r.end;
- markMerged = true;
- handled = true;
- } else {
- for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) {
- if (i->merge(r)) {
- merged = i;
- handled = true;
- } else if (r.start < i->start) {
- ranges.insert(i, r);
- handled = true;
- }
- }
- }
- if (!handled) {
- ranges.push_back(r);
- } else {
- while (!ranges.empty() && ranges.front().end <= mark) {
- ranges.pop_front();
- }
- if (markMerged) {
- //new range is incorporated, but may be possible to consolidate
- merged = ranges.begin();
- while (merged != ranges.end() && merged->mergeable(mark)) {
- mark = merged->end;
- merged = ranges.erase(merged);
- }
- }
- if (merged != ranges.end()) {
- //consolidate ranges
- list<Range>::iterator i = merged;
- list<Range>::iterator j = i++;
- while (i != ranges.end() && j->merge(*i)) {
- j = i++;
- }
- }
- }
-}
-
-void AccumulatedAck::consolidate(){}
-
-void AccumulatedAck::clear(){
- mark = 0;//not sure that this is valid when wraparound is a possibility
- ranges.clear();
-}
-
-bool AccumulatedAck::covers(DeliveryId tag) const{
- if (tag <= mark) return true;
- for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
- if (i->contains(tag)) return true;
- }
- return false;
-}
-
-bool Range::contains(DeliveryId i) const
-{
- return i >= start && i <= end;
-}
-
-bool Range::intersect(const Range& r) const
-{
- return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end);
-}
-
-bool Range::merge(const Range& r)
-{
- if (intersect(r) || mergeable(r.end) || r.mergeable(end)) {
- start = min(start, r.start);
- end = max(end, r.end);
- return true;
- } else {
- return false;
- }
-}
-
-bool Range::mergeable(const DeliveryId& s) const
-{
- if (contains(s) || start - s == 1) {
- return true;
- } else {
- return false;
- }
-}
-
-Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
-
-
-namespace qpid{
-namespace broker{
- std::ostream& operator<<(std::ostream& out, const Range& r)
- {
- out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
- return out;
- }
-
- std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a)
- {
- out << "{mark: " << a.mark.getValue() << ", ranges: (";
- for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) {
- if (i != a.ranges.begin()) out << ", ";
- out << *i;
- }
- out << ")]";
- return out;
- }
-}}
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h
deleted file mode 100644
index 9c7cc3d887..0000000000
--- a/cpp/src/qpid/broker/AccumulatedAck.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _AccumulatedAck_
-#define _AccumulatedAck_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-#include <ostream>
-#include "DeliveryId.h"
-
-namespace qpid {
- namespace broker {
-
- struct Range
- {
- DeliveryId start;
- DeliveryId end;
-
- Range(DeliveryId s, DeliveryId e);
- bool contains(DeliveryId i) const;
- bool intersect(const Range& r) const;
- bool merge(const Range& r);
- bool mergeable(const DeliveryId& r) const;
- };
- /**
- * Keeps an accumulated record of acked messages (by delivery
- * tag).
- */
- class AccumulatedAck {
- public:
- /**
- * Everything up to this value has been acked.
- */
- DeliveryId mark;
- /**
- * List of individually acked messages greater than the
- * 'mark'.
- */
- std::list<Range> ranges;
-
- explicit AccumulatedAck(DeliveryId r) : mark(r) {}
- void update(DeliveryId firstTag, DeliveryId lastTag);
- void consolidate();
- void clear();
- bool covers(DeliveryId tag) const;
- };
- std::ostream& operator<<(std::ostream&, const Range&);
- std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
- }
-}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 1a44b09188..b3a8e135a3 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -171,7 +171,6 @@ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
queue->getSettings(),
queue->getMessageCount(),
queue->getConsumerCount());
-
}
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 7649715ade..9b33fd5f10 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -65,7 +65,7 @@ bool DeliveryRecord::after(DeliveryId tag) const{
return id > tag;
}
-bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
return range->covers(id);
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 583579ac10..3caac6bf40 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "BrokerQueue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -56,7 +56,7 @@ class DeliveryRecord{
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const AccumulatedAck* const range) const;
+ bool coveredBy(const framing::AccumulatedAck* const range) const;
void requeue() const;
void release();
void reject();
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index badf3564e7..25186b4102 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -26,7 +26,7 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index 84afd00c9c..c61b279c42 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@ namespace qpid {
std::list<DeliveryRecord> pending;
public:
- DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
index 8458f4cabf..6b9d3e9557 100644
--- a/cpp/src/qpid/broker/Session.h
+++ b/cpp/src/qpid/broker/Session.h
@@ -22,7 +22,6 @@
*
*/
-#include "AccumulatedAck.h"
#include "Consumer.h"
#include "Deliverable.h"
#include "DeliveryAdapter.h"
@@ -35,6 +34,7 @@
#include "TxBuffer.h"
#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
@@ -116,7 +116,7 @@ class Session : public framing::FrameHandler::Chains,
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
- AccumulatedAck accumulatedAck;
+ framing::AccumulatedAck accumulatedAck;
bool opened;
bool flowActive;
diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp
index 958dbcbec0..05ea755d71 100644
--- a/cpp/src/qpid/broker/TxAck.cpp
+++ b/cpp/src/qpid/broker/TxAck.cpp
@@ -25,6 +25,7 @@ using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked){
diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h
index 5e6d0a370c..c8383b6314 100644
--- a/cpp/src/qpid/broker/TxAck.h
+++ b/cpp/src/qpid/broker/TxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@ namespace qpid {
* transactional channel.
*/
class TxAck : public TxOp{
- AccumulatedAck& acked;
+ framing::AccumulatedAck& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();