summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/framing
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp14
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp6
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp16
-rw-r--r--cpp/src/qpid/framing/ChannelHandler.h (renamed from cpp/src/qpid/framing/ProtocolVersionException.h)47
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp5
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp7
-rw-r--r--cpp/src/qpid/framing/FramingContent.cpp14
-rw-r--r--cpp/src/qpid/framing/Handler.h3
-rw-r--r--cpp/src/qpid/framing/ProtocolVersionException.cpp33
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.cpp56
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.h69
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h1
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp120
-rw-r--r--cpp/src/qpid/framing/SessionState.h127
-rw-r--r--cpp/src/qpid/framing/TemplateVisitor.h89
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp12
-rw-r--r--cpp/src/qpid/framing/TransferContent.h7
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp12
-rw-r--r--cpp/src/qpid/framing/Uuid.h3
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/framing/amqp_types.h6
-rw-r--r--cpp/src/qpid/framing/variant.h3
22 files changed, 418 insertions, 233 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index abd33c4158..423af06173 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -20,9 +20,9 @@
*/
#include "AMQFrame.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/variant.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
@@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t flags = buffer.getOctet();
uint8_t framing_version = (flags & 0xc0) >> 6;
if (framing_version != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported");
+ throw SyntaxErrorException(QPID_MSG("Framing version unsupported"));
bof = flags & 0x08;
eof = flags & 0x04;
bos = flags & 0x02;
@@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
if (frame_size < frameOverhead()-1)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small");
+ throw SyntaxErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
subchannel = field1 & 0x0f;
@@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer)
// Verify that the protocol header meets current spec
// TODO: should we check reserved2 against zero as well? - the spec isn't clear
if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero");
+ throw SyntaxErrorException(QPID_MSG("Reserved bits not zero"));
// TODO: should no longer care about body size and only pass up B,E,b,e flags
uint16_t body_size = frame_size + 1 - frameOverhead();
@@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t end = buffer.getOctet();
if (end != 0xCE)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ throw SyntaxErrorException(QPID_MSG("Frame end not found"));
return true;
}
@@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
default:
- THROW_QPID_ERROR(
- FRAMING_ERROR,
- boost::format("Unknown frame type %d") % type);
+ throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type));
}
boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index 53a01141c1..fb84be7cd6 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -18,14 +18,13 @@
* under the License.
*
*/
-#include "qpid/QpidError.h"
#include "BodyHandler.h"
#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-
#include <boost/cast.hpp>
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::framing;
using namespace boost;
@@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) {
handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
- QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid frame type " << body->type()));
}
}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 6a466fdfab..8c1a4e1e9e 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -15,8 +15,6 @@
* limitations under the License.
*
*/
-#include <boost/format.hpp>
-
#include "ChannelAdapter.h"
#include "OutputHandler.h"
#include "AMQFrame.h"
@@ -26,8 +24,6 @@
#include "AMQMethodBody.h"
#include "qpid/framing/ConnectionOpenBody.h"
-using boost::format;
-
namespace qpid {
namespace framing {
@@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body)
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID)
- throw ConnectionException(
- 504, format("Connection method on non-0 channel %d.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Connection method on non-0 channel " << getId()));
}
void ChannelAdapter::assertChannelOpen() const {
if (getId() != 0 && !isOpen())
- throw ConnectionException(
- 504, format("Channel %d is not open.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is not open."));
}
void ChannelAdapter::assertChannelNotOpen() const {
if (getId() != 0 && isOpen())
- throw ConnectionException(
- 504, format("Channel %d is already open.") % getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is already open."));
}
void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); }
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h
index bd16804470..69aaeac492 100644
--- a/cpp/src/qpid/framing/ProtocolVersionException.h
+++ b/cpp/src/qpid/framing/ChannelHandler.h
@@ -1,3 +1,6 @@
+#ifndef QPID_FRAMING_CHANNELHANDLER_H
+#define QPID_FRAMING_CHANNELHANDLER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,39 +21,33 @@
* under the License.
*
*/
-
-#ifndef _ProtocolVersionException_
-#define _ProtocolVersionException_
-
-#include "qpid/Exception.h"
-#include "ProtocolVersion.h"
-#include <string>
-#include <vector>
+#include "FrameHandler.h"
+#include "AMQFrame.h"
namespace qpid {
namespace framing {
-class ProtocolVersionException : public qpid::Exception
+/**
+ * Sets the channel number on outgoing frames.
+ */
+class ChannelHandler : public FrameHandler
{
-protected:
- ProtocolVersion versionFound;
-
-public:
- ~ProtocolVersionException() throw() {}
-
- template <class T>
- ProtocolVersionException(
- ProtocolVersion ver, const T& msg) throw () : versionFound(ver)
- { init(boost::lexical_cast<std::string>(msg)); }
-
- template <class T>
- ProtocolVersionException(const T& msg) throw ()
- { init(boost::lexical_cast<std::string>(msg)); }
+ public:
+ ChannelHandler(uint16_t channelId=0, FrameHandler* next=0)
+ : FrameHandler(next), channel(channelId) {}
+ void handle(AMQFrame& frame) {
+ frame.setChannel(channel);
+ next->handle(frame);
+ }
+ uint16_t get() const { return channel; }
+ ChannelHandler& set(uint16_t ch) { channel=ch; return *this; }
+ operator uint16_t() const { return get(); }
+ ChannelHandler& operator=(uint16_t ch) { return set(ch); }
private:
- void init(const std::string& msg);
+ uint16_t channel;
};
}} // namespace qpid::framing
-#endif //ifndef _ProtocolVersionException_
+#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 3c0284f2c8..089bc5d4a5 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -19,9 +19,10 @@
*
*/
#include "FieldTable.h"
-#include "qpid/QpidError.h"
#include "Buffer.h"
#include "FieldValue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
#include <assert.h>
namespace qpid {
@@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
if (available < len)
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
while(buffer.available() > leftover){
std::string name;
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index a7535ae4b9..5526c9cb72 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -20,8 +20,7 @@
*/
#include "FieldValue.h"
#include "Buffer.h"
-#include "qpid/QpidError.h"
-
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer)
data.reset(new FixedWidthValue<0>());
break;
default:
- std::stringstream out;
- out << "Unknown field table value type: " << typeOctet;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
}
data->decode(buffer);
}
diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp
index 813e6fb49b..cd134b0e89 100644
--- a/cpp/src/qpid/framing/FramingContent.cpp
+++ b/cpp/src/qpid/framing/FramingContent.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <assert.h>
-
#include "Buffer.h"
#include "FramingContent.h"
-#include "qpid/QpidError.h"
-#include <sstream>
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d
void Content::validate() {
if (discriminator == REFERENCE) {
if(value.empty()) {
- THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty");
+ throw InvalidArgumentException(
+ QPID_MSG("Reference cannot be empty"));
}
}else if (discriminator != INLINE) {
- std::stringstream out;
- out << "Invalid discriminator: " << (int) discriminator;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid discriminator: " << discriminator));
}
}
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 3e55dff1bd..fbf3c0b7ca 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -33,6 +33,7 @@ template <class T>
struct Handler {
typedef T HandledType;
typedef void handleFptr(T);
+ typedef void result_type; // Compatible with std/boost functors.
Handler(Handler<T>* next_=0) : next(next_) {}
virtual ~Handler() {}
@@ -51,7 +52,7 @@ struct Handler {
struct Chain : public Handler<T> {
Chain(Handler<T>* first=0) : Handler(first) {}
void operator=(Handler<T>* h) { next = h; }
- void handle(T t) { (*next)(t); }
+ void handle(T t) { next->handle(t); }
// TODO aconway 2007-08-29: chain modifier ops here.
};
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp
deleted file mode 100644
index b68b3af1f9..0000000000
--- a/cpp/src/qpid/framing/ProtocolVersionException.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/format.hpp>
-#include "ProtocolVersionException.h"
-
-
-using namespace qpid::framing;
-
-void ProtocolVersionException::init(const std::string& msg)
-{
- whatStr = boost::str(
- boost::format("ProtocolVersionException: %s found: %s")
- % versionFound.toString() % msg);
-}
-
diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp
deleted file mode 100644
index 9d2c971459..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ResumeHandler.h"
-#include "qpid/framing/reply_exceptions.h"
-
-#include <boost/bind.hpp>
-
-#include <algorithm>
-
-namespace qpid {
-namespace framing {
-
-void ResumeHandler::ackReceived(SequenceNumber acked) {
- if (lastSent < acked)
- throw InvalidArgumentException("Invalid sequence number in ack");
- size_t keep = lastSent - acked;
- if (keep < unacked.size())
- unacked.erase(unacked.begin(), unacked.end()-keep);
-}
-
-void ResumeHandler::resend() {
- std::for_each(unacked.begin(), unacked.end(),
- boost::bind(&FrameHandler::handle,out->next, _1));
-}
-
-void ResumeHandler::handleIn(AMQFrame& f) {
- ++lastReceived;
- in.next->handle(f);
-}
-
-void ResumeHandler::handleOut(AMQFrame& f) {
- ++lastSent;
- unacked.push_back(f);
- out.next->handle(f);
-}
-
-
-}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h
deleted file mode 100644
index c86a60b9cb..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef QPID_FRAMING_RESUMEHANDLER_H
-#define QPID_FRAMING_RESUMEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-#include <deque>
-
-namespace qpid {
-namespace framing {
-
-/**
- * In/out handler pair for managing exactly-once session delivery.
- * The same handler is used by client and broker.
- * This handler only deals with TCP style SequenceNumber acks,
- * not with fragmented SequenceNumberSet.
- *
- * THREAD UNSAFE. Expected to be used in a serialized context.
- */
-class ResumeHandler : public FrameHandler::InOutHandler
-{
- public:
- /** Received acknowledgement for sent frames up to and including sentOk */
- void ackReceived(SequenceNumber sentOk);
-
- /** What was the last sequence number we received. */
- SequenceNumber getLastReceived() { return lastReceived; }
-
- /** Resend the unacked frames to the output handler */
- void resend();
-
- protected:
- void handleIn(AMQFrame&);
- void handleOut(AMQFrame&);
-
- private:
- typedef std::deque<AMQFrame> Frames;
- Frames unacked;
- SequenceNumber lastReceived;
- SequenceNumber lastSent;
-};
-
-
-}} // namespace qpid::common
-
-
-#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index 9b8f0659b2..3aee04a4ce 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -47,6 +47,7 @@ class SequenceNumber
bool operator<=(const SequenceNumber& other) const;
bool operator>=(const SequenceNumber& other) const;
uint32_t getValue() const { return (uint32_t) value; }
+ operator uint32_t() const { return (uint32_t) value; }
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
new file mode 100644
index 0000000000..045a0ae115
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+#include <boost/bind.hpp>
+#include <boost/none.hpp>
+
+namespace qpid {
+namespace framing {
+
+SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(ack),
+ sendAckAt(lastReceived+ackInterval),
+ solicitAckAt(lastSent+ackInterval),
+ ackSolicited(false)
+{
+ assert(ackInterval > 0);
+}
+
+namespace {
+bool isSessionCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
+}
+}
+
+boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
+ assert(lastReceived<sendAckAt);
+ ++lastReceived;
+ QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
+ if (lastReceived == sendAckAt)
+ return sendingAck();
+ else
+ return boost::none;
+}
+
+bool SessionState::sent(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return false;
+ unackedOut.push_back(f);
+ ++lastSent;
+ QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
+ return (state!=RESUMING) &&
+ (lastSent == solicitAckAt) &&
+ sendingSolicit();
+}
+
+SessionState::Replay SessionState::replay() {
+ Replay r(unackedOut.size());
+ std::copy(unackedOut.begin(), unackedOut.end(), r.begin());
+ return r;
+}
+
+void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
+ if (lastSent < acked)
+ throw InvalidArgumentException("Invalid sequence number in ack");
+ size_t keep = lastSent - acked;
+ if (keep < unackedOut.size())
+ unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep);
+ solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval));
+}
+
+SequenceNumber SessionState::sendingAck() {
+ sendAckAt = lastReceived+ackInterval;
+ return lastReceived;
+}
+
+bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
+ if (ackSolicited)
+ return false;
+ solicitAckAt = lastSent + ackInterval;
+ return true;
+}
+
+SequenceNumber SessionState::resuming() {
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
new file mode 100644
index 0000000000..66fc083d3f
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -0,0 +1,127 @@
+#ifndef QPID_FRAMING_SESSIONSTATE_H
+#define QPID_FRAMING_SESSIONSTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include <boost/optional.hpp>
+
+#include <deque>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Session state common to client and broker.
+ * Implements session ack/resume protcools.
+ *
+ * A SessionState is always associated with an _open_ session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * A template to make it protocol independent and easy to test.
+ */
+class SessionState
+{
+ public:
+ typedef std::vector<AMQFrame> Replay;
+
+ /** States of a session. */
+ enum State {
+ SUSPENDED, ///< Suspended, detached from any channel.
+ RESUMING, ///< Resuming: waiting for initial ack from peer.
+ ATTACHED ///< Attached to channel and operating normally.
+ };
+
+ /**
+ *Create a newly opened active session.
+ *@param ackInterval send/solicit an ack whenever N unacked frames
+ * have been received/sent.
+ *@pre ackInterval > 0
+ */
+ SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true));
+
+ const framing::Uuid& getId() const { return id; }
+ State getState() const { return state; }
+
+ /** Received incoming L3 frame.
+ * @return SequenceNumber if an ack should be sent, empty otherwise.
+ * SessionState assumes that acks are sent whenever it returns
+ * a seq. number.
+ */
+ boost::optional<SequenceNumber> received(const AMQFrame&);
+
+ /** Sent outgoing L3 frame.
+ *@return true if solicit-ack should be sent. Note the SessionState
+ *assumes that a solicit-ack is sent every time it returns true.
+ */
+ bool sent(const AMQFrame&);
+
+ /** Received normal incoming ack. */
+ void receivedAck(SequenceNumber);
+
+ /** Frames to replay
+ *@pre getState()==ATTACHED
+ */
+ Replay replay();
+
+ /** Suspend the session. */
+ void suspend();
+
+ /** Start resume protocol for the session.
+ *@returns sequence number to ack immediately. */
+ SequenceNumber resuming();
+
+ /** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
+ *
+ * Note: when received() returns a sequence number this function
+ * should not be called. SessionState assumes that the ack is sent
+ * every time received() returns a sequence number.
+ */
+ SequenceNumber sendingAck();
+
+ SequenceNumber getLastSent() const { return lastSent; }
+ SequenceNumber getLastReceived() const { return lastReceived; }
+ private:
+ typedef std::deque<AMQFrame> Unacked;
+
+ bool sendingSolicit();
+
+ State state;
+ framing::Uuid id;
+ Unacked unackedOut;
+ SequenceNumber lastReceived;
+ SequenceNumber lastSent;
+ uint32_t ackInterval;
+ SequenceNumber sendAckAt;
+ SequenceNumber solicitAckAt;
+ bool ackSolicited;
+ bool suspending;
+};
+
+
+}} // namespace qpid::common
+
+
+#endif /*!QPID_FRAMING_SESSIONSTATE_H*/
diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h
new file mode 100644
index 0000000000..8c719e5110
--- /dev/null
+++ b/cpp/src/qpid/framing/TemplateVisitor.h
@@ -0,0 +1,89 @@
+#ifndef QPID_FRAMING_TEMPLATEVISITOR_H
+#define QPID_FRAMING_TEMPLATEVISITOR_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/mpl/fold.hpp>
+#include <boost/utility/value_init.hpp>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Metafunction to generate a visitor class derived from Base with a
+ * visit for each type in TypeList calling functor F. TypeList may be
+ * any boost::mpl type collection e.g. mpl::list.
+ *
+ * Generated class is: TemplateVisitor<Base, F, TypeList>::type
+ *
+ * @see make_visitor
+ */
+template <class VisitTemplate, class TypeList, class F>
+class TemplateVisitor
+{
+ struct Base : public VisitorBase {
+ F action;
+ Base(F f) : action(f) {}
+ using VisitorBase::visit;
+ };
+
+ template <class B, class T> struct Visit : public B {
+ Visit(F action) : B(action) {}
+ using B::visit;
+ void visit(const T& body) { action(body); }
+ };
+
+ typedef typename boost::mpl::fold<
+ TypeList, Base, Visit<boost::mpl::placeholders::_1,
+ boost::mpl::placeholders::_2>
+ >::type type;
+};
+
+/**
+ * Construct a TemplateVisitor to perform the given action,
+ * for example:
+ * @code
+ */
+template <class VisitorBase, class TypeList, class F>
+TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) {
+ return TemplateVisitor<VisitorBase,TypeList,F>::type(action);
+};
+
+/**
+ * For method body classes in TypeList, invoke the corresponding function
+ * on Target and return true. For other body types return false.
+ */
+template <class TypeList, class Target>
+bool invoke(const AMQBody& body, Target& target) {
+ typename InvokeVisitor<TypeList, Target>::type v(target);
+ body.accept(v);
+ return v.target;
+}
+
+}} // namespace qpid::framing
+
+
+#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index e0372b2f68..1bb69fbca9 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -24,9 +24,13 @@
namespace qpid {
namespace framing {
-TransferContent::TransferContent(const std::string& _data)
+TransferContent::TransferContent(const std::string& data,
+ const std::string& routingKey,
+ const std::string& exchange)
{
- setData(_data);
+ setData(data);
+ getDeliveryProperties().setRoutingKey(routingKey);
+ getDeliveryProperties().setExchange(exchange);
}
AMQHeaderBody TransferContent::getHeader() const
@@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset)
const MessageProperties& TransferContent::getMessageProperties() const
{
const MessageProperties* props = header.get<MessageProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
const DeliveryProperties& TransferContent::getDeliveryProperties() const
{
const DeliveryProperties* props = header.get<DeliveryProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
index 6fd96f3587..88f45b7e0a 100644
--- a/cpp/src/qpid/framing/TransferContent.h
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -30,14 +30,15 @@
namespace qpid {
namespace framing {
-struct NoSuchPropertiesException : public Exception {};
-
class TransferContent : public MethodContent
{
AMQHeaderBody header;
std::string data;
public:
- TransferContent(const std::string& data = "");
+ TransferContent(const std::string& data = std::string(),
+ const std::string& routingKey = std::string(),
+ const std::string& exchange = std::string());
+
AMQHeaderBody getHeader() const;
void setData(const std::string&);
void appendData(const std::string&);
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index 3a83430d56..2918c48ce3 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -17,9 +17,9 @@
*/
#include "Uuid.h"
-
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for UUID."));
buf.getRawData(c_array(), size());
}
@@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) {
return in;
}
+std::string Uuid::str() const {
+ std::ostringstream os;
+ os << *this;
+ return os.str();
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h
index 19ae79db6a..9bde67ad8e 100644
--- a/cpp/src/qpid/framing/Uuid.h
+++ b/cpp/src/qpid/framing/Uuid.h
@@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> {
void encode(framing::Buffer& buf) const;
void decode(framing::Buffer& buf);
+
+ /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
+ std::string str() const;
};
/** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index eec28333bc..69b5942ba0 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -32,4 +32,3 @@
#include "ProtocolInitiation.h"
#include "BasicHeaderProperties.h"
#include "ProtocolVersion.h"
-#include "ProtocolVersionException.h"
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index a788fe36e4..94442aa357 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -61,5 +61,11 @@ class Uuid;
const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1;
const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX);
+// Forward declare class types
+class FramingContent;
+class FieldTable;
+class SequenceNumberSet;
+class Uuid;
+
}} // namespace qpid::framing
#endif
diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h
index 3cb8aece5d..1fe81f8f67 100644
--- a/cpp/src/qpid/framing/variant.h
+++ b/cpp/src/qpid/framing/variant.h
@@ -23,7 +23,6 @@
/**@file Tools for using boost::variant */
-#include "qpid/QpidError.h"
#include <boost/variant.hpp>
@@ -39,7 +38,7 @@ template <class R=void>
struct NoBlankVisitor : public boost::static_visitor<R> {
R foundBlank() const {
assert(0);
- THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value.");
+ throw Exception(QPID_MSG("Invalid variant value."));
}
R operator()(const boost::blank&) const { return foundBlank(); }
R operator()(boost::blank&) const { return foundBlank(); }