From d4b433f542ab8a506d7dbc53e685770a96ee7958 Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Fri, 20 Feb 2009 00:04:37 +0000 Subject: Merged win-pollable-condition branch changes 743545:746056 into trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746061 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 2 +- cpp/src/common.vcproj | 1695 +++++++++++++++--------- cpp/src/qpid/sys/PollableCondition.h | 54 +- cpp/src/qpid/sys/PollableQueue.h | 46 +- cpp/src/qpid/sys/posix/PollableCondition.cpp | 119 +- cpp/src/qpid/sys/posix/PollableCondition.h | 56 - cpp/src/qpid/sys/windows/PollableCondition.cpp | 125 ++ cpp/src/tests/QueueEvents.cpp | 1 + 8 files changed, 1401 insertions(+), 697 deletions(-) delete mode 100644 cpp/src/qpid/sys/posix/PollableCondition.h create mode 100644 cpp/src/qpid/sys/windows/PollableCondition.cpp (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d5b53dc502..5d50756ec1 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -41,6 +41,7 @@ windows_dist = \ qpid/sys/windows/IOHandle.cpp \ qpid/sys/windows/IoHandlePrivate.h \ qpid/sys/windows/LockFile.cpp \ + qpid/sys/windows/PollableCondition.cpp \ qpid/sys/windows/Mutex.h \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/Socket.cpp \ @@ -139,7 +140,6 @@ posix_plat_hdr = \ qpid/sys/posix/PrivatePosix.h \ qpid/sys/posix/Mutex.h \ qpid/sys/posix/Fork.h \ - qpid/sys/posix/PollableCondition.h \ qpid/sys/posix/IntegerTypes.h \ qpid/sys/posix/Time.h diff --git a/cpp/src/common.vcproj b/cpp/src/common.vcproj index e8c40da426..c074afad53 100644 --- a/cpp/src/common.vcproj +++ b/cpp/src/common.vcproj @@ -1,25 +1,4 @@ - - @@ -169,7 +149,7 @@ /> @@ -244,7 +220,7 @@ /> + Filter="cpp;cxx;cc;C;c" + > + + + RelativePath="qpid\Address.cpp" + > + RelativePath="qpid\sys\AggregateOutput.cpp" + > + RelativePath="gen\qpid\framing\AllInvoker.cpp" + > + RelativePath="qpid\framing\AMQBody.cpp" + > + RelativePath="qpid\framing\AMQContentBody.cpp" + > + RelativePath="qpid\framing\AMQFrame.cpp" + > + RelativePath="qpid\framing\AMQHeaderBody.cpp" + > + RelativePath="qpid\framing\AMQHeartbeatBody.cpp" + > + RelativePath="qpid\framing\AMQMethodBody.cpp" + > + RelativePath="gen\qpid\framing\AMQP_AllProxy.cpp" + > + RelativePath="gen\qpid\framing\AMQP_ClientProxy.cpp" + > + RelativePath="qpid\framing\AMQP_HighestVersion.h" + > + RelativePath="gen\qpid\framing\AMQP_ServerProxy.cpp" + > + RelativePath="qpid\framing\Array.cpp" + > + RelativePath="qpid\assert.cpp" + > + RelativePath="qpid\assert.h" + > + RelativePath="qpid\sys\windows\AsynchIO.cpp" + > + RelativePath="qpid\sys\AsynchIOHandler.cpp" + > + RelativePath="qpid\framing\BodyHandler.cpp" + > + RelativePath="qpid\framing\Buffer.cpp" + > + RelativePath="gen\qpid\framing\ClientInvoker.cpp" + > + RelativePath="gen\qpid\framing\ClusterConfigChangeBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionAccumulatedAckBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionConsumerStateBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliverCloseBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliverDoOutputBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliveryRecordBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionExchangeBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionMembershipBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionQueueBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionQueuePositionBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionSessionStateBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionShadowReadyBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxAcceptBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxDequeueBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxEndBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxEnqueueBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxPublishBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterReadyBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterShutdownBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterUpdateOfferBody.cpp" + > + RelativePath="gen\qpid\framing\ClusterUpdateRequestBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionCloseBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionCloseOkBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionHeartbeatBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionOpenBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionOpenOkBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionRedirectBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionSecureBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionSecureOkBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionStartBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionStartOkBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionTuneBody.cpp" + > + RelativePath="gen\qpid\framing\ConnectionTuneOkBody.cpp" + > + RelativePath="qpid\DataDir.cpp" + > + RelativePath="gen\qpid\framing\DeliveryProperties.cpp" + > + RelativePath="qpid\sys\Dispatcher.cpp" + > + RelativePath="gen\qpid\framing\DtxCommitBody.cpp" + > + RelativePath="gen\qpid\framing\DtxEndBody.cpp" + > + RelativePath="gen\qpid\framing\DtxForgetBody.cpp" + > + RelativePath="gen\qpid\framing\DtxGetTimeoutBody.cpp" + > + RelativePath="gen\qpid\framing\DtxGetTimeoutResult.cpp" + > + RelativePath="gen\qpid\framing\DtxPrepareBody.cpp" + > + RelativePath="gen\qpid\framing\DtxRecoverBody.cpp" + > + RelativePath="gen\qpid\framing\DtxRecoverResult.cpp" + > + RelativePath="gen\qpid\framing\DtxRollbackBody.cpp" + > + RelativePath="gen\qpid\framing\DtxSelectBody.cpp" + > + RelativePath="gen\qpid\framing\DtxSetTimeoutBody.cpp" + > + RelativePath="gen\qpid\framing\DtxStartBody.cpp" + > + RelativePath="qpid\framing\Endian.cpp" + > + RelativePath="qpid\Exception.cpp" + > + RelativePath="gen\qpid\framing\ExchangeBindBody.cpp" + > + RelativePath="gen\qpid\framing\ExchangeBoundBody.cpp" + > + RelativePath="gen\qpid\framing\ExchangeBoundResult.cpp" + > + RelativePath="gen\qpid\framing\ExchangeDeclareBody.cpp" + > + RelativePath="gen\qpid\framing\ExchangeDeleteBody.cpp" + > + RelativePath="gen\qpid\framing\ExchangeQueryBody.cpp" + > + RelativePath="gen\qpid\framing\ExchangeQueryResult.cpp" + > + RelativePath="gen\qpid\framing\ExchangeUnbindBody.cpp" + > + RelativePath="gen\qpid\framing\ExecutionExceptionBody.cpp" + > + RelativePath="gen\qpid\framing\ExecutionResultBody.cpp" + > + RelativePath="gen\qpid\framing\ExecutionSyncBody.cpp" + > + RelativePath="qpid\framing\FieldTable.cpp" + > + RelativePath="qpid\framing\FieldValue.cpp" + > + RelativePath="gen\qpid\framing\FileAckBody.cpp" + > + RelativePath="gen\qpid\framing\FileCancelBody.cpp" + > + RelativePath="gen\qpid\framing\FileConsumeBody.cpp" + > + RelativePath="gen\qpid\framing\FileConsumeOkBody.cpp" + > + RelativePath="gen\qpid\framing\FileDeliverBody.cpp" + > + RelativePath="gen\qpid\framing\FileOpenBody.cpp" + > + RelativePath="gen\qpid\framing\FileOpenOkBody.cpp" + > + RelativePath="gen\qpid\framing\FileProperties.cpp" + > + RelativePath="gen\qpid\framing\FilePublishBody.cpp" + > + RelativePath="gen\qpid\framing\FileQosBody.cpp" + > + RelativePath="gen\qpid\framing\FileQosOkBody.cpp" + > + RelativePath="gen\qpid\framing\FileRejectBody.cpp" + > + RelativePath="gen\qpid\framing\FileReturnBody.cpp" + > + RelativePath="gen\qpid\framing\FileStageBody.cpp" + > + RelativePath="qpid\sys\windows\FileSysDir.cpp" + > + RelativePath="gen\qpid\framing\FragmentProperties.cpp" + > + RelativePath="qpid\framing\FrameDecoder.cpp" + > + RelativePath="qpid\framing\FrameSet.cpp" + > + RelativePath="gen\qpid\framing\Header.cpp" + > + RelativePath="qpid\sys\windows\IocpPoller.cpp" + > + RelativePath="qpid\sys\windows\IOHandle.cpp" + > + RelativePath="qpid\sys\LatencyMetric.cpp" + > + RelativePath="qpid\sys\windows\LockFile.cpp" + > + RelativePath="qpid\log\Logger.cpp" + > + RelativePath="qpid\management\Manageable.cpp" + > + RelativePath="qpid\management\ManagementObject.cpp" + > + RelativePath="gen\qpid\framing\MessageAcceptBody.cpp" + > + RelativePath="gen\qpid\framing\MessageAcquireBody.cpp" + > + RelativePath="gen\qpid\framing\MessageAcquireResult.cpp" + > + RelativePath="gen\qpid\framing\MessageCancelBody.cpp" + > + RelativePath="gen\qpid\framing\MessageFlowBody.cpp" + > + RelativePath="gen\qpid\framing\MessageFlushBody.cpp" + > + RelativePath="gen\qpid\framing\MessageProperties.cpp" + > + RelativePath="gen\qpid\framing\MessageRejectBody.cpp" + > + RelativePath="gen\qpid\framing\MessageReleaseBody.cpp" + > + RelativePath="gen\qpid\framing\MessageResumeBody.cpp" + > + RelativePath="gen\qpid\framing\MessageResumeResult.cpp" + > + RelativePath="gen\qpid\framing\MessageSetFlowModeBody.cpp" + > + RelativePath="gen\qpid\framing\MessageStopBody.cpp" + > + RelativePath="gen\qpid\framing\MessageSubscribeBody.cpp" + > + RelativePath="gen\qpid\framing\MessageTransferBody.cpp" + > + RelativePath="gen\qpid\framing\MethodBodyDefaultVisitor.cpp" + > + RelativePath="gen\qpid\framing\MethodBodyFactory.cpp" + > + RelativePath="qpid\Modules.cpp" + > + RelativePath="qpid\Options.cpp" + > + + + + + + + + + + + + + RelativePath="qpid\log\Options.cpp" + > + RelativePath="qpid\log\OstreamOutput.cpp" + > + RelativePath="qpid\Plugin.cpp" + > + RelativePath="qpid\pointer_to_other.h" + > + RelativePath="qpid\sys\windows\PollableCondition.cpp" + > + RelativePath="qpid\framing\ProtocolInitiation.cpp" + > + RelativePath="qpid\framing\ProtocolVersion.cpp" + > + RelativePath="qpid\framing\Proxy.cpp" + > + RelativePath="gen\qpid\framing\QueueDeclareBody.cpp" + > + RelativePath="gen\qpid\framing\QueueDeleteBody.cpp" + > + RelativePath="gen\qpid\framing\QueuePurgeBody.cpp" + > + RelativePath="gen\qpid\framing\QueueQueryBody.cpp" + > + RelativePath="gen\qpid\framing\QueueQueryResult.cpp" + > + RelativePath="qpid\RefCountedBuffer.cpp" + > + RelativePath="qpid\RefCountedBuffer.h" + > + RelativePath="gen\qpid\framing\reply_exceptions.cpp" + > + RelativePath="gen\qpid\framing\ReplyTo.cpp" + > + RelativePath="qpid\sys\Runnable.cpp" + > + RelativePath="qpid\log\Selector.cpp" + > + RelativePath="qpid\framing\SendContent.cpp" + > + RelativePath="qpid\framing\SequenceNumber.cpp" + > + RelativePath="qpid\framing\SequenceNumberSet.cpp" + > + RelativePath="qpid\framing\SequenceSet.cpp" + > + RelativePath="qpid\Serializer.h" + > + RelativePath="gen\qpid\framing\ServerInvoker.cpp" + > + RelativePath="gen\qpid\framing\SessionAttachBody.cpp" + > + RelativePath="gen\qpid\framing\SessionAttachedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionCommandPointBody.cpp" + > + RelativePath="gen\qpid\framing\SessionCompletedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionConfirmedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionDetachBody.cpp" + > + RelativePath="gen\qpid\framing\SessionDetachedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionExpectedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionFlushBody.cpp" + > + RelativePath="gen\qpid\framing\SessionGapBody.cpp" + > + RelativePath="qpid\amqp_0_10\SessionHandler.cpp" + > + RelativePath="qpid\amqp_0_10\SessionHandler.h" + > + RelativePath="qpid\SessionId.cpp" + > + RelativePath="gen\qpid\framing\SessionKnownCompletedBody.cpp" + > + RelativePath="gen\qpid\framing\SessionRequestTimeoutBody.cpp" + > + RelativePath="qpid\SessionState.cpp" + > + RelativePath="qpid\SessionState.h" + > + RelativePath="gen\qpid\framing\SessionTimeoutBody.cpp" + > - - - - - - - - - - - - + RelativePath="qpid\sys\windows\Shlib.cpp" + > + + + + + + + + + + + + + RelativePath="qpid\sys\Shlib.cpp" + > + RelativePath="qpid\log\windows\SinkOptions.cpp" + > + RelativePath="qpid\sys\windows\Socket.cpp" + > + RelativePath="qpid\log\Statement.cpp" + > + RelativePath="gen\qpid\framing\StreamCancelBody.cpp" + > + RelativePath="gen\qpid\framing\StreamConsumeBody.cpp" + > + RelativePath="gen\qpid\framing\StreamConsumeOkBody.cpp" + > + RelativePath="gen\qpid\framing\StreamDeliverBody.cpp" + > + RelativePath="gen\qpid\framing\StreamProperties.cpp" + > + RelativePath="gen\qpid\framing\StreamPublishBody.cpp" + > + RelativePath="gen\qpid\framing\StreamQosBody.cpp" + > + RelativePath="gen\qpid\framing\StreamQosOkBody.cpp" + > + RelativePath="gen\qpid\framing\StreamReturnBody.cpp" + > + RelativePath="qpid\sys\windows\StrError.cpp" + > + RelativePath="qpid\StringUtils.cpp" + > + RelativePath="qpid\sys\windows\SystemInfo.cpp" + > + RelativePath="qpid\sys\windows\Thread.cpp" + > + RelativePath="qpid\sys\windows\Time.cpp" + > + RelativePath="qpid\sys\Timer.cpp" + > + RelativePath="qpid\framing\TransferContent.cpp" + > + RelativePath="gen\qpid\framing\TxCommitBody.cpp" + > - - - - - - - - - - - - + RelativePath="gen\qpid\framing\TxRollbackBody.cpp" + > + RelativePath="gen\qpid\framing\TxSelectBody.cpp" + > + RelativePath="gen\qpid\framing\TypeCode.cpp" + > + RelativePath="qpid\Url.cpp" + > + RelativePath="qpid\sys\windows\uuid.cpp" + > + + + + + + + + + + + + + RelativePath="qpid\framing\Uuid.cpp" + > - - - - - - - - - - - - + RelativePath="gen\qpid\framing\XaResult.cpp" + > + RelativePath="gen\qpid\framing\Xid.cpp" + > + Filter="h;hpp;hxx;hh" + > + RelativePath="qpid\framing\AccumulatedAck.h" + > + RelativePath="qpid\Address.h" + > + RelativePath="qpid\sys\AggregateOutput.h" + > + RelativePath="gen\qpid\framing\all_method_bodies.h" + > + RelativePath="gen\qpid\framing\AllInvoker.h" + > + RelativePath="qpid\framing\AMQBody.h" + > + RelativePath="qpid\framing\AMQCommandControlBody.h" + > + RelativePath="qpid\framing\AMQContentBody.h" + > + RelativePath="qpid\framing\AMQDataBlock.h" + > + RelativePath="qpid\framing\AMQFrame.h" + > + RelativePath="qpid\framing\AMQHeaderBody.h" + > + RelativePath="qpid\framing\AMQHeartbeatBody.h" + > + RelativePath="qpid\framing\AMQMethodBody.h" + > + RelativePath="gen\qpid\framing\AMQP_AllOperations.h" + > + RelativePath="gen\qpid\framing\AMQP_AllProxy.h" + > + RelativePath="gen\qpid\framing\AMQP_ClientOperations.h" + > + RelativePath="gen\qpid\framing\AMQP_ClientProxy.h" + > + RelativePath="qpid\framing\amqp_framing.h" + > + RelativePath="qpid\framing\AMQP_HighestVersion.h" + > + RelativePath="gen\qpid\framing\AMQP_ServerOperations.h" + > + RelativePath="gen\qpid\framing\AMQP_ServerProxy.h" + > + RelativePath="gen\qpid\framing\amqp_structs.h" + > + RelativePath="qpid\framing\amqp_types.h" + > + RelativePath="qpid\framing\amqp_types_full.h" + > + RelativePath="qpid\amqp_0_10\apply.h" + > + RelativePath="qpid\framing\Array.h" + > + RelativePath="qpid\assert.h" + > + RelativePath="qpid\sys\AsynchIO.h" + > + RelativePath="qpid\sys\AsynchIOHandler.h" + > + RelativePath="qpid\sys\windows\AsynchIoResult.h" + > + RelativePath="qpid\sys\AtomicCount.h" + > + RelativePath="qpid\sys\AtomicValue.h" + > + RelativePath="qpid\sys\BlockingQueue.h" + > + RelativePath="qpid\framing\BodyFactory.h" + > + RelativePath="qpid\framing\BodyHandler.h" + > + RelativePath="qpid\framing\Buffer.h" + > + RelativePath="qpid\framing\ChannelHandler.h" + > + RelativePath="qpid\sys\windows\check.h" + > + RelativePath="gen\qpid\framing\ClientInvoker.h" + > + RelativePath="gen\qpid\framing\ClusterConfigChangeBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionAccumulatedAckBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionConsumerStateBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliverCloseBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliverDoOutputBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionDeliveryRecordBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionExchangeBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionMembershipBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionQueueBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionQueuePositionBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionSessionStateBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionShadowReadyBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxAcceptBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxDequeueBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxEndBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxEnqueueBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxPublishBody.h" + > + RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.h" + > + RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.h" + > + RelativePath="gen\qpid\framing\ClusterReadyBody.h" + > + RelativePath="gen\qpid\framing\ClusterShutdownBody.h" + > + RelativePath="gen\qpid\framing\ClusterUpdateOfferBody.h" + > + RelativePath="gen\qpid\framing\ClusterUpdateRequestBody.h" + > + RelativePath="qpid\sys\Codec.h" + > + RelativePath="qpid\sys\windows\Condition.h" + > + RelativePath="qpid\sys\Condition.h" + > + RelativePath="gen\qpid\framing\ConnectionCloseBody.h" + > + RelativePath="gen\qpid\framing\ConnectionCloseOkBody.h" + > + RelativePath="qpid\sys\ConnectionCodec.h" + > + RelativePath="gen\qpid\framing\ConnectionHeartbeatBody.h" + > + RelativePath="qpid\sys\ConnectionInputHandler.h" + > + RelativePath="qpid\sys\ConnectionInputHandlerFactory.h" + > + RelativePath="gen\qpid\framing\ConnectionOpenBody.h" + > + RelativePath="gen\qpid\framing\ConnectionOpenOkBody.h" + > + RelativePath="qpid\sys\ConnectionOutputHandler.h" + > + RelativePath="qpid\sys\ConnectionOutputHandlerPtr.h" + > + RelativePath="gen\qpid\framing\ConnectionRedirectBody.h" + > + RelativePath="gen\qpid\framing\ConnectionSecureBody.h" + > + RelativePath="gen\qpid\framing\ConnectionSecureOkBody.h" + > + RelativePath="gen\qpid\framing\ConnectionStartBody.h" + > + RelativePath="gen\qpid\framing\ConnectionStartOkBody.h" + > + RelativePath="gen\qpid\framing\ConnectionTuneBody.h" + > + RelativePath="gen\qpid\framing\ConnectionTuneOkBody.h" + > + RelativePath="gen\qpid\framing\constants.h" + > + RelativePath="qpid\sys\CopyOnWriteArray.h" + > + RelativePath="qpid\DataDir.h" + > + RelativePath="qpid\sys\DeletionManager.h" + > + RelativePath="gen\qpid\framing\DeliveryProperties.h" + > + RelativePath="qpid\sys\Dispatcher.h" + > + RelativePath="gen\qpid\framing\DtxCommitBody.h" + > + RelativePath="gen\qpid\framing\DtxEndBody.h" + > + RelativePath="gen\qpid\framing\DtxForgetBody.h" + > + RelativePath="gen\qpid\framing\DtxGetTimeoutBody.h" + > + RelativePath="gen\qpid\framing\DtxGetTimeoutResult.h" + > + RelativePath="gen\qpid\framing\DtxPrepareBody.h" + > + RelativePath="gen\qpid\framing\DtxRecoverBody.h" + > + RelativePath="gen\qpid\framing\DtxRecoverResult.h" + > + RelativePath="gen\qpid\framing\DtxRollbackBody.h" + > + RelativePath="gen\qpid\framing\DtxSelectBody.h" + > + RelativePath="gen\qpid\framing\DtxSetTimeoutBody.h" + > + RelativePath="gen\qpid\framing\DtxStartBody.h" + > + RelativePath="qpid\framing\Endian.h" + > + RelativePath="gen\qpid\framing\enum.h" + > + RelativePath="qpid\amqp_0_10\Exception.h" + > + RelativePath="qpid\Exception.h" + > + RelativePath="qpid\sys\ExceptionHolder.h" + > + RelativePath="gen\qpid\framing\ExchangeBindBody.h" + > + RelativePath="gen\qpid\framing\ExchangeBoundBody.h" + > + RelativePath="gen\qpid\framing\ExchangeBoundResult.h" + > + RelativePath="gen\qpid\framing\ExchangeDeclareBody.h" + > + RelativePath="gen\qpid\framing\ExchangeDeleteBody.h" + > + RelativePath="gen\qpid\framing\ExchangeQueryBody.h" + > + RelativePath="gen\qpid\framing\ExchangeQueryResult.h" + > + RelativePath="gen\qpid\framing\ExchangeUnbindBody.h" + > + RelativePath="gen\qpid\framing\ExecutionExceptionBody.h" + > + RelativePath="gen\qpid\framing\ExecutionResultBody.h" + > + RelativePath="gen\qpid\framing\ExecutionSyncBody.h" + > + RelativePath="qpid\framing\FieldTable.h" + > + RelativePath="qpid\framing\FieldValue.h" + > + RelativePath="gen\qpid\framing\FileAckBody.h" + > + RelativePath="gen\qpid\framing\FileCancelBody.h" + > + RelativePath="gen\qpid\framing\FileConsumeBody.h" + > + RelativePath="gen\qpid\framing\FileConsumeOkBody.h" + > + RelativePath="gen\qpid\framing\FileDeliverBody.h" + > + RelativePath="gen\qpid\framing\FileOpenBody.h" + > + RelativePath="gen\qpid\framing\FileOpenOkBody.h" + > + RelativePath="gen\qpid\framing\FileProperties.h" + > + RelativePath="gen\qpid\framing\FilePublishBody.h" + > + RelativePath="gen\qpid\framing\FileQosBody.h" + > + RelativePath="gen\qpid\framing\FileQosOkBody.h" + > + RelativePath="gen\qpid\framing\FileRejectBody.h" + > + RelativePath="gen\qpid\framing\FileReturnBody.h" + > + RelativePath="gen\qpid\framing\FileStageBody.h" + > + RelativePath="qpid\sys\FileSysDir.h" + > + RelativePath="gen\qpid\framing\FragmentProperties.h" + > + RelativePath="gen\qpid\framing\frame_body_lists.h" + > + RelativePath="qpid\framing\frame_functors.h" + > + RelativePath="qpid\framing\FrameDecoder.h" + > + RelativePath="qpid\framing\FrameDefaultVisitor.h" + > + RelativePath="qpid\framing\FrameHandler.h" + > + RelativePath="qpid\framing\FrameSet.h" + > + RelativePath="qpid\framing\Handler.h" + > + RelativePath="gen\qpid\framing\Header.h" + > + RelativePath="qpid\framing\HeaderProperties.h" + > + RelativePath="qpid\framing\variant.h qpid\log\Helpers.h" + > + RelativePath="qpid\framing\InitiationHandler.h" + > + RelativePath="qpid\InlineAllocator.h" + > + RelativePath="qpid\InlineVector.h" + > + RelativePath="qpid\framing\InputHandler.h" + > + RelativePath="qpid\sys\windows\IntegerTypes.h" + > + RelativePath="qpid\sys\IntegerTypes.h" + > + RelativePath="qpid\framing\Invoker.h" + > + RelativePath="qpid\sys\IOHandle.h" + > + RelativePath="qpid\sys\windows\IoHandlePrivate.h" + > + RelativePath="qpid\sys\LatencyMetric.h" + > + RelativePath="qpid\sys\LockFile.h" + > + RelativePath="qpid\sys\LockPtr.h" + > + RelativePath="qpid\log\Logger.h" + > + RelativePath="qpid\management\Manageable.h" + > + RelativePath="qpid\management\ManagementObject.h" + > + RelativePath="qpid\memory.h" + > + RelativePath="gen\qpid\framing\MessageAcceptBody.h" + > + RelativePath="gen\qpid\framing\MessageAcquireBody.h" + > + RelativePath="gen\qpid\framing\MessageAcquireResult.h" + > + RelativePath="gen\qpid\framing\MessageCancelBody.h" + > + RelativePath="gen\qpid\framing\MessageFlowBody.h" + > + RelativePath="gen\qpid\framing\MessageFlushBody.h" + > + RelativePath="gen\qpid\framing\MessageProperties.h" + > + RelativePath="gen\qpid\framing\MessageRejectBody.h" + > + RelativePath="gen\qpid\framing\MessageReleaseBody.h" + > + RelativePath="gen\qpid\framing\MessageResumeBody.h" + > + RelativePath="gen\qpid\framing\MessageResumeResult.h" + > + RelativePath="gen\qpid\framing\MessageSetFlowModeBody.h" + > + RelativePath="gen\qpid\framing\MessageStopBody.h" + > + RelativePath="gen\qpid\framing\MessageSubscribeBody.h" + > + RelativePath="gen\qpid\framing\MessageTransferBody.h" + > + RelativePath="gen\qpid\framing\MethodBodyConstVisitor.h" + > + RelativePath="gen\qpid\framing\MethodBodyDefaultVisitor.h" + > + RelativePath="qpid\framing\MethodBodyFactory.h" + > + RelativePath="qpid\framing\MethodContent.h" + > + RelativePath="qpid\framing\ModelMethod.h" + > + RelativePath="qpid\Modules.h" + > + RelativePath="qpid\sys\Monitor.h" + > + RelativePath="qpid\Msg.h" + > + RelativePath="qpid\sys\windows\Mutex.h" + > + RelativePath="qpid\sys\Mutex.h" + > + RelativePath="qpid\Options.h" + > + RelativePath="qpid\log\Options.h" + > + RelativePath="qpid\log\OstreamOutput.h" + > + RelativePath="qpid\sys\OutputControl.h" + > + RelativePath="qpid\framing\OutputHandler.h" + > + RelativePath="qpid\sys\OutputTask.h" + > + RelativePath="qpid\Plugin.h" + > + RelativePath="qpid\pointer_to_other.h" + > + RelativePath="qpid\sys\PollableCondition.h" + > + RelativePath="qpid\sys\PollableQueue.h" + > + RelativePath="qpid\sys\Poller.h" + > + RelativePath="qpid\sys\ProtocolFactory.h" + > + RelativePath="qpid\framing\ProtocolInitiation.h" + > + RelativePath="qpid\framing\ProtocolVersion.h" + > + RelativePath="qpid\framing\Proxy.h" + > + RelativePath="qpid\ptr_map.h" + > + RelativePath="gen\qpid\framing\QueueDeclareBody.h" + > + RelativePath="gen\qpid\framing\QueueDeleteBody.h" + > + RelativePath="gen\qpid\framing\QueuePurgeBody.h" + > + RelativePath="gen\qpid\framing\QueueQueryBody.h" + > + RelativePath="gen\qpid\framing\QueueQueryResult.h" + > + RelativePath="qpid\RangeSet.h" + > + RelativePath="qpid\RefCounted.h" + > + RelativePath="qpid\RefCountedBuffer.h" + > + RelativePath="gen\qpid\framing\reply_exceptions.h" + > + RelativePath="gen\qpid\framing\ReplyTo.h" + > + RelativePath="qpid\sys\Runnable.h" + > + RelativePath="qpid\sys\ScopedIncrement.h" + > + RelativePath="qpid\sys\SecurityLayer.h" + > + RelativePath="qpid\log\Selector.h" + > + RelativePath="qpid\sys\Semaphore.h" + > + RelativePath="qpid\framing\SendContent.h" + > + RelativePath="qpid\framing\SequenceNumber.h" + > + RelativePath="qpid\framing\SequenceNumberSet.h" + > + RelativePath="qpid\framing\SequenceSet.h" + > + RelativePath="qpid\Serializer.h" + > + RelativePath="gen\qpid\framing\ServerInvoker.h" + > + RelativePath="gen\qpid\framing\SessionAttachBody.h" + > + RelativePath="gen\qpid\framing\SessionAttachedBody.h" + > + RelativePath="gen\qpid\framing\SessionCommandPointBody.h" + > + RelativePath="gen\qpid\framing\SessionCompletedBody.h" + > + RelativePath="gen\qpid\framing\SessionConfirmedBody.h" + > + RelativePath="gen\qpid\framing\SessionDetachBody.h" + > + RelativePath="gen\qpid\framing\SessionDetachedBody.h" + > + RelativePath="gen\qpid\framing\SessionExpectedBody.h" + > + RelativePath="gen\qpid\framing\SessionFlushBody.h" + > + RelativePath="gen\qpid\framing\SessionGapBody.h" + > + RelativePath="qpid\amqp_0_10\SessionHandler.h" + > + RelativePath="qpid\SessionId.h" + > + RelativePath="gen\qpid\framing\SessionKnownCompletedBody.h" + > + RelativePath="gen\qpid\framing\SessionRequestTimeoutBody.h" + > + RelativePath="qpid\SessionState.h" + > + RelativePath="gen\qpid\framing\SessionTimeoutBody.h" + > + RelativePath="qpid\shared_ptr.h" + > + RelativePath="qpid\SharedObject.h" + > + RelativePath="qpid\sys\Shlib.h" + > + RelativePath="qpid\sys\ShutdownHandler.h" + > + RelativePath="qpid\log\windows\SinkOptions.h" + > + RelativePath="qpid\log\SinkOptions.h" + > + RelativePath="qpid\sys\Socket.h" + > + RelativePath="qpid\log\Statement.h" + > + RelativePath="qpid\sys\StateMonitor.h" + > + RelativePath="gen\qpid\framing\StreamCancelBody.h" + > + RelativePath="gen\qpid\framing\StreamConsumeBody.h" + > + RelativePath="gen\qpid\framing\StreamConsumeOkBody.h" + > + RelativePath="gen\qpid\framing\StreamDeliverBody.h" + > + RelativePath="gen\qpid\framing\StreamProperties.h" + > + RelativePath="gen\qpid\framing\StreamPublishBody.h" + > + RelativePath="gen\qpid\framing\StreamQosBody.h" + > + RelativePath="gen\qpid\framing\StreamQosOkBody.h" + > + RelativePath="gen\qpid\framing\StreamReturnBody.h" + > + RelativePath="qpid\sys\StrError.h" + > + RelativePath="qpid\StringUtils.h" + > + RelativePath="qpid\framing\StructHelper.h" + > + RelativePath="qpid\sys\SystemInfo.h" + > + RelativePath="qpid\sys\Thread.h" + > + RelativePath="qpid\sys\windows\Time.h" + > + RelativePath="qpid\sys\Time.h" + > + RelativePath="qpid\sys\TimeoutHandler.h" + > + RelativePath="qpid\sys\Timer.h" + > + RelativePath="qpid\framing\TransferContent.h" + > + RelativePath="gen\qpid\framing\TxCommitBody.h" + > + RelativePath="gen\qpid\framing\TxRollbackBody.h" + > + RelativePath="gen\qpid\framing\TxSelectBody.h" + > + RelativePath="gen\qpid\framing\TypeCode.h" + > + RelativePath="qpid\framing\TypeFilter.h" + > + RelativePath="qpid\Url.h" + > + RelativePath="qpid\framing\Uuid.h" + > + RelativePath="qpid\sys\windows\uuid.h" + > + RelativePath="qpid\sys\uuid.h" + > + RelativePath="qpid\Version.h" + > + RelativePath="qpid\framing\Visitor.h" + > + RelativePath="qpid\sys\Waitable.h" + > + RelativePath="gen\qpid\framing\XaResult.h" + > + RelativePath="gen\qpid\framing\Xid.h" + > diff --git a/cpp/src/qpid/sys/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h index 56d38f90da..49e84e6cb0 100644 --- a/cpp/src/qpid/sys/PollableCondition.h +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -22,7 +22,57 @@ * */ -// Currently only has a posix implementation, add #ifdefs for other platforms as needed. -#include "posix/PollableCondition.h" +#include "qpid/sys/Poller.h" +#include +#include + + +namespace qpid { +namespace sys { + +class PollableConditionPrivate; + +class PollableCondition { +public: + typedef boost::function1 Callback; + + PollableCondition(const Callback& cb, + const boost::shared_ptr& poller); + + ~PollableCondition(); + + /** + * Set the condition. Triggers callback to Callback from Poller. + * When callback is made, condition is suspended. Call rearm() to + * resume reacting to the condition. + */ + void set(); + + /** + * Get the current state of the condition, then clear it. + * + * @return The state of the condition before it was cleared. + */ + bool clear(); + + /** + * Temporarily suspend the ability for the poller to react to the + * condition. It can be rearm()ed later. + */ + void disarm(); + + /** + * Reset the ability for the poller to react to the condition. + */ + void rearm(); + + private: + PollableConditionPrivate *impl; + + Callback callback; + boost::shared_ptr poller; +}; + +}} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index b5ff98c2c7..a23cc5137a 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -23,8 +23,6 @@ */ #include "qpid/sys/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include @@ -38,9 +36,10 @@ namespace sys { class Poller; /** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. + * A queue whose item processing is dispatched by sys::Poller. + * Any thread can push to the queue; items pushed trigger an event the Poller + * recognizes. When a Poller I/O thread dispatches the event, a + * user-specified callback is invoked with all items on the queue. */ template class PollableQueue { @@ -50,12 +49,21 @@ class PollableQueue { /** * Callback to process a batch of items from the queue. - * @param values to process, any items remaining after call are put back on the queue. + * + * @param values Queue of values to process. Any items remaining + * on return from Callback are put back on the queue. */ typedef boost::function Callback; - /** When the queue is selected by the poller, values are passed to callback cb. */ - PollableQueue(const Callback& cb, const boost::shared_ptr& poller); + /** + * Constructor; sets necessary parameters. + * + * @param cb Callback that will be called to process items on the + * queue. Will be called from a Poller I/O thread. + * @param poller Poller to use for dispatching queue events. + */ + PollableQueue(const Callback& cb, + const boost::shared_ptr& poller); ~PollableQueue(); @@ -85,14 +93,12 @@ class PollableQueue { typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; - void dispatch(sys::DispatchHandle&); + void dispatch(PollableCondition& cond); void process(); mutable sys::Monitor lock; Callback callback; - boost::shared_ptr poller; PollableCondition condition; - DispatchHandleRef handle; Queue queue, batch; Thread dispatcher; bool stopped; @@ -100,11 +106,10 @@ class PollableQueue { template PollableQueue::PollableQueue( const Callback& cb, const boost::shared_ptr& p) - : callback(cb), poller(p), - handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0), stopped(true) + : callback(cb), + condition(boost::bind(&PollableQueue::dispatch, this, _1), p), + stopped(true) { - handle.startWatch(poller); - handle.unwatch(); } template void PollableQueue::start() { @@ -112,11 +117,10 @@ template void PollableQueue::start() { if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); - handle.rewatch(); + condition.rearm(); } template PollableQueue::~PollableQueue() { - handle.stopWatch(); } template void PollableQueue::push(const T& t) { @@ -125,15 +129,15 @@ template void PollableQueue::push(const T& t) { queue.push_back(t); } -template void PollableQueue::dispatch(sys::DispatchHandle& h) { +template void PollableQueue::dispatch(PollableCondition& cond) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); process(); dispatcher = Thread(); - if (queue.empty()) condition.clear(); + if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + else cond.rearm(); } template void PollableQueue::process() { @@ -159,7 +163,7 @@ template void PollableQueue::shutdown() { template void PollableQueue::stop() { ScopedLock l(lock); if (stopped) return; - handle.unwatch(); + condition.disarm(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread while (dispatcher.id() && dispatcher.id() != Thread::current().id()) diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index 0c55fd3c0d..0991e5fd76 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -22,17 +22,46 @@ * */ -#include "PollableCondition.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" +#include + #include #include namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +class PollableConditionPrivate : public sys::IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr& poller); + ~PollableConditionPrivate(); + + void dispatch(sys::DispatchHandle& h); + void rewatch(); + void unwatch(); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr poller; + int writeFd; + std::auto_ptr handle; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr& poller) + : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +{ int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); @@ -42,22 +71,71 @@ PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + handle.reset (new DispatchHandleRef(*this, + boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), + 0, 0)); + handle->startWatch(poller); + handle->unwatch(); +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ + handle->stopWatch(); + close(writeFd); +} + +void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/) +{ + cb(parent); +} + +void PollableConditionPrivate::rewatch() +{ + handle->rewatch(); +} + +void PollableConditionPrivate::unwatch() +{ + handle->unwatch(); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + static const char dummy=0; + ssize_t n = ::write(impl->writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) + throw ErrnoException("Error setting PollableCondition"); } bool PollableCondition::clear() { char buf[256]; ssize_t n; bool wasSet = false; - while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) + while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) wasSet = true; - if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); + if (n == -1 && errno != EAGAIN) + throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); return wasSet; } -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); +void PollableCondition::disarm() { + impl->unwatch(); +} + +void PollableCondition::rearm() { + impl->rewatch(); } @@ -71,22 +149,35 @@ void PollableCondition::set() { namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr& poller) + : cb(cb), parent(parent), poller(poller), + IOHandle(new sys::IOHandlePrivate) { impl->fd = ::eventfd(0, 0); if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); } +void PollableCondition::set() { + static const uint64_t value=1; + ssize_t n = ::write(impl->impl->fd, + reinterpret_cast(&value), 8); + if (n != 8) throw ErrnoException("write failed on conditionfd"); +} + bool PollableCondition::clear() { char buf[8]; - ssize_t n = ::read(impl->fd, buf, 8); + ssize_t n = ::read(impl->impl->fd, buf, 8); if (n != 8) throw ErrnoException("read failed on conditionfd"); return *reinterpret_cast(buf); } -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->fd, reinterpret_cast(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); +void PollableCondition::disarm() { + // ???? +} + +void PollableCondition::rearm() { + // ???? } #endif diff --git a/cpp/src/qpid/sys/posix/PollableCondition.h b/cpp/src/qpid/sys/posix/PollableCondition.h deleted file mode 100644 index 4ec277b0ec..0000000000 --- a/cpp/src/qpid/sys/posix/PollableCondition.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_SYS_POSIX_POLLABLECONDITION_H -#define QPID_SYS_POSIX_POLLABLECONDITION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" - -namespace qpid { -namespace sys { - -/** - * A pollable condition to integrate in-process conditions with IO - * conditions in a polling loop. - * - * Setting the condition makes it readable for a poller. - * - * Writable/disconnected conditions are undefined and should not be - * polled for. - */ -class PollableCondition : public sys::IOHandle { - public: - PollableCondition(); - - /** Set the condition, triggers readable in a poller. */ - void set(); - - /** Get the current state of the condition, then clear it. - *@return The state of the condition before it was cleared. - */ - bool clear(); - - private: - int writeFd; -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_POSIX_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp new file mode 100644 index 0000000000..ed0f7c3917 --- /dev/null +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -0,0 +1,125 @@ +#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP +#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/IOHandle.h" +#include "AsynchIoResult.h" +#include "IoHandlePrivate.h" + +#include +#include + +namespace qpid { +namespace sys { + +// PollableConditionPrivate will reuse the IocpPoller's ability to queue +// a completion to the IOCP and have it dispatched to the completer callback +// noted in the IOHandlePrivate when the request is queued. The +// AsynchCallbackRequest object is not really used - we already have the +// desired callback for the user of PollableCondition. +class PollableConditionPrivate : private IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr& poller); + ~PollableConditionPrivate(); + + void poke(); + void dispatch(AsynchIoResult *result); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr poller; + LONG isSet; + LONG armed; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr& poller) + : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, + boost::bind(&PollableConditionPrivate::dispatch, this, _1))), + cb(cb), parent(parent), poller(poller), isSet(0), armed(0) +{ +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ +} + +void PollableConditionPrivate::poke() +{ + if (!armed) + return; + + // addFd will queue a completion for the IOCP; when it's handled, a + // poller thread will call back to dispatch() below. + PollerHandle ph(*this); + poller->addFd(ph, Poller::INPUT); +} + +void PollableConditionPrivate::dispatch(AsynchIoResult *result) +{ + delete result; // Poller::addFd() allocates this + cb(parent); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + // Add one to the set count and poke it to provoke a callback + ::InterlockedIncrement(&impl->isSet); + impl->poke(); +} + +bool PollableCondition::clear() { + return (0 != ::InterlockedExchange(&impl->isSet, 0)); +} + +void PollableCondition::disarm() { + ::InterlockedExchange(&impl->armed, 0); +} + +void PollableCondition::rearm() { + if (0 == ::InterlockedExchange(&impl->armed, 1) && impl->isSet) + impl->poke(); +} + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp index 3e377d04b3..df3c937e33 100644 --- a/cpp/src/tests/QueueEvents.cpp +++ b/cpp/src/tests/QueueEvents.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/QueueEvents.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Dispatcher.h" #include #include -- cgit v1.2.1