diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e97ed491a9..c960fad41d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -443,22 +443,25 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, Terms, +%% We can be recovering a transient queue if it crashed +init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + {PersistentClient, ContainsCheckFun} = + case IsDurable of + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun, AsyncCallback), + {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + false -> {undefined, fun(_MsgId) -> false end} + end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (MsgId) -> - rabbit_msg_store:contains(MsgId, PersistentClient) - end, - MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, + ContainsCheckFun, MsgIdxOnDiskFun), + init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> |
