summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
committerGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
commitb33a63b36c659a894143382d0a61efe6a598fcc6 (patch)
tree0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src/qpid/framing
parent748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff)
downloadqpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h4
-rw-r--r--cpp/src/qpid/framing/AccumulatedAck.cpp154
-rw-r--r--cpp/src/qpid/framing/AccumulatedAck.h74
-rw-r--r--cpp/src/qpid/framing/StructHelper.h2
4 files changed, 232 insertions, 2 deletions
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index a5c14a37e9..f0043d9d3b 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -50,7 +50,9 @@ class AMQMethodBody : public AMQBody {
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
virtual bool isContentBearing() const = 0;
-
+ virtual bool resultExpected() const = 0;
+ virtual bool responseExpected() const = 0;
+
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp
new file mode 100644
index 0000000000..9daae5494c
--- /dev/null
+++ b/cpp/src/qpid/framing/AccumulatedAck.cpp
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AccumulatedAck.h"
+
+#include <assert.h>
+#include <iostream>
+
+using std::list;
+using std::max;
+using std::min;
+using namespace qpid::framing;
+
+void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){
+ assert(first <= last);
+ if (last < mark) return;
+
+
+ Range r(first, last);
+ bool handled = false;
+ bool markMerged = false;
+ list<Range>::iterator merged = ranges.end();
+ if (r.mergeable(mark)) {
+ mark = r.end;
+ markMerged = true;
+ handled = true;
+ } else {
+ for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) {
+ if (i->merge(r)) {
+ merged = i;
+ handled = true;
+ } else if (r.start < i->start) {
+ ranges.insert(i, r);
+ handled = true;
+ }
+ }
+ }
+ if (!handled) {
+ ranges.push_back(r);
+ } else {
+ while (!ranges.empty() && ranges.front().end <= mark) {
+ ranges.pop_front();
+ }
+ if (markMerged) {
+ //new range is incorporated, but may be possible to consolidate
+ merged = ranges.begin();
+ while (merged != ranges.end() && merged->mergeable(mark)) {
+ mark = merged->end;
+ merged = ranges.erase(merged);
+ }
+ }
+ if (merged != ranges.end()) {
+ //consolidate ranges
+ list<Range>::iterator i = merged;
+ list<Range>::iterator j = i++;
+ while (i != ranges.end() && j->merge(*i)) {
+ j = i++;
+ }
+ }
+ }
+}
+
+void AccumulatedAck::consolidate(){}
+
+void AccumulatedAck::clear(){
+ mark = 0;//not sure that this is valid when wraparound is a possibility
+ ranges.clear();
+}
+
+bool AccumulatedAck::covers(SequenceNumber tag) const{
+ if (tag <= mark) return true;
+ for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ if (i->contains(tag)) return true;
+ }
+ return false;
+}
+
+void AccumulatedAck::collectRanges(SequenceNumberSet& set) const
+{
+ for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ set.push_back(i->start);
+ set.push_back(i->end);
+ }
+}
+
+bool Range::contains(SequenceNumber i) const
+{
+ return i >= start && i <= end;
+}
+
+bool Range::intersect(const Range& r) const
+{
+ return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end);
+}
+
+bool Range::merge(const Range& r)
+{
+ if (intersect(r) || mergeable(r.end) || r.mergeable(end)) {
+ start = min(start, r.start);
+ end = max(end, r.end);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool Range::mergeable(const SequenceNumber& s) const
+{
+ if (contains(s) || start - s == 1) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
+
+
+namespace qpid{
+namespace framing{
+ std::ostream& operator<<(std::ostream& out, const Range& r)
+ {
+ out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
+ return out;
+ }
+
+ std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a)
+ {
+ out << "{mark: " << a.mark.getValue() << ", ranges: (";
+ for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) {
+ if (i != a.ranges.begin()) out << ", ";
+ out << *i;
+ }
+ out << ")]";
+ return out;
+ }
+}}
diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h
new file mode 100644
index 0000000000..f75842968f
--- /dev/null
+++ b/cpp/src/qpid/framing/AccumulatedAck.h
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <ostream>
+#include "SequenceNumber.h"
+#include "SequenceNumberSet.h"
+
+namespace qpid {
+ namespace framing {
+
+ struct Range
+ {
+ SequenceNumber start;
+ SequenceNumber end;
+
+ Range(SequenceNumber s, SequenceNumber e);
+ bool contains(SequenceNumber i) const;
+ bool intersect(const Range& r) const;
+ bool merge(const Range& r);
+ bool mergeable(const SequenceNumber& r) const;
+ };
+ /**
+ * Keeps an accumulated record of acked messages (by delivery
+ * tag).
+ */
+ class AccumulatedAck {
+ public:
+ /**
+ * Everything up to this value has been acked.
+ */
+ SequenceNumber mark;
+ /**
+ * List of individually acked messages greater than the
+ * 'mark'.
+ */
+ std::list<Range> ranges;
+
+ explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {}
+ void update(SequenceNumber firstTag, SequenceNumber lastTag);
+ void consolidate();
+ void clear();
+ bool covers(SequenceNumber tag) const;
+ void collectRanges(SequenceNumberSet& set) const;
+ };
+ std::ostream& operator<<(std::ostream&, const Range&);
+ std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h
index 753a593523..6b111e1f9e 100644
--- a/cpp/src/qpid/framing/StructHelper.h
+++ b/cpp/src/qpid/framing/StructHelper.h
@@ -44,7 +44,7 @@ public:
rbuffer.getRawData(data, size);
}
- template <class T> void decode(T t, std::string& data) {
+ template <class T> void decode(T& t, const std::string& data) {
char* bytes = static_cast<char*>(::alloca(data.length()));
Buffer wbuffer(bytes, data.length());
wbuffer.putRawData(data);