Skip to content

Commit

Permalink
Merge branch 'rabbitmq-server-86' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Mar 30, 2015
2 parents c4c4dc0 + bd4f8ae commit 4fdd61b
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/rabbit_queue_consumers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = BlockedQ} ->
AllConsumers = priority_queue:join(Consumers, BlockedQ),
All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
{[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
tags(priority_queue:to_list(AllConsumers)),
tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.

Expand Down Expand Up @@ -442,9 +443,12 @@ remove_consumer(ChPid, CTag, Queue) ->
end, Queue).

remove_consumers(ChPid, Queue) ->
priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
(_) -> true
end, Queue).
priority_queue:filter(chan_pred(ChPid, false), Queue).

chan_pred(ChPid, Want) ->
fun ({CP, _Consumer}) when CP =:= ChPid -> Want;
(_) -> not Want
end.

update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
Expand Down

0 comments on commit 4fdd61b

Please sign in to comment.