From 6beeeb035729c3c2da0d152fddf1a78bfd38d4e8 Mon Sep 17 00:00:00 2001 From: Albert Schimpf Date: Wed, 14 Jul 2021 10:41:22 +0200 Subject: [PATCH] riak core vnode rewrite --- src/gen_fsm_compat.erl | 1106 --------------------------------- src/riak_core_vnode.erl | 759 +++++++++++----------- src/riak_core_vnode_proxy.erl | 26 +- 3 files changed, 396 insertions(+), 1495 deletions(-) delete mode 100644 src/gen_fsm_compat.erl diff --git a/src/gen_fsm_compat.erl b/src/gen_fsm_compat.erl deleted file mode 100644 index 48253e478..000000000 --- a/src/gen_fsm_compat.erl +++ /dev/null @@ -1,1106 +0,0 @@ -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1996-2018. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% %CopyrightEnd% -%% --module(gen_fsm_compat). - -%%%----------------------------------------------------------------- -%%% -%%% This state machine is somewhat more pure than state_lib. It is -%%% still based on State dispatching (one function per state), but -%%% allows a function handle_event to take care of events in all states. -%%% It's not that pure anymore :( We also allow synchronized event sending. -%%% -%%% If the Parent process terminates the Module:terminate/2 -%%% function is called. -%%% -%%% The user module should export: -%%% -%%% init(Args) -%%% ==> {ok, StateName, StateData} -%%% {ok, StateName, StateData, Timeout} -%%% ignore -%%% {stop, Reason} -%%% -%%% StateName(Msg, StateData) -%%% -%%% ==> {next_state, NewStateName, NewStateData} -%%% {next_state, NewStateName, NewStateData, Timeout} -%%% {stop, Reason, NewStateData} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% StateName(Msg, From, StateData) -%%% -%%% ==> {next_state, NewStateName, NewStateData} -%%% {next_state, NewStateName, NewStateData, Timeout} -%%% {reply, Reply, NewStateName, NewStateData} -%%% {reply, Reply, NewStateName, NewStateData, Timeout} -%%% {stop, Reason, NewStateData} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_event(Msg, StateName, StateData) -%%% -%%% ==> {next_state, NewStateName, NewStateData} -%%% {next_state, NewStateName, NewStateData, Timeout} -%%% {stop, Reason, Reply, NewStateData} -%%% {stop, Reason, NewStateData} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_sync_event(Msg, From, StateName, StateData) -%%% -%%% ==> {next_state, NewStateName, NewStateData} -%%% {next_state, NewStateName, NewStateData, Timeout} -%%% {reply, Reply, NewStateName, NewStateData} -%%% {reply, Reply, NewStateName, NewStateData, Timeout} -%%% {stop, Reason, Reply, NewStateData} -%%% {stop, Reason, NewStateData} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_info(Info, StateName) (e.g. {'EXIT', P, R}, {nodedown, N}, ... -%%% -%%% ==> {next_state, NewStateName, NewStateData} -%%% {next_state, NewStateName, NewStateData, Timeout} -%%% {stop, Reason, NewStateData} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% terminate(Reason, StateName, StateData) Let the user module clean up -%%% always called when server terminates -%%% -%%% ==> the return value is ignored -%%% -%%% -%%% The work flow (of the fsm) can be described as follows: -%%% -%%% User module fsm -%%% ----------- ------- -%%% start -----> start -%%% init <----- . -%%% -%%% loop -%%% StateName <----- . -%%% -%%% handle_event <----- . -%%% -%%% handle__sunc_event <----- . -%%% -%%% handle_info <----- . -%%% -%%% terminate <----- . -%%% -%%% -%%% --------------------------------------------------- - --export([start/3, - start/4, - start_link/3, - start_link/4, - stop/1, - stop/3, - send_event/2, - sync_send_event/2, - sync_send_event/3, - send_all_state_event/2, - sync_send_all_state_event/2, - sync_send_all_state_event/3, - reply/2, - start_timer/2, - send_event_after/2, - cancel_timer/1, - enter_loop/4, - enter_loop/5, - enter_loop/6, - wake_hib/7]). - -%% Internal exports --export([init_it/6, - system_continue/3, - system_terminate/4, - system_code_change/4, - system_get_state/1, - system_replace_state/2, - format_status/2]). - --import(error_logger, [format/2]). - -%%% --------------------------------------------------- -%%% Interface functions. -%%% --------------------------------------------------- - --callback init(Args :: term()) -> {ok, - StateName :: atom(), StateData :: term()} | - {ok, StateName :: atom(), StateData :: term(), - timeout() | hibernate} | - {stop, Reason :: term()} | - ignore. - --callback handle_event(Event :: term(), - StateName :: atom(), - StateData :: term()) -> {next_state, - NextStateName :: atom(), - NewStateData :: term()} | - {next_state, - NextStateName :: atom(), - NewStateData :: term(), - timeout() | hibernate} | - {stop, Reason :: term(), - NewStateData :: term()}. - --callback handle_sync_event(Event :: term(), - From :: {pid(), Tag :: term()}, StateName :: atom(), - StateData :: term()) -> {reply, Reply :: term(), - NextStateName :: atom(), - NewStateData :: term()} | - {reply, Reply :: term(), - NextStateName :: atom(), - NewStateData :: term(), - timeout() | hibernate} | - {next_state, - NextStateName :: atom(), - NewStateData :: term()} | - {next_state, - NextStateName :: atom(), - NewStateData :: term(), - timeout() | hibernate} | - {stop, Reason :: term(), - Reply :: term(), - NewStateData :: term()} | - {stop, Reason :: term(), - NewStateData :: term()}. - --callback handle_info(Info :: term(), - StateName :: atom(), - StateData :: term()) -> {next_state, - NextStateName :: atom(), - NewStateData :: term()} | - {next_state, - NextStateName :: atom(), - NewStateData :: term(), - timeout() | hibernate} | - {stop, Reason :: normal | term(), - NewStateData :: term()}. - --callback terminate(Reason :: normal | - shutdown | - {shutdown, term()} | - term(), - StateName :: atom(), StateData :: term()) -> term(). - --callback code_change(OldVsn :: term() | {down, term()}, - StateName :: atom(), StateData :: term(), - Extra :: term()) -> {ok, NextStateName :: atom(), - NewStateData :: term()}. - --callback format_status(Opt, - StatusData) -> Status when Opt :: normal | terminate, - StatusData :: [PDict | - State], - PDict :: [{Key :: term(), - Value :: term()}], - State :: term(), - Status :: term(). - --optional_callbacks([handle_info/3, - terminate/3, - code_change/4, - format_status/2]). - -%%% --------------------------------------------------- -%%% Starts a generic state machine. -%%% start(Mod, Args, Options) -%%% start(Name, Mod, Args, Options) -%%% start_link(Mod, Args, Options) -%%% start_link(Name, Mod, Args, Options) where: -%%% Name ::= {local, atom()} | {global, term()} | {via, atom(), term()} -%%% Mod ::= atom(), callback module implementing the 'real' fsm -%%% Args ::= term(), init arguments (to Module:init/1) -%%% Options ::= [{debug, [Flag]}] -%%% Flag ::= trace | log | {logfile, File} | statistics | debug -%%% (debug == log && statistics) -%%% Returns: {ok, Pid} | -%%% {error, {already_started, Pid}} | -%%% {error, Reason} -%%% --------------------------------------------------- -start(Mod, Args, Options) -> - gen:start(?MODULE, nolink, Mod, Args, Options). - -start(Name, Mod, Args, Options) -> - gen:start(?MODULE, nolink, Name, Mod, Args, Options). - -start_link(Mod, Args, Options) -> - gen:start(?MODULE, link, Mod, Args, Options). - -start_link(Name, Mod, Args, Options) -> - gen:start(?MODULE, link, Name, Mod, Args, Options). - -stop(Name) -> gen:stop(Name). - -stop(Name, Reason, Timeout) -> - gen:stop(Name, Reason, Timeout). - -send_event({global, Name}, Event) -> - catch global:send(Name, {'$gen_event', Event}), - ok; -send_event({via, Module, Name}, Event) -> - catch Module:send(Name, {'$gen_event', Event}), - ok; -send_event(Name, Event) -> - Name ! {'$gen_event', Event}, - ok. - -sync_send_event(Name, Event) -> - case catch gen:call(Name, '$gen_sync_event', Event) of - {ok, Res} -> Res; - {'EXIT', Reason} -> - exit({Reason, - {?MODULE, sync_send_event, [Name, Event]}}) - end. - -sync_send_event(Name, Event, Timeout) -> - case catch gen:call(Name, - '$gen_sync_event', - Event, - Timeout) - of - {ok, Res} -> Res; - {'EXIT', Reason} -> - exit({Reason, - {?MODULE, sync_send_event, [Name, Event, Timeout]}}) - end. - -send_all_state_event({global, Name}, Event) -> - catch global:send(Name, - {'$gen_all_state_event', Event}), - ok; -send_all_state_event({via, Module, Name}, Event) -> - catch Module:send(Name, - {'$gen_all_state_event', Event}), - ok; -send_all_state_event(Name, Event) -> - Name ! {'$gen_all_state_event', Event}, - ok. - -sync_send_all_state_event(Name, Event) -> - case catch gen:call(Name, - '$gen_sync_all_state_event', - Event) - of - {ok, Res} -> Res; - {'EXIT', Reason} -> - exit({Reason, - {?MODULE, sync_send_all_state_event, [Name, Event]}}) - end. - -sync_send_all_state_event(Name, Event, Timeout) -> - case catch gen:call(Name, - '$gen_sync_all_state_event', - Event, - Timeout) - of - {ok, Res} -> Res; - {'EXIT', Reason} -> - exit({Reason, - {?MODULE, - sync_send_all_state_event, - [Name, Event, Timeout]}}) - end. - -%% Designed to be only callable within one of the callbacks -%% hence using the self() of this instance of the process. -%% This is to ensure that timers don't go astray in global -%% e.g. when straddling a failover, or turn up in a restarted -%% instance of the process. - -%% Returns Ref, sends event {timeout,Ref,Msg} after Time -%% to the (then) current state. -start_timer(Time, Msg) -> - erlang:start_timer(Time, self(), {'$gen_timer', Msg}). - -%% Returns Ref, sends Event after Time to the (then) current state. -send_event_after(Time, Event) -> - erlang:start_timer(Time, self(), {'$gen_event', Event}). - -%% Returns the remaining time for the timer if Ref referred to -%% an active timer/send_event_after, false otherwise. -cancel_timer(Ref) -> - case erlang:cancel_timer(Ref) of - false -> - receive {timeout, Ref, _} -> 0 after 0 -> false end; - RemainingTime -> RemainingTime - end. - -%% enter_loop/4,5,6 -%% Makes an existing process into a gen_fsm. -%% The calling process will enter the gen_fsm receive loop and become a -%% gen_fsm process. -%% The process *must* have been started using one of the start functions -%% in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the process, -%% including registering a name for it. -enter_loop(Mod, Options, StateName, StateData) -> - enter_loop(Mod, - Options, - StateName, - StateData, - self(), - infinity). - -enter_loop(Mod, Options, StateName, StateData, - {Scope, _} = ServerName) - when Scope == local; Scope == global -> - enter_loop(Mod, - Options, - StateName, - StateData, - ServerName, - infinity); -enter_loop(Mod, Options, StateName, StateData, - {via, _, _} = ServerName) -> - enter_loop(Mod, - Options, - StateName, - StateData, - ServerName, - infinity); -enter_loop(Mod, Options, StateName, StateData, - Timeout) -> - enter_loop(Mod, - Options, - StateName, - StateData, - self(), - Timeout). - -enter_loop(Mod, Options, StateName, StateData, - ServerName, Timeout) -> - Name = gen:get_proc_name(ServerName), - Parent = gen:get_parent(), - Debug = gen:debug_options(Name, Options), - HibernateAfterTimeout = gen:hibernate_after(Options), - loop(Parent, - Name, - StateName, - StateData, - Mod, - Timeout, - HibernateAfterTimeout, - Debug). - -%%% --------------------------------------------------- -%%% Initiate the new process. -%%% Register the name using the Rfunc function -%%% Calls the Moduleinit/Args function. -%%% Finally an acknowledge is sent to Parent and the main -%%% loop is entered. -%%% --------------------------------------------------- -init_it(Starter, self, Name, Mod, Args, Options) -> - init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name0, Module, Args, - Options) -> - Name = gen:name(Name0), - Debug = gen:debug_options(Name, Options), - HibernateAfterTimeout = gen:hibernate_after(Options), - case catch Module:init(Args) of - {ok, StateName, StateData} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, - Name, - StateName, - StateData, - Module, - infinity, - HibernateAfterTimeout, - Debug); - {ok, StateName, StateData, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, - Name, - StateName, - StateData, - Module, - Timeout, - HibernateAfterTimeout, - Debug); - {stop, Reason} -> - gen:unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - gen:unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - gen:unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) - end. - -%%----------------------------------------------------------------- -%% The MAIN loop -%%----------------------------------------------------------------- -loop(Parent, Name, StateName, StateData, Mod, hibernate, - HibernateAfterTimeout, Debug) -> - proc_lib:hibernate(?MODULE, - wake_hib, - [Parent, - Name, - StateName, - StateData, - Mod, - HibernateAfterTimeout, - Debug]); -loop(Parent, Name, StateName, StateData, Mod, infinity, - HibernateAfterTimeout, Debug) -> - receive - Msg -> - decode_msg(Msg, - Parent, - Name, - StateName, - StateData, - Mod, - infinity, - HibernateAfterTimeout, - Debug, - false) - after HibernateAfterTimeout -> - loop(Parent, - Name, - StateName, - StateData, - Mod, - hibernate, - HibernateAfterTimeout, - Debug) - end; -loop(Parent, Name, StateName, StateData, Mod, Time, - HibernateAfterTimeout, Debug) -> - Msg = receive - Input -> Input after Time -> {'$gen_event', timeout} - end, - decode_msg(Msg, - Parent, - Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout, - Debug, - false). - -wake_hib(Parent, Name, StateName, StateData, Mod, - HibernateAfterTimeout, Debug) -> - Msg = receive Input -> Input end, - decode_msg(Msg, - Parent, - Name, - StateName, - StateData, - Mod, - hibernate, - HibernateAfterTimeout, - Debug, - true). - -decode_msg(Msg, Parent, Name, StateName, StateData, Mod, - Time, HibernateAfterTimeout, Debug, Hib) -> - case Msg of - {system, From, Req} -> - sys:handle_system_msg(Req, - From, - Parent, - ?MODULE, - Debug, - [Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout], - Hib); - {'EXIT', Parent, Reason} -> - terminate(Reason, - Name, - Msg, - Mod, - StateName, - StateData, - Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, - Parent, - Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout); - _Msg -> - Debug1 = sys:handle_debug(Debug, - fun print_event/3, - {Name, StateName}, - {in, Msg}), - handle_msg(Msg, - Parent, - Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout, - Debug1) - end. - -%%----------------------------------------------------------------- -%% Callback functions for system messages handling. -%%----------------------------------------------------------------- -system_continue(Parent, Debug, - [Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout]) -> - loop(Parent, - Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout, - Debug). - --spec system_terminate(term(), _, _, - [term(), ...]) -> no_return(). - -system_terminate(Reason, _Parent, Debug, - [Name, - StateName, - StateData, - Mod, - _Time, - _HibernateAfterTimeout]) -> - terminate(Reason, - Name, - [], - Mod, - StateName, - StateData, - Debug). - -system_code_change([Name, - StateName, - StateData, - Module, - Time, - HibernateAfterTimeout], - _Module, OldVsn, Extra) -> - case catch Module:code_change(OldVsn, - StateName, - StateData, - Extra) - of - {ok, NewStateName, NewStateData} -> - {ok, - [Name, - NewStateName, - NewStateData, - Module, - Time, - HibernateAfterTimeout]}; - Else -> Else - end. - -system_get_state([_Name, - StateName, - StateData, - _Mod, - _Time, - _HibernateAfterTimeout]) -> - {ok, {StateName, StateData}}. - -system_replace_state(StateFun, - [Name, - StateName, - StateData, - Mod, - Time, - HibernateAfterTimeout]) -> - Result = {NStateName, NStateData} = StateFun({StateName, - StateData}), - {ok, - Result, - [Name, - NStateName, - NStateData, - Mod, - Time, - HibernateAfterTimeout]}. - -%%----------------------------------------------------------------- -%% Format debug messages. Print them as the call-back module sees -%% them, not as the real erlang messages. Use trace for that. -%%----------------------------------------------------------------- -print_event(Dev, {in, Msg}, {Name, StateName}) -> - case Msg of - {'$gen_event', Event} -> - io:format(Dev, - "*DBG* ~tp got event ~tp in state ~tw~n", - [Name, Event, StateName]); - {'$gen_all_state_event', Event} -> - io:format(Dev, - "*DBG* ~tp got all_state_event ~tp in " - "state ~tw~n", - [Name, Event, StateName]); - {timeout, Ref, {'$gen_timer', Message}} -> - io:format(Dev, - "*DBG* ~tp got timer ~tp in state ~tw~n", - [Name, {timeout, Ref, Message}, StateName]); - {timeout, _Ref, {'$gen_event', Event}} -> - io:format(Dev, - "*DBG* ~tp got timer ~tp in state ~tw~n", - [Name, Event, StateName]); - _ -> - io:format(Dev, - "*DBG* ~tp got ~tp in state ~tw~n", - [Name, Msg, StateName]) - end; -print_event(Dev, {out, Msg, To, StateName}, Name) -> - io:format(Dev, - "*DBG* ~tp sent ~tp to ~tw~n and " - "switched to state ~tw~n", - [Name, Msg, To, StateName]); -print_event(Dev, return, {Name, StateName}) -> - io:format(Dev, - "*DBG* ~tp switched to state ~tw~n", - [Name, StateName]). - -handle_msg(Msg, Parent, Name, StateName, StateData, Mod, - _Time, HibernateAfterTimeout) -> - %No debug here - From = from(Msg), - case catch dispatch(Msg, Mod, StateName, StateData) of - {next_state, NStateName, NStateData} -> - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - infinity, - HibernateAfterTimeout, - []); - {next_state, NStateName, NStateData, Time1} -> - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - Time1, - HibernateAfterTimeout, - []); - {reply, Reply, NStateName, NStateData} - when From =/= undefined -> - reply(From, Reply), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - infinity, - HibernateAfterTimeout, - []); - {reply, Reply, NStateName, NStateData, Time1} - when From =/= undefined -> - reply(From, Reply), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - Time1, - HibernateAfterTimeout, - []); - {stop, Reason, NStateData} -> - terminate(Reason, - Name, - Msg, - Mod, - StateName, - NStateData, - []); - {stop, Reason, Reply, NStateData} - when From =/= undefined -> - {'EXIT', R} = (catch terminate(Reason, - Name, - Msg, - Mod, - StateName, - NStateData, - [])), - reply(From, Reply), - exit(R); - {'EXIT', - {undef, [{Mod, handle_info, [_, _, _], _} | _]}} -> - error_logger:warning_msg("** Undefined handle_info in ~p~n** Unhandled " - "message: ~tp~n", - [Mod, Msg]), - loop(Parent, - Name, - StateName, - StateData, - Mod, - infinity, - HibernateAfterTimeout, - []); - {'EXIT', What} -> - terminate(What, - Name, - Msg, - Mod, - StateName, - StateData, - []); - Reply -> - terminate({bad_return_value, Reply}, - Name, - Msg, - Mod, - StateName, - StateData, - []) - end. - -handle_msg(Msg, Parent, Name, StateName, StateData, Mod, - _Time, HibernateAfterTimeout, Debug) -> - From = from(Msg), - case catch dispatch(Msg, Mod, StateName, StateData) of - {next_state, NStateName, NStateData} -> - Debug1 = sys:handle_debug(Debug, - fun print_event/3, - {Name, NStateName}, - return), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - infinity, - HibernateAfterTimeout, - Debug1); - {next_state, NStateName, NStateData, Time1} -> - Debug1 = sys:handle_debug(Debug, - fun print_event/3, - {Name, NStateName}, - return), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - Time1, - HibernateAfterTimeout, - Debug1); - {reply, Reply, NStateName, NStateData} - when From =/= undefined -> - Debug1 = reply(Name, From, Reply, Debug, NStateName), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - infinity, - HibernateAfterTimeout, - Debug1); - {reply, Reply, NStateName, NStateData, Time1} - when From =/= undefined -> - Debug1 = reply(Name, From, Reply, Debug, NStateName), - loop(Parent, - Name, - NStateName, - NStateData, - Mod, - Time1, - HibernateAfterTimeout, - Debug1); - {stop, Reason, NStateData} -> - terminate(Reason, - Name, - Msg, - Mod, - StateName, - NStateData, - Debug); - {stop, Reason, Reply, NStateData} - when From =/= undefined -> - {'EXIT', R} = (catch terminate(Reason, - Name, - Msg, - Mod, - StateName, - NStateData, - Debug)), - _ = reply(Name, From, Reply, Debug, StateName), - exit(R); - {'EXIT', What} -> - terminate(What, - Name, - Msg, - Mod, - StateName, - StateData, - Debug); - Reply -> - terminate({bad_return_value, Reply}, - Name, - Msg, - Mod, - StateName, - StateData, - Debug) - end. - -dispatch({'$gen_event', Event}, Module, StateName, - StateData) -> - Module:StateName(Event, StateData); -dispatch({'$gen_all_state_event', Event}, Module, - StateName, StateData) -> - Module:handle_event(Event, StateName, StateData); -dispatch({'$gen_sync_event', From, Event}, Module, - StateName, StateData) -> - Module:StateName(Event, From, StateData); -dispatch({'$gen_sync_all_state_event', From, Event}, - Module, StateName, StateData) -> - Module:handle_sync_event(Event, - From, - StateName, - StateData); -dispatch({timeout, Ref, {'$gen_timer', Msg}}, Module, - StateName, StateData) -> - Module:StateName({timeout, Ref, Msg}, StateData); -dispatch({timeout, _Ref, {'$gen_event', Event}}, Module, - StateName, StateData) -> - Module:StateName(Event, StateData); -dispatch(Info, Module, StateName, StateData) -> - Module:handle_info(Info, StateName, StateData). - -from({'$gen_sync_event', From, _Event}) -> From; -from({'$gen_sync_all_state_event', From, _Event}) -> - From; -from(_) -> undefined. - -%% Send a reply to the client. -reply({To, Tag}, Reply) -> catch To ! {Tag, Reply}. - -reply(Name, {To, Tag}, Reply, Debug, StateName) -> - reply({To, Tag}, Reply), - sys:handle_debug(Debug, - fun print_event/3, - Name, - {out, Reply, To, StateName}). - -%%% --------------------------------------------------- -%%% Terminate the server. -%%% --------------------------------------------------- - --spec terminate(term(), _, _, atom(), _, _, - _) -> no_return(). - -terminate(Reason, Name, Msg, Module, StateName, - StateData, Debug) -> - case erlang:function_exported(Module, terminate, 3) of - true -> - case catch Module:terminate(Reason, - StateName, - StateData) - of - {'EXIT', R} -> - FmtStateData = format_status(terminate, - Module, - get(), - StateData), - error_info(R, - Name, - Msg, - StateName, - FmtStateData, - Debug), - exit(R); - _ -> ok - end; - false -> ok - end, - case Reason of - normal -> exit(normal); - shutdown -> exit(shutdown); - {shutdown, _} = Shutdown -> exit(Shutdown); - _ -> - FmtStateData1 = format_status(terminate, - Module, - get(), - StateData), - error_info(Reason, - Name, - Msg, - StateName, - FmtStateData1, - Debug), - exit(Reason) - end. - -error_info(Reason, Name, Msg, StateName, StateData, - Debug) -> - Reason1 = case Reason of - {undef, [{M, F, A, L} | MFAs]} -> - case code:is_loaded(M) of - false -> - {'module could not be loaded', - [{M, F, A, L} | MFAs]}; - _ -> - case erlang:function_exported(M, F, length(A)) of - true -> Reason; - false -> - {'function not exported', - [{M, F, A, L} | MFAs]} - end - end; - _ -> Reason - end, - Str = "** State machine ~tp terminating \n" ++ - get_msg_str(Msg) ++ - "** When State == ~tp~n** Data " - "== ~tp~n** Reason for termination = " - "~n** ~tp~n", - format(Str, - [Name, get_msg(Msg), StateName, StateData, Reason1]), - sys:print_log(Debug), - ok. - -get_msg_str({'$gen_event', _Event}) -> - "** Last event in was ~tp~n"; -get_msg_str({'$gen_sync_event', _Event}) -> - "** Last sync event in was ~tp~n"; -get_msg_str({'$gen_all_state_event', _Event}) -> - "** Last event in was ~tp (for all states)~n"; -get_msg_str({'$gen_sync_all_state_event', _Event}) -> - "** Last sync event in was ~tp (for all " - "states)~n"; -get_msg_str({timeout, _Ref, {'$gen_timer', _Msg}}) -> - "** Last timer event in was ~tp~n"; -get_msg_str({timeout, _Ref, {'$gen_event', _Msg}}) -> - "** Last timer event in was ~tp~n"; -get_msg_str(_Msg) -> "** Last message in was ~tp~n". - -get_msg({'$gen_event', Event}) -> Event; -get_msg({'$gen_sync_event', Event}) -> Event; -get_msg({'$gen_all_state_event', Event}) -> Event; -get_msg({'$gen_sync_all_state_event', Event}) -> Event; -get_msg({timeout, Ref, {'$gen_timer', Msg}}) -> - {timeout, Ref, Msg}; -get_msg({timeout, _Ref, {'$gen_event', Event}}) -> - Event; -get_msg(Msg) -> Msg. - -%%----------------------------------------------------------------- -%% Status information -%%----------------------------------------------------------------- - --if((?OTP_RELEASE) >= 22). - -format_status(Opt, StatusData) -> - [PDict, - SysState, - Parent, - Debug, - [Name, - StateName, - StateData, - Mod, - _Time, - _HibernateAfterTimeout]] = - StatusData, - Header = - gen:format_status_header("Status for state machine", - Name), - Log = sys:get_log(Debug), - Specfic = format_status(Opt, Mod, PDict, StateData), - Specfic = case format_status(Opt, Mod, PDict, StateData) - of - S when is_list(S) -> S; - S -> [S] - end, - [{header, Header}, - {data, - [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, - {"StateName", StateName}]} - | Specfic]. - --elif((?OTP_RELEASE) >= 21). - -format_status(Opt, StatusData) -> - [PDict, - SysState, - Parent, - Debug, - [Name, - StateName, - StateData, - Mod, - _Time, - _HibernateAfterTimeout]] = - StatusData, - Header = - gen:format_status_header("Status for state machine", - Name), - %% Log = sys:get_log(Debug), - Log = sys:get_debug(log, Debug, []), - Specfic = format_status(Opt, Mod, PDict, StateData), - Specfic = case format_status(Opt, Mod, PDict, StateData) - of - S when is_list(S) -> S; - S -> [S] - end, - [{header, Header}, - {data, - [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, - {"StateName", StateName}]} - | Specfic]. - --endif. - -format_status(Opt, Module, PDict, State) -> - DefStatus = case Opt of - terminate -> State; - _ -> [{data, [{"StateData", State}]}] - end, - case erlang:function_exported(Module, format_status, 2) - of - true -> - case catch Module:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> DefStatus; - Else -> Else - end; - _ -> DefStatus - end. diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index b95089dae..adccc5938 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -18,39 +18,32 @@ %% ------------------------------------------------------------------- -module(riak_core_vnode). --behaviour(gen_fsm_compat). +-behaviour(gen_statem). -include("riak_core_vnode.hrl"). --export([start_link/3, - start_link/4, - wait_for_init/1, - send_command/2, - send_command_after/2]). - +-export([callback_mode/0]). +-export([start_link/3, start_link/4]). -export([init/1, - started/2, started/3, - active/2, active/3, - handle_event/3, - handle_sync_event/4, - handle_info/3, terminate/3, code_change/4]). -export([reply/2, monitor/1]). --export([get_mod_index/1, - get_modstate/1, +-export([wait_for_init/1, + send_command/2, + handoff_error/3, + get_mod_index/1, set_forwarding/2, trigger_handoff/2, trigger_handoff/3, trigger_delete/1, core_status/1, - handoff_error/3]). + send_command_after/2]). -export([cast_finish_handoff/1, send_an_event/2, @@ -66,19 +59,9 @@ -include_lib("eunit/include/eunit.hrl"). --export([test_link/2, current_state/1]). - --endif. - --ifdef(PULSE). - --compile(export_all). +-include_lib("kernel/include/logger.hrl"). --compile({parse_transform, pulse_instrument}). - --compile({pulse_replace_module, - [{gen_fsm_compat, pulse_gen_fsm}, - {gen_server, pulse_gen_server}]}). +-export([test_link/2, current_state/1, get_modstate/1]). -endif. @@ -135,6 +118,29 @@ {stop, Reason :: term(), NewModState :: term()}. +%% handle_exit/3 is an optional behaviour callback that can be implemented. +%% It will be called in the case that a process that is linked to the vnode +%% process dies and allows the module using the behaviour to take appropriate +%% action. It is called by handle_info when it receives an {'EXIT', Pid, Reason} +%% message and the function signature is: handle_exit(Pid, Reason, State). +%% +%% It should return a tuple indicating the next state for the fsm. For a list of +%% valid return types see the documentation for the gen_fsm_compat handle_info callback. +%% +%% Here is what the spec for handle_exit/3 would look like: +%% -spec handle_exit(pid(), atom(), term()) -> +%% {noreply, term()} | +%% {stop, term(), term()} + +%% handle_info/2 is an optional behaviour callback too. +%% It will be called in the case when a vnode receives any other message +%% than an EXIT message. +%% The function signature is: handle_info(Info, State). +%% It should return a tuple of the form {ok, NextState} +%% +%% Here is what the spec for handle_info/2 would look like: +%% -spec handle_info(term(), term()) -> {ok, term()} + -callback handle_exit(pid(), Reason :: term(), ModState :: term()) -> {noreply, NewModState :: term()} | @@ -220,29 +226,6 @@ -callback handle_overload_info(Request :: term(), Idx :: partition()) -> ok. -%% handle_exit/3 is an optional behaviour callback that can be implemented. -%% It will be called in the case that a process that is linked to the vnode -%% process dies and allows the module using the behaviour to take appropriate -%% action. It is called by handle_info when it receives an {'EXIT', Pid, Reason} -%% message and the function signature is: handle_exit(Pid, Reason, State). -%% -%% It should return a tuple indicating the next state for the fsm. For a list of -%% valid return types see the documentation for the gen_fsm_compat handle_info callback. -%% -%% Here is what the spec for handle_exit/3 would look like: -%% -spec handle_exit(pid(), atom(), term()) -> -%% {noreply, term()} | -%% {stop, term(), term()} - -%% handle_info/2 is an optional behaviour callback too. -%% It will be called in the case when a vnode receives any other message -%% than an EXIT message. -%% The function signature is: handle_info(Info, State). -%% It should return a tuple of the form {ok, NextState} -%% -%% Here is what the spec for handle_info/2 would look like: -%% -spec handle_info(term(), term()) -> {ok, term()} - -define(DEFAULT_TIMEOUT, 60000). -define(LOCK_RETRY_TIMEOUT, 10000). @@ -256,119 +239,106 @@ start_link(Mod, Index, Forward) -> start_link(Mod, Index, InitialInactivityTimeout, Forward) -> - gen_fsm_compat:start_link(?MODULE, - [Mod, Index, InitialInactivityTimeout, Forward], - []). + gen_statem:start_link(?MODULE, + [Mod, Index, InitialInactivityTimeout, Forward], + []). -%% #1 - State started +%% #1 call - State started wait_for_init(Vnode) -> - gen_fsm_compat:sync_send_event(Vnode, - wait_for_init, - infinity). + gen_statem:call(Vnode, wait_for_init). -%% #2 - +%% #2 cast %% Send a command message for the vnode module by Pid - %% typically to do some deferred processing after returning yourself send_command(Pid, Request) -> - gen_fsm_compat:send_event(Pid, - #riak_vnode_req_v1{request = Request}). + gen_statem:cast(Pid, + #riak_vnode_req_v1{request = Request}). -%% #3 - +%% #3 cast handoff_error(Vnode, Err, Reason) -> - gen_fsm_compat:send_event(Vnode, - {handoff_error, Err, Reason}). + gen_statem:cast(Vnode, {handoff_error, Err, Reason}). -%% #4 - +%% #4 call get_mod_index(VNode) -> - gen_fsm_compat:sync_send_all_state_event(VNode, - get_mod_index). + gen_statem:call(VNode, get_mod_index). -%% #5 +%% #5 cast set_forwarding(VNode, ForwardTo) -> - gen_fsm_compat:send_all_state_event(VNode, - {set_forwarding, ForwardTo}). + gen_statem:cast(VNode, {set_forwarding, ForwardTo}). -%% #6 +%% #6 cast trigger_handoff(VNode, TargetIdx, TargetNode) -> - gen_fsm_compat:send_all_state_event(VNode, - {trigger_handoff, - TargetIdx, - TargetNode}). + gen_statem:cast(VNode, + {trigger_handoff, TargetIdx, TargetNode}). -%% #7 +%% #7 cast trigger_handoff(VNode, TargetNode) -> - gen_fsm_compat:send_all_state_event(VNode, - {trigger_handoff, TargetNode}). + gen_statem:cast(VNode, {trigger_handoff, TargetNode}). -%% #8 +%% #8 cast trigger_delete(VNode) -> - gen_fsm_compat:send_all_state_event(VNode, - trigger_delete). + gen_statem:cast(VNode, trigger_delete). -%% #9 +%% #9 cast core_status(VNode) -> - gen_fsm_compat:sync_send_all_state_event(VNode, - core_status). + gen_statem:call(VNode, core_status). -%% #10 +%% #10 %TODO %% Sends a command to the FSM that called it after Time %% has passed. -spec send_command_after(integer(), term()) -> reference(). send_command_after(Time, Request) -> - gen_fsm_compat:send_event_after(Time, - #riak_vnode_req_v1{request = Request}). + %gen_fsm_compat:send_event_after(Time, #riak_vnode_req_v1{request = Request}). + erlang:start_timer(Time, + self(), + {'$gen_cast', #riak_vnode_req_v1{request = Request}}). -%%%%%%% %new APIs %% #11 - riak_core_vnode_manager - handle_vnode_event cast_finish_handoff(VNode) -> - gen_fsm_compat:send_all_state_event(VNode, - finish_handoff). + gen_statem:cast(VNode, finish_handoff). %% #12 - riak_core_vnode_manager - handle_vnode_event cancel_handoff(VNode) -> - gen_fsm_compat:send_all_state_event(VNode, - cancel_handoff). + gen_statem:cast(VNode, cancel_handoff). %% #13 - riak_core_vnode_master - send_an_event send_an_event(VNode, Event) -> - gen_fsm_compat:send_event(VNode, Event). + gen_statem:cast(VNode, Event). %% #14 - riak_core_vnode_master - handle_cast/handle_call - %riak_core_vnode_master - command2 %riak_core_vnode_proxy - handle_call -send_req(VNode, Req) -> - gen_fsm_compat:send_event(VNode, Req). +send_req(VNode, Req) -> gen_statem:cast(VNode, Req). %% #15 - riak_core_vnode_master - handle_call send_all_proxy_req(VNode, Req) -> - gen_fsm_compat:send_all_state_event(VNode, Req). + gen_statem:call(VNode, Req). %% #16 - riak:core_handoff_sender - start_fold_ -spec handoff_complete(VNode :: pid()) -> ok. handoff_complete(VNode) -> - gen_fsm_compat:send_event(VNode, handoff_complete). + gen_statem:cast(VNode, handoff_complete). %% #17 - riak:core_handoff_sender - start_fold_ -spec resize_transfer_complete(VNode :: pid(), NotSentAcc :: term()) -> ok. resize_transfer_complete(VNode, NotSentAcc) -> - gen_fsm_compat:send_event(VNode, - {resize_transfer_complete, NotSentAcc}). + gen_statem:cast(VNode, + {resize_transfer_complete, NotSentAcc}). %% #18 - riak_core_handoff_receiver - process_message handoff_data(VNode, MsgData, VNodeTimeout) -> - gen_fsm_compat:sync_send_all_state_event(VNode, - {handoff_data, MsgData}, - VNodeTimeout). + gen_statem:call(VNode, + {handoff_data, MsgData}, + VNodeTimeout). %% #19 - riak_core_vnode_proxy - handle_cast unregistered(VNode) -> - gen_fsm_compat:send_event(VNode, unregistered). + gen_statem:call(VNode, unregistered). %% @doc Send a reply to a vnode request. If %% the Ref is undefined just send the reply @@ -426,6 +396,8 @@ monitor(ignore) -> erlang:monitor(process, self()). manager_event_timer :: reference() | undefined, inactivity_timeout :: non_neg_integer()}). +callback_mode() -> state_functions. + init([Module, Index, InitialInactivityTimeout, @@ -474,115 +446,82 @@ code_change(_OldVsn, StateName, State, _Extra) -> %% started %% ======== -started(timeout, +started(timeout, _MSG, State = #state{inactivity_timeout = InitialInactivityTimeout}) -> case do_init(State) of {ok, State2} -> {next_state, active, State2, InitialInactivityTimeout}; {error, Reason} -> {stop, Reason} - end. - -started(wait_for_init, _From, + end; +%% #1 +started({call, From}, wait_for_init, State = #state{inactivity_timeout = InitialInactivityTimeout}) -> case do_init(State) of {ok, State2} -> - {reply, ok, active, State2, InitialInactivityTimeout}; + {next_state, + active, + State2, + [InitialInactivityTimeout, {reply, From, ok}]}; {error, Reason} -> {stop, Reason} - end. + end; +%% #only test +started({call, From}, current_state, State) -> + {next_state, + started, + State, + {reply, From, {started, State}}}. -%%active -%%%%%%%%%%%% -active(timeout, +%% active +%% ======== +%% #timeout +active(timeout, _MSG, State = #state{mod = Module, index = Idx}) -> riak_core_vnode_manager:vnode_event(Module, Idx, self(), inactive), continue(State); -active(#riak_coverage_req_v1{keyspaces = KeySpaces, - request = Request, sender = Sender}, - State) -> - %% Coverage request handled in handoff and non-handoff. Will be forwarded if set. - vnode_coverage(Sender, Request, KeySpaces, State); -active(#riak_vnode_req_v1{sender = Sender, - request = {resize_forward, Request}}, - State) -> - vnode_command(Sender, Request, State); -active(#riak_vnode_req_v1{sender = Sender, - request = Request}, - State = #state{handoff_target = HT}) - when HT =:= none -> - forward_or_vnode_command(Sender, Request, State); -active(#riak_vnode_req_v1{sender = Sender, - request = Request}, - State = #state{handoff_type = resize, - handoff_target = {HOIdx, HONode}, index = Index, - forward = Forward, mod = Module}) -> - RequestHash = Module:request_hash(Request), - case RequestHash of - %% will never have enough information to forward request so only handle locally - undefined -> vnode_command(Sender, Request, State); - _ -> - {ok, R} = riak_core_ring_manager:get_my_ring(), - FutureIndex = riak_core_ring:future_index(RequestHash, - Index, - R), - case FutureIndex of - %% request for portion of keyspace currently being transferred - HOIdx -> - vnode_handoff_command(Sender, - Request, - {HOIdx, HONode}, - State); - %% some portions of keyspace already transferred - _Other when is_list(Forward) -> - vnode_resize_command(Sender, - Request, - FutureIndex, - State); - %% some portions of keyspace not already transferred - _Other -> vnode_command(Sender, Request, State) - end - end; -active(#riak_vnode_req_v1{sender = Sender, - request = Request}, - State) -> - vnode_handoff_command(Sender, - Request, - State#state.handoff_target, - State); -active(handoff_complete, State) -> - State2 = start_manager_event_timer(handoff_complete, - State), - continue(State2); -active({resize_transfer_complete, SeenIdxs}, - State = #state{mod = Module, modstate = ModState, - handoff_target = Target}) -> - case Target of - none -> continue(State); - _ -> - %% TODO: refactor similarties w/ finish_handoff handle_event - {ok, NewModState} = Module:handoff_finished(Target, - ModState), - finish_handoff(SeenIdxs, - State#state{modstate = NewModState}) - end; -active({handoff_error, _Err, _Reason}, State) -> +%% #3 +active(cast, {handoff_error, _Err, _Reason}, State) -> State2 = start_manager_event_timer(handoff_error, State), continue(State2); -active({send_manager_event, Event}, State) -> - State2 = start_manager_event_timer(Event, State), - continue(State2); -active({trigger_handoff, TargetNode}, State) -> - active({trigger_handoff, State#state.index, TargetNode}, - State); -active({trigger_handoff, TargetIdx, TargetNode}, +%% #4 +active({call, From}, get_mod_index, + State = #state{index = Idx, mod = Module}) -> + {next_state, + active, + State, + [State#state.inactivity_timeout, + {reply, From, {Module, Idx}}]}; +%% #5 +active(cast, {set_forwarding, undefined}, + State = #state{modstate = {deleted, _ModState}}) -> + %% The vnode must forward requests when in the deleted state, therefore + %% ignore requests to stop forwarding. + continue(State); +%% #5 +active(cast, {set_forwarding, ForwardTo}, State) -> + logger:debug("vnode fwd :: ~p/~p :: ~p -> ~p~n", + [State#state.mod, + State#state.index, + State#state.forward, + ForwardTo]), + State2 = mod_set_forwarding(ForwardTo, State), + continue(State2#state{forward = ForwardTo}); +%% #7 +active(cast, {trigger_handoff, TargetIdx, TargetNode}, State) -> maybe_handoff(TargetIdx, TargetNode, State); -active(trigger_delete, +%% #6 +active(cast, {trigger_handoff, TargetNode}, State) -> + active(cast, + {trigger_handoff, State#state.index, TargetNode}, + State); +%% #8 +active(cast, trigger_delete, State = #state{mod = Module, modstate = ModState, index = Idx}) -> case mark_delete_complete(Idx, Module) of @@ -595,50 +534,46 @@ active(trigger_delete, riak_core_vnode_manager:unregister_vnode(Idx, Module), continue(State#state{modstate = {deleted, NewModState}}); -active(unregistered, - State = #state{mod = Module, index = Index}) -> - %% Add exclusion so the ring handler will not try to spin this vnode - %% up until it receives traffic. - riak_core_handoff_manager:add_exclusion(Module, Index), - logger:debug("~p ~p vnode excluded and unregistered.", - [Index, Module]), - {stop, - normal, - State#state{handoff_target = none, - handoff_type = undefined, pool_pid = undefined}}. - -active(_Event, _From, State) -> - Reply = ok, - {reply, - Reply, +%% #9 +active({call, From}, core_status, + State = #state{index = Index, mod = Module, + modstate = ModState, handoff_target = HT, + forward = FN}) -> + Mode = case {FN, HT} of + {undefined, none} -> active; + {undefined, HT} -> handoff; + {FN, none} -> forward; + _ -> undefined + end, + Status = [{index, Index}, {mod, Module}] ++ + case FN of + undefined -> []; + _ -> [{forward, FN}] + end + ++ + case HT of + none -> []; + _ -> [{handoff_target, HT}] + end + ++ + case ModState of + {deleted, _} -> [deleted]; + _ -> [] + end, + {next_state, active, State, - State#state.inactivity_timeout}. - -%% handle_event -%%%%%%%%%%%%%%%% - -handle_event({set_forwarding, undefined}, _StateName, - State = #state{modstate = {deleted, _ModState}}) -> - %% The vnode must forward requests when in the deleted state, therefore - %% ignore requests to stop forwarding. - continue(State); -handle_event({set_forwarding, ForwardTo}, _StateName, - State) -> - logger:debug("vnode fwd :: ~p/~p :: ~p -> ~p~n", - [State#state.mod, - State#state.index, - State#state.forward, - ForwardTo]), - State2 = mod_set_forwarding(ForwardTo, State), - continue(State2#state{forward = ForwardTo}); -handle_event(finish_handoff, _StateName, - State = #state{modstate = {deleted, _ModState}}) -> + [State#state.inactivity_timeout, + {reply, From, {Mode, Status}}]}; +%% #11 +active(cast, finish_handoff, + State = #state{modstate = {deleted, _ModState}}) -> stop_manager_event_timer(State), continue(State#state{handoff_target = none}); -handle_event(finish_handoff, _StateName, - State = #state{mod = Module, modstate = ModState, - handoff_target = Target}) -> +%% #11 +active(cast, finish_handoff, + State = #state{mod = Module, modstate = ModState, + handoff_target = Target}) -> stop_manager_event_timer(State), case Target of none -> continue(State); @@ -647,8 +582,9 @@ handle_event(finish_handoff, _StateName, ModState), finish_handoff(State#state{modstate = NewModState}) end; -handle_event(cancel_handoff, _StateName, - State = #state{mod = Module, modstate = ModState}) -> +%% #12 +active(cast, cancel_handoff, + State = #state{mod = Module, modstate = ModState}) -> %% it would be nice to pass {Err, Reason} to the vnode but the %% API doesn't currently allow for that. stop_manager_event_timer(State), @@ -660,119 +596,150 @@ handle_event(cancel_handoff, _StateName, handoff_type = undefined, modstate = NewModState}) end; -handle_event({trigger_handoff, TargetNode}, StateName, - State) -> - handle_event({trigger_handoff, - State#state.index, - TargetNode}, - StateName, - State); -handle_event({trigger_handoff, _TargetIdx, _TargetNode}, - _StateName, - State = #state{modstate = {deleted, _ModState}}) -> - continue(State); -handle_event(R = {trigger_handoff, - _TargetIdx, - _TargetNode}, - _StateName, State) -> - active(R, State); -handle_event(trigger_delete, _StateName, - State = #state{modstate = {deleted, _}}) -> - continue(State); -handle_event(trigger_delete, _StateName, State) -> - active(trigger_delete, State); -handle_event(R = #riak_vnode_req_v1{}, _StateName, - State) -> - active(R, State); -handle_event(R = #riak_coverage_req_v1{}, _StateName, - State) -> - active(R, State). - -%%handle_sync_event -%%%%%%%%%%%%%%%%%%%% - -handle_sync_event(current_state, _From, StateName, - State) -> - {reply, {StateName, State}, StateName, State}; -handle_sync_event(get_mod_index, _From, StateName, - State = #state{index = Idx, mod = Mod}) -> - {reply, - {Mod, Idx}, - StateName, - State, - State#state.inactivity_timeout}; -handle_sync_event({handoff_data, _BinObj}, _From, - StateName, - State = #state{modstate = {deleted, _ModState}}) -> - {reply, - {error, vnode_exiting}, - StateName, +%% #16 +active(cast, handoff_complete, State) -> + State2 = start_manager_event_timer(handoff_complete, + State), + continue(State2); +%% #17 +active(cast, {resize_transfer_complete, SeenIdxs}, + State = #state{mod = Module, modstate = ModState, + handoff_target = Target}) -> + case Target of + none -> continue(State); + _ -> + %% TODO: refactor similarties w/ finish_handoff handle_event + {ok, NewModState} = Module:handoff_finished(Target, + ModState), + finish_handoff(SeenIdxs, + State#state{modstate = NewModState}) + end; +%% #18 +active({call, From}, {handoff_data, _BinObj}, + State = #state{modstate = {deleted, _ModState}}) -> + {next_state, + active, State, - State#state.inactivity_timeout}; -handle_sync_event({handoff_data, BinObj}, _From, - StateName, - State = #state{mod = Module, modstate = ModState}) -> + [State#state.inactivity_timeout, + {reply, From, {error, vnode_exiting}}]}; +%{reply, {error, vnode_exiting}, StateName, State, +% State#state.inactivity_timeout}; +%% #18 +active({call, From}, {handoff_data, BinObj}, + State = #state{mod = Module, modstate = ModState}) -> case Module:handle_handoff_data(BinObj, ModState) of {reply, ok, NewModState} -> - {reply, - ok, - StateName, + {next_state, + active, State#state{modstate = NewModState}, - State#state.inactivity_timeout}; + [State#state.inactivity_timeout, {reply, From, ok}]}; + % {reply, ok, StateName, + % State#state{modstate = NewModState}, + % State#state.inactivity_timeout}; {reply, {error, Err}, NewModState} -> logger:error("~p failed to store handoff obj: ~p", [Module, Err]), - {reply, - {error, Err}, - StateName, + {next_state, + active, State#state{modstate = NewModState}, - State#state.inactivity_timeout} + [State#state.inactivity_timeout, + {reply, From, {error, Err}}]} end; -handle_sync_event(core_status, _From, StateName, - State = #state{index = Index, mod = Module, - modstate = ModState, handoff_target = HT, - forward = FN}) -> - Mode = case {FN, HT} of - {undefined, none} -> active; - {undefined, HT} -> handoff; - {FN, none} -> forward; - _ -> undefined - end, - Status = [{index, Index}, {mod, Module}] ++ - case FN of - undefined -> []; - _ -> [{forward, FN}] - end - ++ - case HT of - none -> []; - _ -> [{handoff_target, HT}] - end - ++ - case ModState of - {deleted, _} -> [deleted]; - _ -> [] - end, - {reply, - {Mode, Status}, - StateName, - State, - State#state.inactivity_timeout}. - -%%handle_info -%%%%%%%%%%%%%% - -handle_info({'$vnode_proxy_ping', From, Ref, Msgs}, - StateName, State) -> +% {reply, {error, Err}, StateName, +% State#state{modstate = NewModState}, +% State#state.inactivity_timeout} +%% #19 +active(cast, unregistered, + State = #state{mod = Module, index = Index}) -> + %% Add exclusion so the ring handler will not try to spin this vnode + %% up until it receives traffic. + riak_core_handoff_manager:add_exclusion(Module, Index), + logger:debug("~p ~p vnode excluded and unregistered.", + [Index, Module]), + {stop, + normal, + State#state{handoff_target = none, + handoff_type = undefined, pool_pid = undefined}}; +%% internal +%%%%%%%%%%%% +%% # start_manager_event_timer +active(cast, {send_manager_event, Event}, State) -> + State2 = start_manager_event_timer(Event, State), + continue(State2); +%% 13 +active(_C, + #riak_coverage_req_v1{keyspaces = KeySpaces, + request = Request, sender = Sender}, + State) -> + %% Coverage request handled in handoff and non-handoff. Will be forwarded if set. + vnode_coverage(Sender, Request, KeySpaces, State); +%% forward_request +active(_C, + #riak_vnode_req_v1{sender = Sender, + request = {resize_forward, Request}}, + State) -> + vnode_command(Sender, Request, State); +%% # finish_handoff +active(_C, + #riak_vnode_req_v1{sender = Sender, request = Request}, + State = #state{handoff_target = HT}) + when HT =:= none -> + forward_or_vnode_command(Sender, Request, State); +%% maybe_handoff +active(_C, + #riak_vnode_req_v1{sender = Sender, request = Request}, + State = #state{handoff_type = resize, + handoff_target = {HOIdx, HONode}, index = Index, + forward = Forward, mod = Module}) -> + RequestHash = Module:request_hash(Request), + case RequestHash of + %% will never have enough information to forward request so only handle locally + undefined -> vnode_command(Sender, Request, State); + _ -> + {ok, R} = riak_core_ring_manager:get_my_ring(), + FutureIndex = riak_core_ring:future_index(RequestHash, + Index, + R), + case FutureIndex of + %% request for portion of keyspace currently being transferred + HOIdx -> + vnode_handoff_command(Sender, + Request, + {HOIdx, HONode}, + State); + %% some portions of keyspace already transferred + _Other when is_list(Forward) -> + vnode_resize_command(Sender, + Request, + FutureIndex, + State); + %% some portions of keyspace not already transferred + _Other -> vnode_command(Sender, Request, State) + end + end; +%% +active(cast, + #riak_vnode_req_v1{sender = Sender, request = Request}, + State) -> + vnode_handoff_command(Sender, + Request, + State#state.handoff_target, + State); +%% info +%%%%%%%% +%% +active(info, {'$vnode_proxy_ping', From, Ref, Msgs}, + State) -> riak_core_vnode_proxy:cast(From, {vnode_proxy_pong, Ref, Msgs}), {next_state, - StateName, + active, State, State#state.inactivity_timeout}; -handle_info({'EXIT', Pid, Reason}, _StateName, - State = #state{mod = Module, index = Index, - pool_pid = Pid, pool_config = PoolConfig}) -> +%% +active(info, {'EXIT', Pid, Reason}, + State = #state{mod = Module, index = Index, + pool_pid = Pid, pool_config = PoolConfig}) -> case Reason of Reason when Reason == normal; Reason == shutdown -> continue(State#state{pool_pid = undefined}); @@ -791,22 +758,25 @@ handle_info({'EXIT', Pid, Reason}, _StateName, worker_props), continue(State#state{pool_pid = NewPoolPid}) end; -handle_info({'DOWN', _Ref, process, _Pid, normal}, - _StateName, State = #state{modstate = {deleted, _}}) -> +%% +active(info, {'DOWN', _Ref, process, _Pid, normal}, + State = #state{modstate = {deleted, _}}) -> %% these messages are produced by riak_kv_vnode's aae tree %% monitors; they are harmless, so don't yell about them. also %% only dustbin them in the deleted modstate, because pipe vnodes %% need them in other states continue(State); -handle_info(Info, _StateName, - State = #state{mod = Module, modstate = {deleted, _}, - index = Index}) -> +%% +active({info, _F}, Info, + State = #state{mod = Module, modstate = {deleted, _}, + index = Index}) -> logger:info("~p ~p ignored handle_info ~p - vnode " "unregistering\n", [Index, Module, Info]), continue(State); -handle_info({'EXIT', Pid, Reason}, StateName, - State = #state{mod = Module, modstate = ModState}) -> +%% +active({info, _F}, {'EXIT', Pid, Reason}, + State = #state{mod = Module, modstate = ModState}) -> %% A linked processes has died so use the %% handle_exit callback to allow the vnode %% process to take appropriate action. @@ -815,7 +785,7 @@ handle_info({'EXIT', Pid, Reason}, StateName, try case Module:handle_exit(Pid, Reason, ModState) of {noreply, NewModState} -> {next_state, - StateName, + active, State#state{modstate = NewModState}, State#state.inactivity_timeout}; {stop, Reason1, NewModState} -> @@ -824,21 +794,34 @@ handle_info({'EXIT', Pid, Reason}, StateName, catch _ErrorType:undef -> {stop, linked_process_crash, State} end; -handle_info(Info, StateName, - State = #state{mod = Module, modstate = ModState}) -> +%% +active({info, _F}, Info, + State = #state{mod = Module, modstate = ModState}) -> case erlang:function_exported(Module, handle_info, 2) of true -> {ok, NewModState} = Module:handle_info(Info, ModState), {next_state, - StateName, + active, State#state{modstate = NewModState}, State#state.inactivity_timeout}; false -> {next_state, - StateName, + active, State, State#state.inactivity_timeout} - end. + end; +%% only TEST +active({call, From}, current_state, State) -> + {next_state, + active, + State, + [{reply, From, {active, State}}]}; +%% # all? %TODO +active({_C, From}, _MSG, State) -> + {next_state, + active, + State, + [State#state.inactivity_timeout, {reply, From, ok}]}. %% ======================== %% ======== @@ -1004,8 +987,6 @@ vnode_coverage(Sender, Request, KeySpaces, Sender, ModState); NextOwner -> - logger:debug("Forwarding coverage ~p -> ~p: ~p~n", - [node(), NextOwner, Index]), riak_core_vnode_master:coverage(Request, {Index, NextOwner}, KeySpaces, @@ -1026,9 +1007,6 @@ vnode_coverage(Sender, Request, KeySpaces, Work, From), continue(State, NewModState); - {PoolName, _Work, _From, NewModState} -> - logger:error("Worker pools not supported: ~p", [PoolName]), - {stop, not_supported, State#state{modstate = NewModState}}; {stop, Reason, NewModState} -> {stop, Reason, State#state{modstate = NewModState}} end. @@ -1425,8 +1403,12 @@ start_manager_event_timer(Event, self(), Event), stop_manager_event_timer(State), - T2 = gen_fsm_compat:send_event_after(30000, - {send_manager_event, Event}), + %TODO correct way to start an event after x? + T2 = erlang:start_timer(30000, + self(), + {'$gen_cast', {send_manager_event, Event}}), + %T2 = gen_statem:send_event_after(30000, + % {send_manager_event, Event}), State#state{manager_event_timer = T2}. stop_manager_event_timer(#state{manager_event_timer = @@ -1434,9 +1416,16 @@ stop_manager_event_timer(#state{manager_event_timer = ok; stop_manager_event_timer(#state{manager_event_timer = T}) -> - _ = gen_fsm_compat:cancel_timer(T), + _ = cancel_timer(T), ok. +cancel_timer(Ref) -> + case erlang:cancel_timer(Ref) of + false -> + receive {timeout, Ref, _} -> 0 after 0 -> false end; + RemainingTime -> RemainingTime + end. + mod_set_forwarding(_Forward, State = #state{modstate = {deleted, _}}) -> State; @@ -1455,6 +1444,7 @@ mod_set_forwarding(Forward, %% =================================================================== %% Test API %% =================================================================== +-ifdef(TEST). -type state() :: #state{}. @@ -1462,49 +1452,22 @@ mod_set_forwarding(Forward, -spec get_modstate(pid()) -> {atom(), state()}. get_modstate(Pid) -> - {_StateName, State} = - gen_fsm_compat:sync_send_all_state_event(Pid, - current_state), + {_StateName, State} = gen_statem:call(Pid, + current_state), {State#state.mod, State#state.modstate}. --ifdef(TEST). - %% Start the garbage collection server test_link(Mod, Index) -> - gen_fsm_compat:start_link(?MODULE, - [Mod, Index, 0, node()], - []). + gen_statem:start_link(?MODULE, + [Mod, Index, 0, node()], + []). %% Get the current state of the fsm for testing inspection -spec current_state(pid()) -> {atom(), state()} | {error, term()}. current_state(Pid) -> - gen_fsm_compat:sync_send_all_state_event(Pid, - current_state). - -wait_for_process_death(Pid) -> - wait_for_process_death(Pid, is_process_alive(Pid)). - -wait_for_process_death(Pid, true) -> - wait_for_process_death(Pid, is_process_alive(Pid)); -wait_for_process_death(_Pid, false) -> ok. - -wait_for_state_update(OriginalStateData, Pid) -> - {_, CurrentStateData} = (?MODULE):current_state(Pid), - wait_for_state_update(OriginalStateData, - CurrentStateData, - Pid). - -wait_for_state_update(OriginalStateData, - OriginalStateData, Pid) -> - {_, CurrentStateData} = (?MODULE):current_state(Pid), - wait_for_state_update(OriginalStateData, - CurrentStateData, - Pid); -wait_for_state_update(_OriginalState, _StateData, - _Pid) -> - ok. + gen_statem:call(Pid, current_state). %% =================================================================== %% Test @@ -1512,7 +1475,7 @@ wait_for_state_update(_OriginalState, _StateData, pool_death_test() -> %% expect error log - error_logger:tty(false), + % error_logger:tty(false), meck:unload(), meck:new(test_vnode, [non_strict, no_link]), meck:expect(test_vnode, @@ -1539,6 +1502,30 @@ pool_death_test() -> exit(Pid, normal), wait_for_process_death(Pid), meck:validate(test_pool_mod), - meck:validate(test_vnode). + meck:validate(test_vnode), + error_logger:tty(true). + +wait_for_process_death(Pid) -> + wait_for_process_death(Pid, is_process_alive(Pid)). + +wait_for_process_death(Pid, true) -> + wait_for_process_death(Pid, is_process_alive(Pid)); +wait_for_process_death(_Pid, false) -> ok. + +wait_for_state_update(OriginalStateData, Pid) -> + {_, CurrentStateData} = (?MODULE):current_state(Pid), + wait_for_state_update(OriginalStateData, + CurrentStateData, + Pid). + +wait_for_state_update(OriginalStateData, + OriginalStateData, Pid) -> + {_, CurrentStateData} = (?MODULE):current_state(Pid), + wait_for_state_update(OriginalStateData, + CurrentStateData, + Pid); +wait_for_state_update(_OriginalState, _StateData, + _Pid) -> + ok. -endif. diff --git a/src/riak_core_vnode_proxy.erl b/src/riak_core_vnode_proxy.erl index 539effd69..230483996 100644 --- a/src/riak_core_vnode_proxy.erl +++ b/src/riak_core_vnode_proxy.erl @@ -300,19 +300,39 @@ handle_overload(Msg, %% STATS %riak_core_stat:update(dropped_vnode_requests), case Msg of - {'$gen_event', + %for gen_fsm + {'$gen_event', %send_event #riak_vnode_req_v1{sender = Sender, request = Request}} -> catch Module:handle_overload_command(Request, Sender, Index); - {'$gen_all_state_event', + {'$gen_all_state_event', %send_all_state_event #riak_vnode_req_v1{sender = Sender, request = Request}} -> catch Module:handle_overload_command(Request, Sender, Index); - {'$gen_event', + {'$gen_event', %send_event + #riak_coverage_req_v1{sender = Sender, + request = Request}} -> + catch Module:handle_overload_command(Request, + Sender, + Index); + %for gen_statem + {'$gen_cast', + #riak_vnode_req_v1{sender = Sender, + request = Request}} -> + catch Module:handle_overload_command(Request, + Sender, + Index); + {'$gen_call', + #riak_vnode_req_v1{sender = Sender, + request = Request}} -> + catch Module:handle_overload_command(Request, + Sender, + Index); + {'$gen_cast', #riak_coverage_req_v1{sender = Sender, request = Request}} -> catch Module:handle_overload_command(Request,