Skip to content

Commit

Permalink
Merge bug24582
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry Kuch committed Dec 14, 2011
2 parents 46628f2 + 6cf72c5 commit 0c9e226
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 66 deletions.
3 changes: 2 additions & 1 deletion docs/rabbitmq-service.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Display usage information.
<para>
Install the service. The service will not be started.
Subsequent invocations will update the service parameters if
relevant environment variables were modified.
relevant environment variables were modified or if the active
plugins were changed.
</para>
</listitem>
</varlistentry>
Expand Down
16 changes: 16 additions & 0 deletions docs/rabbitmqctl.1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,22 @@
</para>
</listitem>
</varlistentry>

<varlistentry>
<term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Evaluate an arbitrary Erlang expression.
</para>
<para role="example-prefix">
For example:
</para>
<screen role="example">rabbitmqctl eval 'node().'</screen>
<para role="example">
This command returns the name of the node to which rabbitmqctl has connected.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect2>

Expand Down
18 changes: 16 additions & 2 deletions src/gen_server2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) ->
exit(process_not_registered)
end;
get_proc_name({global, Name}) ->
case global:safe_whereis_name(Name) of
case whereis_name(Name) of
undefined ->
exit(process_not_registered_globally);
Pid when Pid =:= self() ->
Expand All @@ -1101,7 +1101,7 @@ get_parent() ->
name_to_pid(Name) ->
case whereis(Name) of
undefined ->
case global:safe_whereis_name(Name) of
case whereis_name(Name) of
undefined ->
exit(could_not_find_registerd_name);
Pid ->
Expand All @@ -1111,6 +1111,20 @@ name_to_pid(Name) ->
Pid
end.

whereis_name(Name) ->
case ets:lookup(global_names, Name) of
[{_Name, Pid, _Method, _RPid, _Ref}] ->
if node(Pid) == node() ->
case is_process_alive(Pid) of
true -> Pid;
false -> undefined
end;
true ->
Pid
end;
[] -> undefined
end.

find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
PrioriCall = function_exported_or_default(
Mod, 'prioritise_call', 3,
Expand Down
42 changes: 28 additions & 14 deletions src/gm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).

Expand Down Expand Up @@ -515,8 +516,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
Expand All @@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
ok, State});

handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
Expand Down Expand Up @@ -604,7 +606,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});

handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
Expand All @@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
{Module:joined(Args, all_known_members(View)), State1});
{Module:joined(Args, get_pids(all_known_members(View))), State1});

handle_cast(leave, State) ->
{stop, normal, State}.
Expand Down Expand Up @@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
Result = Module:handle_msg(Args, Self, Msg),
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
Expand Down Expand Up @@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
Left, {add_on_right, Self}, infinity) of
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
Expand All @@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) ->
mnesia:sync_transaction(
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
version = 0 },
version = ?VERSION_START },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
Expand Down Expand Up @@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
ok = gen_server2:cast(get_pid(RealNeighbour),
{?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = gen_server2:cast(RealNeighbour, Msg),
ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
_ -> gen_server2:cast(Neighbour, Msg)
_ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.

maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
erlang:monitor(process, Other).
erlang:monitor(process, get_pid(Other)).

check_neighbours(State = #state { self = Self,
left = Left,
Expand Down Expand Up @@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).

make_member(GroupName) ->
{case read_group(GroupName) of
#gm_group { version = Version } -> Version;
{error, not_found} -> ?VERSION_START
end, self()}.

get_pid({_Version, Pid}) -> Pid.

get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].

%% ---------------------------------------------------------------------------
%% Activity assembly
Expand All @@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).

send_right(Right, View, Msg) ->
ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).

callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
Module:handle_msg(Args, Id, Pub);
Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
Expand All @@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
_ -> Module:members_changed(Args, Births, Deaths)
_ -> Module:members_changed(Args, get_pids(Births),
get_pids(Deaths))
end.

handle_callback_result({Result, State}) ->
Expand Down
42 changes: 28 additions & 14 deletions src/mirrored_supervisor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) ->
start_link0(Prefix, Group, Init) ->
case apply(?SUPERVISOR, start_link,
Prefix ++ [?MODULE, {overall, Group, Init}]) of
{ok, Pid} -> call(Pid, {init, Pid}),
{ok, Pid};
{ok, Pid} -> case catch call(Pid, {init, Pid}) of
ok -> {ok, Pid};
E -> E
end;
Other -> Other
end.

Expand Down Expand Up @@ -346,13 +348,20 @@ handle_call({init, Overall}, _From,
end || Pid <- Rest],
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
[maybe_start(Group, Delegate, S) || S <- ChildSpecs],
{reply, ok, State#state{overall = Overall, delegate = Delegate}};
State1 = State#state{overall = Overall, delegate = Delegate},
case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
true -> {reply, ok, State1};
false -> {stop, shutdown, State1}
end;

handle_call({start_child, ChildSpec}, _From,
State = #state{delegate = Delegate,
group = Group}) ->
{reply, maybe_start(Group, Delegate, ChildSpec), State};
{reply, case maybe_start(Group, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
end, State};

handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group}) ->
Expand Down Expand Up @@ -400,13 +409,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
case lists:sort(?PG2:get_members(Group)) -- [Pid] of
[Self | _] -> {atomic, ChildSpecs} =
mnesia:transaction(fun() -> update_all(Pid) end),
[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> ok
end,
{noreply, State};
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
[Self | _] -> {atomic, ChildSpecs} =
mnesia:transaction(fun() -> update_all(Pid) end),
[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> []
end,
case all_started(R) of
true -> {noreply, State};
false -> {stop, shutdown, State}
end;

handle_info(Info, State) ->
{stop, {unexpected_info, Info}, State}.
Expand All @@ -428,8 +440,8 @@ maybe_start(Group, Delegate, ChildSpec) ->
check_start(Group, Delegate, ChildSpec)
end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> {error, already_present};
{atomic, Pid} -> {error, {already_started, Pid}};
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
%% If we are torn down while in the transaction...
{aborted, E} -> {error, E}
end.
Expand Down Expand Up @@ -499,6 +511,8 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].

all_started(Results) -> [] =:= [R || R = {error, _} <- Results].

%%----------------------------------------------------------------------------

create_tables() ->
Expand Down
29 changes: 26 additions & 3 deletions src/mirrored_supervisor_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ all_tests() ->
passed = test_start_idempotence(),
passed = test_unsupported(),
passed = test_ignore(),
passed = test_startup_failure(),
passed.

%% Simplest test
Expand Down Expand Up @@ -158,7 +159,7 @@ test_no_migration_on_shutdown() ->
try
call(worker, ping),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _} ->
catch exit:{timeout_waiting_for_server, _, _} ->
ok
end
end, [evil, good]).
Expand Down Expand Up @@ -195,6 +196,22 @@ test_ignore() ->
{sup, fake_strategy_for_ignore, []}),
passed.

test_startup_failure() ->
[test_startup_failure(F) || F <- [want_error, want_exit]],
passed.

test_startup_failure(Fail) ->
process_flag(trap_exit, true),
?MS:start_link(get_group(group), ?MODULE,
{sup, one_for_one, [childspec(Fail)]}),
receive
{'EXIT', _, shutdown} ->
ok
after 1000 ->
exit({did_not_exit, Fail})
end,
process_flag(trap_exit, false).

%% ---------------------------------------------------------------------------

with_sups(Fun, Sups) ->
Expand Down Expand Up @@ -228,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) ->
childspec(Id) ->
{Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.

start_gs(want_error) ->
{error, foo};

start_gs(want_exit) ->
exit(foo);

start_gs(Id) ->
gen_server:start_link({local, Id}, ?MODULE, server, []).

Expand All @@ -245,10 +268,10 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.

call(Id, Msg) -> call(Id, Msg, 100, 10).
call(Id, Msg) -> call(Id, Msg, 1000, 100).

call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}});
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});

call(Id, Msg, MaxDelay, Decr) ->
try
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[Node] -> {Node, undefined};
[First | Rest] -> {First, Rest}
[First | Rest] -> {First, [First | Rest]}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
Expand Down
Loading

0 comments on commit 0c9e226

Please sign in to comment.