diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
| commit | 0a1b3430450f274aee273a9f792a2d43f771b85f (patch) | |
| tree | 71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src/qpid/broker | |
| parent | e00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff) | |
| download | qpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz | |
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 42 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 18 |
7 files changed, 75 insertions, 47 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index d96622cd4f..29e2256b56 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -126,6 +126,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer* c, bool sync){ if (!c || c->preAcquires()) { if (sync) { + Mutex::ScopedLock locker(messageLock); dispatch(); } else { serializer.execute(dispatchCallback); @@ -153,7 +154,9 @@ bool Queue::dispatch(QueuedMessage& msg){ int start = next; while(c){ next++; - if(c->deliver(msg)) return true; + if(c->deliver(msg)) { + return true; + } next = next % acquirers.size(); c = next == start ? 0 : acquirers[next]; } diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index a8a0745104..619d59f710 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -30,12 +30,15 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, const string _consumerTag, const DeliveryId _id, - bool _acquired) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - id(_id), - acquired(_acquired), - pull(false){} + bool _acquired, bool _confirmed) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + id(_id), + acquired(_acquired), + confirmed(_confirmed), + pull(false) +{ +} DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, @@ -44,11 +47,12 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, consumerTag(""), id(_id), acquired(true), + confirmed(false), pull(true){} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired) { + if (acquired && !confirmed) { queue->dequeue(ctxt, msg.payload); } } @@ -70,24 +74,30 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const } void DeliveryRecord::redeliver(Session* const session) const{ - if(pull){ - //if message was originally sent as response to get, we must requeue it - requeue(); - }else{ - session->deliver(msg.payload, consumerTag, id); + if (!confirmed) { + if(pull){ + //if message was originally sent as response to get, we must requeue it + requeue(); + }else{ + session->deliver(msg.payload, consumerTag, id); + } } } void DeliveryRecord::requeue() const { - msg.payload->redeliver(); - queue->requeue(msg); + if (!confirmed) { + msg.payload->redeliver(); + queue->requeue(msg); + } } void DeliveryRecord::release() { - queue->requeue(msg); - acquired = false; + if (!confirmed) { + queue->requeue(msg); + acquired = false; + } } void DeliveryRecord::reject() diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 3caac6bf40..4d98b0c5da 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -45,11 +45,12 @@ class DeliveryRecord{ const std::string consumerTag; const DeliveryId id; bool acquired; + const bool confirmed; const bool pull; public: DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, - const DeliveryId id, bool acquired); + const DeliveryId id, bool acquired, bool confirmed = false); DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); void dequeue(TransactionContext* ctxt = 0) const; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 84d3478173..39f9f85c13 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -144,7 +144,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t if (isContentReleased()) { //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load? + uint64_t expectedSize(frames.getHeaders()->getContentLength()); for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) { uint64_t remaining = expectedSize - offset; @@ -153,11 +153,22 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t store->loadContent(*this, data, offset, remaining > maxContentSize ? maxContentSize : remaining); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } out.handle(frame); } } else { - SendContent f(out, channel, maxFrameSize); + Count c; + frames.map_if(c, TypeFilter(CONTENT_BODY)); + + SendContent f(out, channel, maxFrameSize, c.getCount()); frames.map_if(f, TypeFilter(CONTENT_BODY)); } } diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index b259aa6b8f..6471245ed9 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -39,7 +39,7 @@ namespace broker{ struct BaseToken : DeliveryToken { virtual ~BaseToken() {} - virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0; + virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0; }; struct BasicGetToken : BaseToken @@ -50,12 +50,11 @@ struct BasicGetToken : BaseToken BasicGetToken(Queue::shared_ptr q) : queue(q) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicGetOkBody( - channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), + return AMQFrame(0, BasicGetOkBody( + ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey(), queue->getMessageCount())); - } }; @@ -67,10 +66,10 @@ struct BasicConsumeToken : BaseToken BasicConsumeToken(const string c) : consumer(c) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicDeliverBody( - channel.getVersion(), consumer, id.getValue(), + return AMQFrame(0, BasicDeliverBody( + ProtocolVersion(), consumer, id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); } @@ -85,16 +84,13 @@ struct MessageDeliveryToken : BaseToken MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : destination(d), confirmMode(c), acquireMode(a) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/) { - //TODO; need to figure out how the acquire mode gets - //communicated (this is just a temporary solution) - channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode)); - //may need to set the redelivered flag: if (msg->getRedelivered()){ msg->getProperties<DeliveryProperties>()->setRedelivered(true); } + return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode)); } }; @@ -127,11 +123,15 @@ void MessageDelivery::deliver(Message::shared_ptr msg, //another may well have the wrong headers; however we will only //have one content class for 0-10 proper + FrameHandler& handler = channel.getHandlers().out; + //send method boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); - t->sendMethod(msg, channel, id); + AMQFrame method = t->sendMethod(msg, id); + method.setEof(false); + method.setChannel(channel.getId()); + handler.handle(method); - FrameHandler& handler = channel.getHandlers().out; msg->sendHeader(handler, channel.getId(), framesize); msg->sendContent(handler, channel.getId(), framesize); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f1bdc68899..ead2fad379 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -172,10 +172,11 @@ bool SemanticHandler::isOpen() const { DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - SequenceNumber copy(outgoing.hwm); - ++copy; - MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax()); - return outgoing.hwm.getValue(); + //SequenceNumber copy(outgoing.hwm); + //++copy; + MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); + return outgoing.hwm; + //return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index c98fdd6291..d3f82655d0 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -268,8 +268,8 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter->deliver(msg.payload, token); - if (ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire)); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); } } return !blocked; @@ -565,12 +565,14 @@ AckRange Session::findRange(DeliveryId first, DeliveryId last) ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); ack_iterator end = start; - if (first == last) { - //just acked single element (move end past it) - ++end; - } else { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + if (start != unacked.end()) { + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } } return AckRange(start, end); } |
