summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
committerGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
commit0a1b3430450f274aee273a9f792a2d43f771b85f (patch)
tree71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src/qpid/broker
parente00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff)
downloadqpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp5
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp42
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h3
-rw-r--r--cpp/src/qpid/broker/Message.cpp15
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp30
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp9
-rw-r--r--cpp/src/qpid/broker/Session.cpp18
7 files changed, 75 insertions, 47 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index d96622cd4f..29e2256b56 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -126,6 +126,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
void Queue::requestDispatch(Consumer* c, bool sync){
if (!c || c->preAcquires()) {
if (sync) {
+ Mutex::ScopedLock locker(messageLock);
dispatch();
} else {
serializer.execute(dispatchCallback);
@@ -153,7 +154,9 @@ bool Queue::dispatch(QueuedMessage& msg){
int start = next;
while(c){
next++;
- if(c->deliver(msg)) return true;
+ if(c->deliver(msg)) {
+ return true;
+ }
next = next % acquirers.size();
c = next == start ? 0 : acquirers[next];
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index a8a0745104..619d59f710 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -30,12 +30,15 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
const string _consumerTag,
const DeliveryId _id,
- bool _acquired) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- id(_id),
- acquired(_acquired),
- pull(false){}
+ bool _acquired, bool _confirmed) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ id(_id),
+ acquired(_acquired),
+ confirmed(_confirmed),
+ pull(false)
+{
+}
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
@@ -44,11 +47,12 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
consumerTag(""),
id(_id),
acquired(true),
+ confirmed(false),
pull(true){}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired) {
+ if (acquired && !confirmed) {
queue->dequeue(ctxt, msg.payload);
}
}
@@ -70,24 +74,30 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
}
void DeliveryRecord::redeliver(Session* const session) const{
- if(pull){
- //if message was originally sent as response to get, we must requeue it
- requeue();
- }else{
- session->deliver(msg.payload, consumerTag, id);
+ if (!confirmed) {
+ if(pull){
+ //if message was originally sent as response to get, we must requeue it
+ requeue();
+ }else{
+ session->deliver(msg.payload, consumerTag, id);
+ }
}
}
void DeliveryRecord::requeue() const
{
- msg.payload->redeliver();
- queue->requeue(msg);
+ if (!confirmed) {
+ msg.payload->redeliver();
+ queue->requeue(msg);
+ }
}
void DeliveryRecord::release()
{
- queue->requeue(msg);
- acquired = false;
+ if (!confirmed) {
+ queue->requeue(msg);
+ acquired = false;
+ }
}
void DeliveryRecord::reject()
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 3caac6bf40..4d98b0c5da 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -45,11 +45,12 @@ class DeliveryRecord{
const std::string consumerTag;
const DeliveryId id;
bool acquired;
+ const bool confirmed;
const bool pull;
public:
DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
- const DeliveryId id, bool acquired);
+ const DeliveryId id, bool acquired, bool confirmed = false);
DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 84d3478173..39f9f85c13 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -144,7 +144,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load?
+ uint64_t expectedSize(frames.getHeaders()->getContentLength());
for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
{
uint64_t remaining = expectedSize - offset;
@@ -153,11 +153,22 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t
store->loadContent(*this, data, offset,
remaining > maxContentSize ? maxContentSize : remaining);
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
out.handle(frame);
}
} else {
- SendContent f(out, channel, maxFrameSize);
+ Count c;
+ frames.map_if(c, TypeFilter(CONTENT_BODY));
+
+ SendContent f(out, channel, maxFrameSize, c.getCount());
frames.map_if(f, TypeFilter(CONTENT_BODY));
}
}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index b259aa6b8f..6471245ed9 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -39,7 +39,7 @@ namespace broker{
struct BaseToken : DeliveryToken
{
virtual ~BaseToken() {}
- virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+ virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0;
};
struct BasicGetToken : BaseToken
@@ -50,12 +50,11 @@ struct BasicGetToken : BaseToken
BasicGetToken(Queue::shared_ptr q) : queue(q) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicGetOkBody(
- channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+ return AMQFrame(0, BasicGetOkBody(
+ ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey(), queue->getMessageCount()));
-
}
};
@@ -67,10 +66,10 @@ struct BasicConsumeToken : BaseToken
BasicConsumeToken(const string c) : consumer(c) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumer, id.getValue(),
+ return AMQFrame(0, BasicDeliverBody(
+ ProtocolVersion(), consumer, id.getValue(),
msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
}
@@ -85,16 +84,13 @@ struct MessageDeliveryToken : BaseToken
MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
destination(d), confirmMode(c), acquireMode(a) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/)
{
- //TODO; need to figure out how the acquire mode gets
- //communicated (this is just a temporary solution)
- channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
-
//may need to set the redelivered flag:
if (msg->getRedelivered()){
msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode));
}
};
@@ -127,11 +123,15 @@ void MessageDelivery::deliver(Message::shared_ptr msg,
//another may well have the wrong headers; however we will only
//have one content class for 0-10 proper
+ FrameHandler& handler = channel.getHandlers().out;
+
//send method
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
- t->sendMethod(msg, channel, id);
+ AMQFrame method = t->sendMethod(msg, id);
+ method.setEof(false);
+ method.setChannel(channel.getId());
+ handler.handle(method);
- FrameHandler& handler = channel.getHandlers().out;
msg->sendHeader(handler, channel.getId(), framesize);
msg->sendContent(handler, channel.getId(), framesize);
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f1bdc68899..ead2fad379 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -172,10 +172,11 @@ bool SemanticHandler::isOpen() const {
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- SequenceNumber copy(outgoing.hwm);
- ++copy;
- MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
- return outgoing.hwm.getValue();
+ //SequenceNumber copy(outgoing.hwm);
+ //++copy;
+ MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+ return outgoing.hwm;
+ //return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index c98fdd6291..d3f82655d0 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -268,8 +268,8 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
DeliveryId deliveryTag =
parent->deliveryAdapter->deliver(msg.payload, token);
- if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
}
}
return !blocked;
@@ -565,12 +565,14 @@ AckRange Session::findRange(DeliveryId first, DeliveryId last)
ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ if (start != unacked.end()) {
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ }
}
return AckRange(start, end);
}