diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
| commit | b33a63b36c659a894143382d0a61efe6a598fcc6 (patch) | |
| tree | 0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src/qpid/framing | |
| parent | 748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff) | |
| download | qpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz | |
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/AMQMethodBody.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.cpp | 154 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.h | 74 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/StructHelper.h | 2 |
4 files changed, 232 insertions, 2 deletions
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index a5c14a37e9..f0043d9d3b 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -50,7 +50,9 @@ class AMQMethodBody : public AMQBody { virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; virtual bool isContentBearing() const = 0; - + virtual bool resultExpected() const = 0; + virtual bool responseExpected() const = 0; + void invoke(AMQP_ServerOperations&); bool invoke(Invocable*); diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp new file mode 100644 index 0000000000..9daae5494c --- /dev/null +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AccumulatedAck.h" + +#include <assert.h> +#include <iostream> + +using std::list; +using std::max; +using std::min; +using namespace qpid::framing; + +void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ + assert(first <= last); + if (last < mark) return; + + + Range r(first, last); + bool handled = false; + bool markMerged = false; + list<Range>::iterator merged = ranges.end(); + if (r.mergeable(mark)) { + mark = r.end; + markMerged = true; + handled = true; + } else { + for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { + if (i->merge(r)) { + merged = i; + handled = true; + } else if (r.start < i->start) { + ranges.insert(i, r); + handled = true; + } + } + } + if (!handled) { + ranges.push_back(r); + } else { + while (!ranges.empty() && ranges.front().end <= mark) { + ranges.pop_front(); + } + if (markMerged) { + //new range is incorporated, but may be possible to consolidate + merged = ranges.begin(); + while (merged != ranges.end() && merged->mergeable(mark)) { + mark = merged->end; + merged = ranges.erase(merged); + } + } + if (merged != ranges.end()) { + //consolidate ranges + list<Range>::iterator i = merged; + list<Range>::iterator j = i++; + while (i != ranges.end() && j->merge(*i)) { + j = i++; + } + } + } +} + +void AccumulatedAck::consolidate(){} + +void AccumulatedAck::clear(){ + mark = 0;//not sure that this is valid when wraparound is a possibility + ranges.clear(); +} + +bool AccumulatedAck::covers(SequenceNumber tag) const{ + if (tag <= mark) return true; + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(tag)) return true; + } + return false; +} + +void AccumulatedAck::collectRanges(SequenceNumberSet& set) const +{ + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + set.push_back(i->start); + set.push_back(i->end); + } +} + +bool Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool Range::intersect(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool Range::merge(const Range& r) +{ + if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + + +namespace qpid{ +namespace framing{ + std::ostream& operator<<(std::ostream& out, const Range& r) + { + out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; + return out; + } + + std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) + { + out << "{mark: " << a.mark.getValue() << ", ranges: ("; + for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { + if (i != a.ranges.begin()) out << ", "; + out << *i; + } + out << ")]"; + return out; + } +}} diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h new file mode 100644 index 0000000000..f75842968f --- /dev/null +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include <ostream> +#include "SequenceNumber.h" +#include "SequenceNumberSet.h" + +namespace qpid { + namespace framing { + + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersect(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + }; + /** + * Keeps an accumulated record of acked messages (by delivery + * tag). + */ + class AccumulatedAck { + public: + /** + * Everything up to this value has been acked. + */ + SequenceNumber mark; + /** + * List of individually acked messages greater than the + * 'mark'. + */ + std::list<Range> ranges; + + explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {} + void update(SequenceNumber firstTag, SequenceNumber lastTag); + void consolidate(); + void clear(); + bool covers(SequenceNumber tag) const; + void collectRanges(SequenceNumberSet& set) const; + }; + std::ostream& operator<<(std::ostream&, const Range&); + std::ostream& operator<<(std::ostream&, const AccumulatedAck&); + } +} + + +#endif diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h index 753a593523..6b111e1f9e 100644 --- a/cpp/src/qpid/framing/StructHelper.h +++ b/cpp/src/qpid/framing/StructHelper.h @@ -44,7 +44,7 @@ public: rbuffer.getRawData(data, size); } - template <class T> void decode(T t, std::string& data) { + template <class T> void decode(T& t, const std::string& data) { char* bytes = static_cast<char*>(::alloca(data.length())); Buffer wbuffer(bytes, data.length()); wbuffer.putRawData(data); |
