summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
committerGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
commit0db1af31320aa010c8e97da80000f7548d889068 (patch)
treece2cd8dba8cf46b685dcb626b31e25c17702c1a0 /cpp/src/qpid/client
parent747ac26509e78ac9aa9120be02cd446ac99d21cd (diff)
downloadqpid-python-0db1af31320aa010c8e97da80000f7548d889068.tar.gz
Added initial 'execution-layer' to try out methods form the 0-10 execution class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp21
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp53
-rw-r--r--cpp/src/qpid/client/ClientChannel.h12
3 files changed, 67 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index 60368268c0..a1aacdee4e 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -101,7 +101,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
consumers.erase(i);
}
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
}
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
@@ -119,9 +119,9 @@ void BasicMessageChannel::cancelAll(){
Consumer& c = i->second;
if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
{
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
}
- channel.send(new BasicCancelBody(channel.version, i->first, true));
+ channel.send(make_shared_ptr(new BasicCancelBody(channel.version, i->first, true)));
}
consumers.clear();
}
@@ -131,8 +131,7 @@ bool BasicMessageChannel::get(
{
// Prepare for incoming response
incoming.addDestination(BASIC_GET, destGet);
- channel.send(
- new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)));
bool got = destGet.wait(msg);
return got;
}
@@ -150,9 +149,7 @@ void BasicMessageChannel::publish(
*static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
header->setContentSize(msg.getData().size());
- channel.send(
- new BasicPublishBody(
- channel.version, 0, e, key, mandatory, immediate));
+ channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)));
channel.send(header);
string data = msg.getData();
u_int64_t data_length = data.length();
@@ -160,14 +157,14 @@ void BasicMessageChannel::publish(
//frame itself uses 8 bytes
u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
if(data_length < frag_size){
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- channel.send(new AMQContentBody(frag));
+ channel.send(make_shared_ptr(new AMQContentBody(frag)));
offset += length;
remaining = data_length - offset;
@@ -268,11 +265,11 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
//else drop-through
case AUTO_ACK:
consumer.lastDeliveryTag = 0;
- channel.send(
+ channel.send(make_shared_ptr(
new BasicAckBody(
channel.version,
msg.getDeliveryTag(),
- multiple));
+ multiple)));
case NO_ACK: // Nothing to do
case CLIENT_ACK: // User code must ack.
break;
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index ab6b9a41c3..816ff05e85 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -92,10 +92,10 @@ void Channel::protocolInit(
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- send(new ConnectionTuneOkBody(
+ sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
version, proposal->getRequestId(),
proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()));
+ proposal->getHeartbeat())));
uint16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -104,7 +104,7 @@ void Channel::protocolInit(
// Send connection open.
std::string capabilities;
responses.expect();
- send(new ConnectionOpenBody(version, vhost, capabilities, true));
+ sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
AMQMethodBody::shared_ptr openResponse = responses.receive();
@@ -210,6 +210,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
default: throw UnknownMethod();
}
}
@@ -223,7 +224,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
- send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
+ sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId())));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -241,6 +242,18 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
throw UnknownMethod();
}
+void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
+ Monitor::ScopedLock l(outgoingMonitor);
+ //record the completion mark:
+ outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
+ //TODO: notify anyone waiting for completion notification:
+ outgoingMonitor.notifyAll();
+ } else{
+ throw UnknownMethod();
+ }
+}
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
messaging->handle(body);
}
@@ -315,7 +328,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive(
AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
- send(toSend);
+ sendCommand(toSend);
return responses.receive(c, m);
}
@@ -325,7 +338,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- send(body);
+ sendCommand(body);
return AMQMethodBody::shared_ptr();
}
}
@@ -362,3 +375,31 @@ void Channel::run() {
messaging->run();
}
+void Channel::sendCommand(AMQBody::shared_ptr body)
+{
+ ++(outgoing.hwm);
+ send(body);
+}
+
+bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
+{
+ AbsTime end;
+ if (timeout == 0) {
+ end = AbsTime::FarFuture();
+ } else {
+ end = AbsTime(AbsTime::now(), timeout);
+ }
+
+ Monitor::ScopedLock l(outgoingMonitor);
+ while (end > AbsTime::now() && outgoing.lwm < poi) {
+ outgoingMonitor.wait(end);
+ }
+ return !(outgoing.lwm < poi);
+}
+
+bool Channel::synchWithServer(Duration timeout)
+{
+ send(make_shared_ptr(new ExecutionFlushBody(version)));
+ return waitForCompletion(outgoing.hwm, timeout);
+}
+
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index cea1245e6a..fc82fb41ff 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -29,6 +29,7 @@
#include "ResponseHandler.h"
#include "qpid/Exception.h"
#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -64,6 +65,8 @@ class Channel : public framing::ChannelAdapter
Connection* connection;
sys::Thread dispatcher;
ResponseHandler responses;
+ sys::Monitor outgoingMonitor;
+ framing::Window outgoing;
uint16_t prefetch;
const bool transactional;
@@ -84,6 +87,7 @@ class Channel : public framing::ChannelAdapter
framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
void handleConnection(framing::AMQMethodBody::shared_ptr method);
+ void handleExecution(framing::AMQMethodBody::shared_ptr method);
void setQos();
@@ -114,9 +118,12 @@ class Channel : public framing::ChannelAdapter
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
+ void sendCommand(framing::AMQBody::shared_ptr body);
+
void open(framing::ChannelId, Connection&);
void closeInternal();
void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+ bool waitForCompletion(framing::SequenceNumber, sys::Duration);
// FIXME aconway 2007-02-23: Get rid of friendships.
friend class Connection;
@@ -358,7 +365,10 @@ class Channel : public framing::ChannelAdapter
*/
void run();
-
+ /**
+ * TESTING ONLY FOR NOW!
+ */
+ bool synchWithServer(sys::Duration timeout = 0);
};
}}