/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/SessionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" #include namespace qpid { namespace amqp_0_10 { using namespace framing; using namespace std; void SessionHandler::checkAttached() { if (!getState()) throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); } SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) : channel(ch, out), peer(channel), awaitingDetached(false), sendReady(), receiveReady() {} SessionHandler::~SessionHandler() {} namespace { bool isSessionControl(AMQMethodBody* m) { return m && m->amqpClassId() == SESSION_CLASS_ID; } session::DetachCode convert(uint8_t code) { switch(code) { case 0: return session::DETACH_CODE_NORMAL; case 1: return session::DETACH_CODE_SESSION_BUSY; case 2: return session::DETACH_CODE_TRANSPORT_BUSY; case 3: return session::DETACH_CODE_NOT_ATTACHED; case 4: default: return session::DETACH_CODE_UNKNOWN_IDS; } } } // namespace void SessionHandler::invoke(const AMQMethodBody& m) { framing::invoke(*this, m); } void SessionHandler::handleIn(AMQFrame& f) { // Note on channel states: a channel is attached if session != 0 AMQMethodBody* m = f.getBody()->getMethod(); try { // Ignore all but detach controls while awaiting detach if (awaitingDetached) { if (!isSessionControl(m)) return; if (m->amqpMethodId() != SESSION_DETACH_METHOD_ID && m->amqpMethodId() != SESSION_DETACHED_METHOD_ID) return; } if (isSessionControl(m)) { invoke(*m); } else { // Drop frames if we are detached. if (!getState()) return; if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) return; // Ignore duplicates. if (getState()->receiverNeedKnownCompleted()) sendCompletion(); getInHandler()->handle(f); } } catch(const SessionException& e) { QPID_LOG(error, "Execution exception: " << e.what()); executionException(e.code, e.what()); // Let subclass handle this first. framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; if (getState()) commandId = getState()->receiverGetCurrent(); execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable()); detaching(); sendDetach(); } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); channelException(e.code, e.what()); // Let subclass handle this first. peer.detached(name, e.code); } catch(const ConnectionException& e) { QPID_LOG(error, "Connection exception: " << e.what()); connectionException(e.code, e.getMessage()); } catch(const std::exception& e) { QPID_LOG(error, "Unexpected exception: " << e.what()); connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } } void SessionHandler::handleException(const qpid::SessionException& e) { QPID_LOG(error, "Execution exception (during output): " << e.what()); executionException(e.code, e.what()); // Let subclass handle this first. framing::AMQP_AllProxy::Execution execution(channel); execution.exception(e.code, 0, 0, 0, 0, e.what(), FieldTable()); detaching(); sendDetach(); } namespace { bool isCommand(const AMQFrame& f) { return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND; } } // namespace void SessionHandler::handleOut(AMQFrame& f) { checkAttached(); if (!sendReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); getState()->senderRecord(f); if (isCommand(f) && getState()->senderNeedFlush()) { peer.flush(false, false, true); getState()->senderRecordFlush(); } channel.handle(f); } void SessionHandler::attach(const std::string& name_, bool force) { // Save the name for possible session-busy exception. Session-busy // can be thrown before we have attached the handler to a valid // SessionState, and in that case we need the name to send peer.detached name = name_; if (getState() && name == getState()->getId().getName()) return; // Idempotent if (getState()) throw TransportBusyException( QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); setState(name, force); QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); peer.attached(name); if (getState()->hasState()) peer.flush(true, true, true); else sendCommandPoint(getState()->senderGetCommandPoint()); } #define CHECK_NAME(NAME, MSG) do { \ checkAttached(); \ if (NAME != getState()->getId().getName()) \ throw InvalidArgumentException( \ QPID_MSG(MSG << ": incorrect session name: " << NAME \ << ", expecting: " << getState()->getId().getName())); \ } while(0) void SessionHandler::attached(const std::string& name) { CHECK_NAME(name, "session.attached"); } void SessionHandler::detach(const std::string& name) { CHECK_NAME(name, "session.detach"); peer.detached(name, session::DETACH_CODE_NORMAL); handleDetach(); } void SessionHandler::detached(const std::string& /*name*/, uint8_t code) { // Special case for detached: Don't check if we are // attached. Checking can lead to an endless game of "detached // tennis" on federated brokers. awaitingDetached = false; if (code != session::DETACH_CODE_NORMAL) { sendReady = receiveReady = false; channelException(convert(code), "session.detached from peer."); } else { handleDetach(); } } void SessionHandler::handleDetach() { sendReady = receiveReady = false; } void SessionHandler::requestTimeout(uint32_t t) { checkAttached(); getState()->setTimeout(t); peer.timeout(getState()->getTimeout()); } void SessionHandler::timeout(uint32_t t) { checkAttached(); getState()->setTimeout(t); } void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { checkAttached(); getState()->receiverSetCommandPoint(SessionPoint(id, offset)); if (!receiveReady) { receiveReady = true; readyToReceive(); } } void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { checkAttached(); if (getState()->hasState()) { // Replay if (commands.empty()) throw IllegalStateException( QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); // TODO aconway 2008-05-12: support replay of partial commands. // Here we always round down to the last command boundary. SessionPoint expectedPoint = commands.empty() ? SequenceNumber(0) : SessionPoint(commands.front(),0); SessionState::ReplayRange replay = getState()->senderExpected(expectedPoint); sendCommandPoint(expectedPoint); std::for_each(replay.begin(), replay.end(), out); // replay } else sendCommandPoint(getState()->senderGetCommandPoint()); } void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { checkAttached(); // Ignore non-contiguous confirmations. if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint()) getState()->senderConfirmed(commands.rangesBegin()->last()); } void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { checkAttached(); getState()->senderCompleted(commands); if (getState()->senderNeedKnownCompleted() || timelyReply) { peer.knownCompleted(commands); getState()->senderRecordKnownCompleted(); } } void SessionHandler::knownCompleted(const SequenceSet& commands) { checkAttached(); getState()->receiverKnownCompleted(commands); } void SessionHandler::flush(bool expected, bool confirmed, bool completed) { checkAttached(); if (expected) { SequenceSet expectSet; if (getState()->hasState()) expectSet.add(getState()->receiverGetExpected().command); peer.expected(expectSet, Array()); } if (confirmed) { SequenceSet confirmSet; if (!getState()->receiverGetUnknownComplete().empty()) confirmSet.add(getState()->receiverGetUnknownComplete().front(), getState()->receiverGetReceived().command); peer.confirmed(confirmSet, Array()); } if (completed) peer.completed(getState()->receiverGetUnknownComplete(), true); } void SessionHandler::gap(const SequenceSet& /*commands*/) { throw NotImplementedException("session.gap not supported"); } void SessionHandler::sendDetach() { checkAttached(); awaitingDetached = true; peer.detach(getState()->getId().getName()); } void SessionHandler::sendCompletion() { checkAttached(); const SequenceSet& c = getState()->receiverGetUnknownComplete(); peer.completed(c, getState()->receiverNeedKnownCompleted()); } void SessionHandler::sendAttach(bool force) { QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) peer.flush(true, true, true); else sendCommandPoint(getState()->senderGetCommandPoint()); } void SessionHandler::sendCommandPoint(const SessionPoint& point) { peer.commandPoint(point.command, point.offset); if (!sendReady) { sendReady = true; readyToSend(); } } void SessionHandler::markReadyToSend() { if (!sendReady) { sendReady = true; } } void SessionHandler::sendTimeout(uint32_t t) { checkAttached(); peer.requestTimeout(t); } void SessionHandler::sendFlush() { peer.flush(false, true, true); } bool SessionHandler::ready() const { return sendReady && receiveReady; } }} // namespace qpid::broker