diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
| commit | 0a1b3430450f274aee273a9f792a2d43f771b85f (patch) | |
| tree | 71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src/qpid/client | |
| parent | e00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff) | |
| download | qpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz | |
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/AckMode.h | 83 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ChannelHandler.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ClientChannel.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Correlator.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Correlator.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FutureResponse.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FutureResponse.h | 2 |
10 files changed, 76 insertions, 109 deletions
diff --git a/cpp/src/qpid/client/AckMode.h b/cpp/src/qpid/client/AckMode.h index 9ad5ef925c..f565c1d36b 100644 --- a/cpp/src/qpid/client/AckMode.h +++ b/cpp/src/qpid/client/AckMode.h @@ -1,72 +1,25 @@ #ifndef _client_AckMode_h #define _client_AckMode_h -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace client { - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - -}} // namespace qpid::client - - - -#endif /*!_client_AckMode_h*/ -#ifndef _client_AckMode_h -#define _client_AckMode_h /* * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ @@ -97,6 +50,4 @@ enum AckMode { }} // namespace qpid::client - - -#endif /*!_client_AckMode_h*/ +#endif diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index c9b7a68f38..49e7285a47 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -58,7 +58,7 @@ void ChannelHandler::incoming(AMQFrame& frame) if (body->getMethod()) handleMethod(body->getMethod()); else - throw new ConnectionException(504, "Channel not open."); + throw ConnectionException(504, "Channel not open for content."); } } @@ -68,7 +68,7 @@ void ChannelHandler::outgoing(AMQFrame& frame) frame.setChannel(id); out(frame); } else if (getState() == CLOSED) { - throw Exception("Channel not open"); + throw Exception(QPID_MSG("Channel not open, can't send " << frame)); } else if (getState() == CLOSED_BY_PEER) { throw ChannelException(code, text); } @@ -120,7 +120,7 @@ void ChannelHandler::handleMethod(AMQMethodBody* method) } //else just ignore it break; case CLOSED: - throw ConnectionException(504, "Channel not opened."); + throw ConnectionException(504, "Channel is closed."); default: throw Exception("Unexpected state encountered in ChannelHandler!"); } diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 87062e1470..0a85b8e4f0 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -69,7 +69,7 @@ Channel::~Channel() void Channel::open(const Session& s) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); active = true; @@ -80,7 +80,7 @@ void Channel::open(const Session& s) } bool Channel::isOpen() const { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); return active; } @@ -146,7 +146,7 @@ void Channel::consume( Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; - c.lastDeliveryTag = 0; + c.count = 0; } uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; ScopedSync s(session, synch); @@ -205,7 +205,7 @@ void Channel::close() { session.close(); { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); active = false; } stop(); @@ -231,20 +231,18 @@ void Channel::join() { void Channel::dispatch(FrameSet& content, const std::string& destination) { - MessageListener* listener(0); - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { - Message msg; - msg.populate(content); - listener = i->second.listener; - } - } - if (listener) { + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { Message msg; msg.populate(content); + MessageListener* listener = i->second.listener; listener->received(msg); + if (isOpen() && i->second.ackMode != CLIENT_ACK) { + bool send = i->second.ackMode == AUTO_ACK + || (prefetch && ++(i->second.count) > (prefetch / 2)); + if (send) i->second.count = 0; + session.execution().completed(content.getId(), true, send); + } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); } diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index b33af65d21..527f5d418f 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -63,8 +63,7 @@ class Channel : private sys::Runnable struct Consumer{ MessageListener* listener; AckMode ackMode; - int count; - u_int64_t lastDeliveryTag; + uint32_t count; }; typedef std::map<std::string, Consumer> ConsumerMap; @@ -75,7 +74,7 @@ class Channel : private sys::Runnable const bool transactional; framing::ProtocolVersion version; - sys::Mutex stopLock; + mutable sys::Mutex stopLock; bool running; ConsumerMap consumers; diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp index 9ef6857957..f30c92b992 100644 --- a/cpp/src/qpid/client/Correlator.cpp +++ b/cpp/src/qpid/client/Correlator.cpp @@ -25,14 +25,15 @@ using qpid::client::Correlator; using namespace qpid::framing; using namespace boost; -void Correlator::receive(AMQMethodBody* response) +bool Correlator::receive(const AMQMethodBody* response) { if (listeners.empty()) { - throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name + return false; } else { Listener l = listeners.front(); if (l) l(response); listeners.pop(); + return true; } } diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h index d93e7b66cd..45b22fb2fe 100644 --- a/cpp/src/qpid/client/Correlator.h +++ b/cpp/src/qpid/client/Correlator.h @@ -36,9 +36,9 @@ namespace client { class Correlator { public: - typedef boost::function<void(framing::AMQMethodBody*)> Listener; + typedef boost::function<void(const framing::AMQMethodBody*)> Listener; - void receive(framing::AMQMethodBody*); + bool receive(const framing::AMQMethodBody*); void listen(Listener l); private: diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 8ea2cc64e6..95cdc7032a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -62,19 +62,16 @@ void ExecutionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (!invoke(body, this)) { - if (isContentFrame(frame)) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); - } - arriving->append(frame); - if (arriving->isComplete()) { + if (!arriving) { + arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); + } + arriving->append(frame); + if (arriving->isComplete()) { + if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { demux.handle(arriving); - arriving.reset(); - } - } else { - ++incoming.hwm; - correlation.receive(body->getMethod()); - } + } + arriving.reset(); + } } } @@ -168,11 +165,19 @@ void ExecutionHandler::sendCompletion() SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) { + return send(command, l, false); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) +{ SequenceNumber id = ++outgoing.hwm; if(l) { completion.listenForResult(id, l); } AMQFrame frame(0/*channel will be filled in be channel handler*/, command); + if (hasContent) { + frame.setEof(false); + } out(frame); return id; } @@ -180,7 +185,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, CompletionTracker::ResultListener l) { - SequenceNumber id = send(command, l); + SequenceNumber id = send(command, l, true); sendContent(content); return id; } @@ -188,14 +193,16 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten void ExecutionHandler::sendContent(const MethodContent& content) { AMQFrame header(0, content.getHeader()); - out(header); - + header.setBof(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ + header.setEof(false); + out(header); //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ AMQFrame frame(0, AMQContentBody(content.getData())); + frame.setBof(false); out(frame); }else{ u_int32_t offset = 0; @@ -204,10 +211,20 @@ void ExecutionHandler::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); - out(frame); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } offset += length; remaining = data_length - offset; + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } + out(frame); } } + } else { + out(header); } } diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 88424b555a..5f9cdff9d2 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -59,6 +59,7 @@ class ExecutionHandler : void sendCompletion(); + framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent); void sendContent(const framing::MethodContent&); public: diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index afdd35c5eb..73b7c3a7a6 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -35,7 +35,7 @@ AMQMethodBody* FutureResponse::getResponse(SessionCore& session) return response.get(); } -void FutureResponse::received(AMQMethodBody* r) +void FutureResponse::received(const AMQMethodBody* r) { Monitor::ScopedLock l(lock); response = *r; diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index 1e8a7eb456..df3b7c6f30 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -36,7 +36,7 @@ class FutureResponse : public FutureCompletion framing::MethodHolder response; public: framing::AMQMethodBody* getResponse(SessionCore& session); - void received(framing::AMQMethodBody* response); + void received(const framing::AMQMethodBody* response); }; }} |
