summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-23 17:44:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-23 17:44:01 +0000
commit29cdb59f55490ee190a393b9fc03dd29fe08f142 (patch)
treedd9650196224f9b1c1f63cfee134e0a68b4dcd5f /src
parent108376a9fffbe57e12b39ee71ba25f7f04134b12 (diff)
downloadrabbitmq-server-git-29cdb59f55490ee190a393b9fc03dd29fe08f142.tar.gz
Fix expiry
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
2 files changed, 6 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a9f5e5ac34..1664260423 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -154,11 +154,11 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ,
SPid1 =/= dead],
[erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs],
{Total, _BQS} =
- BQ:fold(fun (M = #basic_message{}, I) ->
+ BQ:fold(fun ({Msg, MsgProps}, I) ->
wait_for_credit(),
[begin
credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
- SPid ! {sync_message, Ref, M}
+ SPid ! {sync_message, Ref, Msg, MsgProps}
end || SPid <- SPids1],
case I rem 1000 of
0 -> rabbit_log:info(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a8615cee4b..d408c56eda 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -858,12 +858,10 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
{sync_complete, Ref} ->
erlang:demonitor(MRef),
set_delta(0, State);
- {sync_message, Ref, M} ->
+ {sync_message, Ref, Msg, Props0} ->
credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
- %% TODO expiry needs fixing
- Props = #message_properties{expiry = undefined,
- needs_confirming = false,
- delivered = true},
- BQS1 = BQ:publish(M, Props, none, BQS),
+ Props = Props0#message_properties{needs_confirming = false,
+ delivered = true},
+ BQS1 = BQ:publish(Msg, Props, none, BQS),
sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})
end.