diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-23 17:44:01 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-23 17:44:01 +0000 |
| commit | 29cdb59f55490ee190a393b9fc03dd29fe08f142 (patch) | |
| tree | dd9650196224f9b1c1f63cfee134e0a68b4dcd5f /src | |
| parent | 108376a9fffbe57e12b39ee71ba25f7f04134b12 (diff) | |
| download | rabbitmq-server-git-29cdb59f55490ee190a393b9fc03dd29fe08f142.tar.gz | |
Fix expiry
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 |
2 files changed, 6 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a9f5e5ac34..1664260423 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -154,11 +154,11 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ, SPid1 =/= dead], [erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs], {Total, _BQS} = - BQ:fold(fun (M = #basic_message{}, I) -> + BQ:fold(fun ({Msg, MsgProps}, I) -> wait_for_credit(), [begin credit_flow:send(SPid, ?CREDIT_DISC_BOUND), - SPid ! {sync_message, Ref, M} + SPid ! {sync_message, Ref, Msg, MsgProps} end || SPid <- SPids1], case I rem 1000 of 0 -> rabbit_log:info( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a8615cee4b..d408c56eda 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -858,12 +858,10 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, {sync_complete, Ref} -> erlang:demonitor(MRef), set_delta(0, State); - {sync_message, Ref, M} -> + {sync_message, Ref, Msg, Props0} -> credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), - %% TODO expiry needs fixing - Props = #message_properties{expiry = undefined, - needs_confirming = false, - delivered = true}, - BQS1 = BQ:publish(M, Props, none, BQS), + Props = Props0#message_properties{needs_confirming = false, + delivered = true}, + BQS1 = BQ:publish(Msg, Props, none, BQS), sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}) end. |
