diff options
| author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
| commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
| tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/framing | |
| parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
| download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz | |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
22 files changed, 418 insertions, 233 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index abd33c4158..423af06173 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -20,9 +20,9 @@ */ #include "AMQFrame.h" -#include "qpid/QpidError.h" #include "qpid/framing/variant.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> @@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t flags = buffer.getOctet(); uint8_t framing_version = (flags & 0xc0) >> 6; if (framing_version != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported"); + throw SyntaxErrorException(QPID_MSG("Framing version unsupported")); bof = flags & 0x08; eof = flags & 0x04; bos = flags & 0x02; @@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t type = buffer.getOctet(); uint16_t frame_size = buffer.getShort(); if (frame_size < frameOverhead()-1) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small"); + throw SyntaxErrorException(QPID_MSG("Frame size too small")); uint8_t reserved1 = buffer.getOctet(); uint8_t field1 = buffer.getOctet(); subchannel = field1 & 0x0f; @@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer) // Verify that the protocol header meets current spec // TODO: should we check reserved2 against zero as well? - the spec isn't clear if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero"); + throw SyntaxErrorException(QPID_MSG("Reserved bits not zero")); // TODO: should no longer care about body size and only pass up B,E,b,e flags uint16_t body_size = frame_size + 1 - frameOverhead(); @@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t end = buffer.getOctet(); if (end != 0xCE) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + throw SyntaxErrorException(QPID_MSG("Frame end not found")); return true; } @@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); + throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type)); } boost::apply_visitor(DecodeVisitor(buffer,size), body); } diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index 53a01141c1..fb84be7cd6 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -18,14 +18,13 @@ * under the License. * */ -#include "qpid/QpidError.h" #include "BodyHandler.h" #include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" - #include <boost/cast.hpp> +#include "qpid/framing/reply_exceptions.h" using namespace qpid::framing; using namespace boost; @@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) { handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: - QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); + throw SyntaxErrorException( + QPID_MSG("Invalid frame type " << body->type())); } } diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 6a466fdfab..8c1a4e1e9e 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -15,8 +15,6 @@ * limitations under the License. * */ -#include <boost/format.hpp> - #include "ChannelAdapter.h" #include "OutputHandler.h" #include "AMQFrame.h" @@ -26,8 +24,6 @@ #include "AMQMethodBody.h" #include "qpid/framing/ConnectionOpenBody.h" -using boost::format; - namespace qpid { namespace framing { @@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body) void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID) - throw ConnectionException( - 504, format("Connection method on non-0 channel %d.")%getId()); + throw ChannelErrorException( + QPID_MSG("Connection method on non-0 channel " << getId())); } void ChannelAdapter::assertChannelOpen() const { if (getId() != 0 && !isOpen()) - throw ConnectionException( - 504, format("Channel %d is not open.")%getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is not open.")); } void ChannelAdapter::assertChannelNotOpen() const { if (getId() != 0 && isOpen()) - throw ConnectionException( - 504, format("Channel %d is already open.") % getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is already open.")); } void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); } diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h index bd16804470..69aaeac492 100644 --- a/cpp/src/qpid/framing/ProtocolVersionException.h +++ b/cpp/src/qpid/framing/ChannelHandler.h @@ -1,3 +1,6 @@ +#ifndef QPID_FRAMING_CHANNELHANDLER_H +#define QPID_FRAMING_CHANNELHANDLER_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,39 +21,33 @@ * under the License. * */ - -#ifndef _ProtocolVersionException_ -#define _ProtocolVersionException_ - -#include "qpid/Exception.h" -#include "ProtocolVersion.h" -#include <string> -#include <vector> +#include "FrameHandler.h" +#include "AMQFrame.h" namespace qpid { namespace framing { -class ProtocolVersionException : public qpid::Exception +/** + * Sets the channel number on outgoing frames. + */ +class ChannelHandler : public FrameHandler { -protected: - ProtocolVersion versionFound; - -public: - ~ProtocolVersionException() throw() {} - - template <class T> - ProtocolVersionException( - ProtocolVersion ver, const T& msg) throw () : versionFound(ver) - { init(boost::lexical_cast<std::string>(msg)); } - - template <class T> - ProtocolVersionException(const T& msg) throw () - { init(boost::lexical_cast<std::string>(msg)); } + public: + ChannelHandler(uint16_t channelId=0, FrameHandler* next=0) + : FrameHandler(next), channel(channelId) {} + void handle(AMQFrame& frame) { + frame.setChannel(channel); + next->handle(frame); + } + uint16_t get() const { return channel; } + ChannelHandler& set(uint16_t ch) { channel=ch; return *this; } + operator uint16_t() const { return get(); } + ChannelHandler& operator=(uint16_t ch) { return set(ch); } private: - void init(const std::string& msg); + uint16_t channel; }; }} // namespace qpid::framing -#endif //ifndef _ProtocolVersionException_ +#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/ diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index 3c0284f2c8..089bc5d4a5 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -19,9 +19,10 @@ * */ #include "FieldTable.h" -#include "qpid/QpidError.h" #include "Buffer.h" #include "FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" #include <assert.h> namespace qpid { @@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){ uint32_t len = buffer.getLong(); uint32_t available = buffer.available(); if (available < len) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table."); + throw SyntaxErrorException(QPID_MSG("Not enough data for field table.")); uint32_t leftover = available - len; while(buffer.available() > leftover){ std::string name; diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index a7535ae4b9..5526c9cb72 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -20,8 +20,7 @@ */ #include "FieldValue.h" #include "Buffer.h" -#include "qpid/QpidError.h" - +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer) data.reset(new FixedWidthValue<0>()); break; default: - std::stringstream out; - out << "Unknown field table value type: " << typeOctet; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); } data->decode(buffer); } diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp index 813e6fb49b..cd134b0e89 100644 --- a/cpp/src/qpid/framing/FramingContent.cpp +++ b/cpp/src/qpid/framing/FramingContent.cpp @@ -18,12 +18,10 @@ * under the License. * */ -#include <assert.h> - #include "Buffer.h" #include "FramingContent.h" -#include "qpid/QpidError.h" -#include <sstream> +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d void Content::validate() { if (discriminator == REFERENCE) { if(value.empty()) { - THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty"); + throw InvalidArgumentException( + QPID_MSG("Reference cannot be empty")); } }else if (discriminator != INLINE) { - std::stringstream out; - out << "Invalid discriminator: " << (int) discriminator; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException( + QPID_MSG("Invalid discriminator: " << discriminator)); } } diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 3e55dff1bd..fbf3c0b7ca 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -33,6 +33,7 @@ template <class T> struct Handler { typedef T HandledType; typedef void handleFptr(T); + typedef void result_type; // Compatible with std/boost functors. Handler(Handler<T>* next_=0) : next(next_) {} virtual ~Handler() {} @@ -51,7 +52,7 @@ struct Handler { struct Chain : public Handler<T> { Chain(Handler<T>* first=0) : Handler(first) {} void operator=(Handler<T>* h) { next = h; } - void handle(T t) { (*next)(t); } + void handle(T t) { next->handle(t); } // TODO aconway 2007-08-29: chain modifier ops here. }; diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp deleted file mode 100644 index b68b3af1f9..0000000000 --- a/cpp/src/qpid/framing/ProtocolVersionException.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> -#include "ProtocolVersionException.h" - - -using namespace qpid::framing; - -void ProtocolVersionException::init(const std::string& msg) -{ - whatStr = boost::str( - boost::format("ProtocolVersionException: %s found: %s") - % versionFound.toString() % msg); -} - diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp deleted file mode 100644 index 9d2c971459..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ResumeHandler.h" -#include "qpid/framing/reply_exceptions.h" - -#include <boost/bind.hpp> - -#include <algorithm> - -namespace qpid { -namespace framing { - -void ResumeHandler::ackReceived(SequenceNumber acked) { - if (lastSent < acked) - throw InvalidArgumentException("Invalid sequence number in ack"); - size_t keep = lastSent - acked; - if (keep < unacked.size()) - unacked.erase(unacked.begin(), unacked.end()-keep); -} - -void ResumeHandler::resend() { - std::for_each(unacked.begin(), unacked.end(), - boost::bind(&FrameHandler::handle,out->next, _1)); -} - -void ResumeHandler::handleIn(AMQFrame& f) { - ++lastReceived; - in.next->handle(f); -} - -void ResumeHandler::handleOut(AMQFrame& f) { - ++lastSent; - unacked.push_back(f); - out.next->handle(f); -} - - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h deleted file mode 100644 index c86a60b9cb..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef QPID_FRAMING_RESUMEHANDLER_H -#define QPID_FRAMING_RESUMEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SequenceNumber.h" - -#include <deque> - -namespace qpid { -namespace framing { - -/** - * In/out handler pair for managing exactly-once session delivery. - * The same handler is used by client and broker. - * This handler only deals with TCP style SequenceNumber acks, - * not with fragmented SequenceNumberSet. - * - * THREAD UNSAFE. Expected to be used in a serialized context. - */ -class ResumeHandler : public FrameHandler::InOutHandler -{ - public: - /** Received acknowledgement for sent frames up to and including sentOk */ - void ackReceived(SequenceNumber sentOk); - - /** What was the last sequence number we received. */ - SequenceNumber getLastReceived() { return lastReceived; } - - /** Resend the unacked frames to the output handler */ - void resend(); - - protected: - void handleIn(AMQFrame&); - void handleOut(AMQFrame&); - - private: - typedef std::deque<AMQFrame> Frames; - Frames unacked; - SequenceNumber lastReceived; - SequenceNumber lastSent; -}; - - -}} // namespace qpid::common - - -#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/ diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index 9b8f0659b2..3aee04a4ce 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -47,6 +47,7 @@ class SequenceNumber bool operator<=(const SequenceNumber& other) const; bool operator>=(const SequenceNumber& other) const; uint32_t getValue() const { return (uint32_t) value; } + operator uint32_t() const { return (uint32_t) value; } friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); }; diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp new file mode 100644 index 0000000000..045a0ae115 --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +#include <boost/bind.hpp> +#include <boost/none.hpp> + +namespace qpid { +namespace framing { + +SessionState::SessionState(uint32_t ack, const Uuid& uuid) : + state(ATTACHED), + id(uuid), + lastReceived(-1), + lastSent(-1), + ackInterval(ack), + sendAckAt(lastReceived+ackInterval), + solicitAckAt(lastSent+ackInterval), + ackSolicited(false) +{ + assert(ackInterval > 0); +} + +namespace { +bool isSessionCommand(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID; +} +} + +boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { + if (isSessionCommand(f)) + return boost::none; + if (state==RESUMING) + throw CommandInvalidException( + QPID_MSG("Invalid frame: Resuming session, expected session-ack")); + assert(state = ATTACHED); + assert(lastReceived<sendAckAt); + ++lastReceived; + QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); + if (lastReceived == sendAckAt) + return sendingAck(); + else + return boost::none; +} + +bool SessionState::sent(const AMQFrame& f) { + if (isSessionCommand(f)) + return false; + unackedOut.push_back(f); + ++lastSent; + QPID_LOG(trace, "Sent # "<< lastSent << " " << id); + return (state!=RESUMING) && + (lastSent == solicitAckAt) && + sendingSolicit(); +} + +SessionState::Replay SessionState::replay() { + Replay r(unackedOut.size()); + std::copy(unackedOut.begin(), unackedOut.end(), r.begin()); + return r; +} + +void SessionState::receivedAck(SequenceNumber acked) { + if (state==RESUMING) state=ATTACHED; + assert(state==ATTACHED); + if (lastSent < acked) + throw InvalidArgumentException("Invalid sequence number in ack"); + size_t keep = lastSent - acked; + if (keep < unackedOut.size()) + unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep); + solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval)); +} + +SequenceNumber SessionState::sendingAck() { + sendAckAt = lastReceived+ackInterval; + return lastReceived; +} + +bool SessionState::sendingSolicit() { + assert(state == ATTACHED); + if (ackSolicited) + return false; + solicitAckAt = lastSent + ackInterval; + return true; +} + +SequenceNumber SessionState::resuming() { + state = RESUMING; + return sendingAck(); +} + +void SessionState::suspend() { + state = SUSPENDED; +} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h new file mode 100644 index 0000000000..66fc083d3f --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.h @@ -0,0 +1,127 @@ +#ifndef QPID_FRAMING_SESSIONSTATE_H +#define QPID_FRAMING_SESSIONSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/AMQFrame.h" + +#include <boost/optional.hpp> + +#include <deque> + +namespace qpid { +namespace framing { + +/** + * Session state common to client and broker. + * Implements session ack/resume protcools. + * + * A SessionState is always associated with an _open_ session (attached or + * suspended) it is destroyed when the session is closed. + * + * A template to make it protocol independent and easy to test. + */ +class SessionState +{ + public: + typedef std::vector<AMQFrame> Replay; + + /** States of a session. */ + enum State { + SUSPENDED, ///< Suspended, detached from any channel. + RESUMING, ///< Resuming: waiting for initial ack from peer. + ATTACHED ///< Attached to channel and operating normally. + }; + + /** + *Create a newly opened active session. + *@param ackInterval send/solicit an ack whenever N unacked frames + * have been received/sent. + *@pre ackInterval > 0 + */ + SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true)); + + const framing::Uuid& getId() const { return id; } + State getState() const { return state; } + + /** Received incoming L3 frame. + * @return SequenceNumber if an ack should be sent, empty otherwise. + * SessionState assumes that acks are sent whenever it returns + * a seq. number. + */ + boost::optional<SequenceNumber> received(const AMQFrame&); + + /** Sent outgoing L3 frame. + *@return true if solicit-ack should be sent. Note the SessionState + *assumes that a solicit-ack is sent every time it returns true. + */ + bool sent(const AMQFrame&); + + /** Received normal incoming ack. */ + void receivedAck(SequenceNumber); + + /** Frames to replay + *@pre getState()==ATTACHED + */ + Replay replay(); + + /** Suspend the session. */ + void suspend(); + + /** Start resume protocol for the session. + *@returns sequence number to ack immediately. */ + SequenceNumber resuming(); + + /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. + * + * Note: when received() returns a sequence number this function + * should not be called. SessionState assumes that the ack is sent + * every time received() returns a sequence number. + */ + SequenceNumber sendingAck(); + + SequenceNumber getLastSent() const { return lastSent; } + SequenceNumber getLastReceived() const { return lastReceived; } + private: + typedef std::deque<AMQFrame> Unacked; + + bool sendingSolicit(); + + State state; + framing::Uuid id; + Unacked unackedOut; + SequenceNumber lastReceived; + SequenceNumber lastSent; + uint32_t ackInterval; + SequenceNumber sendAckAt; + SequenceNumber solicitAckAt; + bool ackSolicited; + bool suspending; +}; + + +}} // namespace qpid::common + + +#endif /*!QPID_FRAMING_SESSIONSTATE_H*/ diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h new file mode 100644 index 0000000000..8c719e5110 --- /dev/null +++ b/cpp/src/qpid/framing/TemplateVisitor.h @@ -0,0 +1,89 @@ +#ifndef QPID_FRAMING_TEMPLATEVISITOR_H +#define QPID_FRAMING_TEMPLATEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/mpl/fold.hpp> +#include <boost/utility/value_init.hpp> + +namespace qpid { +namespace framing { + +/** + * Metafunction to generate a visitor class derived from Base with a + * visit for each type in TypeList calling functor F. TypeList may be + * any boost::mpl type collection e.g. mpl::list. + * + * Generated class is: TemplateVisitor<Base, F, TypeList>::type + * + * @see make_visitor + */ +template <class VisitTemplate, class TypeList, class F> +class TemplateVisitor +{ + struct Base : public VisitorBase { + F action; + Base(F f) : action(f) {} + using VisitorBase::visit; + }; + + template <class B, class T> struct Visit : public B { + Visit(F action) : B(action) {} + using B::visit; + void visit(const T& body) { action(body); } + }; + + typedef typename boost::mpl::fold< + TypeList, Base, Visit<boost::mpl::placeholders::_1, + boost::mpl::placeholders::_2> + >::type type; +}; + +/** + * Construct a TemplateVisitor to perform the given action, + * for example: + * @code + */ +template <class VisitorBase, class TypeList, class F> +TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) { + return TemplateVisitor<VisitorBase,TypeList,F>::type(action); +}; + +/** + * For method body classes in TypeList, invoke the corresponding function + * on Target and return true. For other body types return false. + */ +template <class TypeList, class Target> +bool invoke(const AMQBody& body, Target& target) { + typename InvokeVisitor<TypeList, Target>::type v(target); + body.accept(v); + return v.target; +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/ + +}} // namespace qpid::framing + + + +#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/ diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index e0372b2f68..1bb69fbca9 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -24,9 +24,13 @@ namespace qpid { namespace framing { -TransferContent::TransferContent(const std::string& _data) +TransferContent::TransferContent(const std::string& data, + const std::string& routingKey, + const std::string& exchange) { - setData(_data); + setData(data); + getDeliveryProperties().setRoutingKey(routingKey); + getDeliveryProperties().setExchange(exchange); } AMQHeaderBody TransferContent::getHeader() const @@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset) const MessageProperties& TransferContent::getMessageProperties() const { const MessageProperties* props = header.get<MessageProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } const DeliveryProperties& TransferContent::getDeliveryProperties() const { const DeliveryProperties* props = header.get<DeliveryProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 6fd96f3587..88f45b7e0a 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -30,14 +30,15 @@ namespace qpid { namespace framing { -struct NoSuchPropertiesException : public Exception {}; - class TransferContent : public MethodContent { AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data = ""); + TransferContent(const std::string& data = std::string(), + const std::string& routingKey = std::string(), + const std::string& exchange = std::string()); + AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index 3a83430d56..2918c48ce3 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -17,9 +17,9 @@ */ #include "Uuid.h" - -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const { void Uuid::decode(Buffer& buf) { if (buf.available() < size()) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID."); + throw SyntaxErrorException(QPID_MSG("Not enough data for UUID.")); buf.getRawData(c_array(), size()); } @@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) { return in; } +std::string Uuid::str() const { + std::ostringstream os; + os << *this; + return os.str(); +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h index 19ae79db6a..9bde67ad8e 100644 --- a/cpp/src/qpid/framing/Uuid.h +++ b/cpp/src/qpid/framing/Uuid.h @@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> { void encode(framing::Buffer& buf) const; void decode(framing::Buffer& buf); + + /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ + std::string str() const; }; /** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index eec28333bc..69b5942ba0 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -32,4 +32,3 @@ #include "ProtocolInitiation.h" #include "BasicHeaderProperties.h" #include "ProtocolVersion.h" -#include "ProtocolVersionException.h" diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h index a788fe36e4..94442aa357 100644 --- a/cpp/src/qpid/framing/amqp_types.h +++ b/cpp/src/qpid/framing/amqp_types.h @@ -61,5 +61,11 @@ class Uuid; const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1; const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); +// Forward declare class types +class FramingContent; +class FieldTable; +class SequenceNumberSet; +class Uuid; + }} // namespace qpid::framing #endif diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h index 3cb8aece5d..1fe81f8f67 100644 --- a/cpp/src/qpid/framing/variant.h +++ b/cpp/src/qpid/framing/variant.h @@ -23,7 +23,6 @@ /**@file Tools for using boost::variant */ -#include "qpid/QpidError.h" #include <boost/variant.hpp> @@ -39,7 +38,7 @@ template <class R=void> struct NoBlankVisitor : public boost::static_visitor<R> { R foundBlank() const { assert(0); - THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value."); + throw Exception(QPID_MSG("Invalid variant value.")); } R operator()(const boost::blank&) const { return foundBlank(); } R operator()(boost::blank&) const { return foundBlank(); } |
