mappy is a re-implementation of the Hadoop MapReduce scheduler, written to demonstrate a rules-based coding style and to highlight the benefits of the technique. mappy's job scheduler is equivalent to Hadoop's: it reimplements the functionality provided by 3 classes in the Hadoop Java implementation, JobImpl, TaskImpl, and TaskAttemptImpl. Each of these classes implements an event-driven state machine, and together they form the core of Hadoop's job scheduler and its fault handling. Each state machine is defined by what can be visualized as a transition table: the implementation explicitly specifies every transition with a start state, end state, trigger event, and transition action. The implemented "transition table" for each class can be found at the following locations:
Each transition action is itself implemented as a nested class whose member function transition defines the body of the action. JobImpl.TaskAttemptCompletedEventTransition is an example of a relatively involved action, whereas JobImpl.KillInitedJobTransition, JobImpl.KilledDuringSetupTransition, and JobImpl.KilledDuringCommitTransition are 3 nearly identical transitions.
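To make the transition-table style concrete, here is a minimal Python sketch of the pattern. It is not Hadoop's code: Hadoop builds its tables with a Java StateMachineFactory, whereas this sketch uses a plain dictionary keyed by (start state, trigger event). The states, event names, and the InitTransition and StartTransition classes are simplified, hypothetical stand-ins; only the KillInitedJobTransition name is borrowed from JobImpl.

```python
class InitTransition:
    """Hypothetical action run when a NEW job receives JOB_INIT."""
    def transition(self, job, event):
        job.log.append("initialized")


class StartTransition:
    """Hypothetical action run when an INITED job receives JOB_START."""
    def transition(self, job, event):
        job.log.append("started")


class KillInitedJobTransition:
    """Simplified stand-in for the JobImpl action of the same name."""
    def transition(self, job, event):
        job.log.append("killed before setup")


# Each entry: (start state, trigger event) -> (end state, transition action).
TRANSITIONS = {
    ("NEW", "JOB_INIT"): ("INITED", InitTransition()),
    ("INITED", "JOB_START"): ("RUNNING", StartTransition()),
    ("INITED", "JOB_KILL"): ("KILLED", KillInitedJobTransition()),
}


class Job:
    def __init__(self):
        self.state = "NEW"
        self.log = []

    def handle(self, event):
        # Look up the transition for the current state; unknown pairs are errors.
        key = (self.state, event)
        if key not in TRANSITIONS:
            raise ValueError("invalid event %s in state %s" % (event, self.state))
        end_state, action = TRANSITIONS[key]
        action.transition(self, event)
        self.state = end_state


if __name__ == "__main__":
    job = Job()
    for event in ("JOB_INIT", "JOB_KILL"):
        job.handle(event)
    print(job.state, job.log)  # KILLED ['initialized', 'killed before setup']
```

Every legal (state, event) pair needs its own entry and action, which is why tables written in this style grow large as states and events are added.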
mappy reimplements the functionality provided by the 3 state machines found in JobImpl.java, TaskImpl.java, and TaskAttemptImpl.java. Each state machine corresponds to a task (our term for a grouped set of rules and the state variables they act on) in job.py. Here are the applyRules methods that implement the rules for each task type:
The rules-based implementation of the MapReduce scheduler is significantly simpler than the state machine implementation: a total of 19 rules in 3 tasks provide functionality equivalent to the 163 transitions in the state machine implementation. Each of the three applyRules methods fits on a screen or two of code (117 lines of code and comments across all three), which makes it possible to view the entire behavior of each task at once. Furthermore, the order of the rules within each applyRules method reflects the normal order of processing, which also aids visualization. In contrast, the state machine implementation requires more than 750 lines of code just to specify the three transition tables, plus another 1500 lines of code for the transition handlers.
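For contrast, the sketch below shows the general shape of a rules-based task: a handful of state variables plus an applyRules method that checks guarded rules in order. It is a hypothetical illustration, not mappy's job.py; the variable names, the event names CONTAINER_REQ and COMMIT, and the choice to re-run the rules until none fires are all assumptions made for the example.

```python
class Task:
    """Hypothetical rules-based task: state variables plus ordered rules."""

    def __init__(self, task_id):
        self.task_id = task_id
        # State variables the rules act on.
        self.container_requested = False
        self.worker = None          # worker currently running the task
        self.worker_lost = False    # set by the caller when that worker dies
        self.finished = False       # set by the caller when the work is done
        self.commit_requested = False
        self.outbox = []            # events sent to outside modules

    def applyRules(self):
        # Assumption: rules are re-run until none fires (a fixpoint); mappy may differ.
        fired = True
        while fired:
            fired = False
            # Rule 1: an unassigned, unfinished task asks for a container.
            if self.worker is None and not self.finished and not self.container_requested:
                self.outbox.append(("CONTAINER_REQ", self.task_id))
                self.container_requested = True
                fired = True
            # Rule 2: finished work is handed to the committer exactly once.
            if self.finished and not self.commit_requested:
                self.outbox.append(("COMMIT", self.task_id))
                self.commit_requested = True
                fired = True
            # Rule 3: if the worker died before finishing, forget it so Rule 1 retries.
            if self.worker_lost and not self.finished:
                self.worker = None
                self.worker_lost = False
                self.container_requested = False
                fired = True


if __name__ == "__main__":
    t = Task(0)
    t.applyRules()                         # Rule 1 requests a container
    t.worker, t.worker_lost = "w1", True   # a worker is assigned, then dies
    t.applyRules()                         # Rule 3 resets, Rule 1 re-requests
    t.worker, t.finished = "w2", True
    t.applyRules()                         # Rule 2 requests the commit
    print(t.outbox)  # [('CONTAINER_REQ', 0), ('CONTAINER_REQ', 0), ('COMMIT', 0)]
```

Worker loss needs no dedicated event or transition here: Rule 3 clears the assignment, and Rule 1 then re-requests a container on the next pass.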
Hadoop's event-driven state machines use events heavily to communicate between the job scheduler and the outside world, other modules, and sometimes even internally between the components of the job scheduler. For parity and congruence, the mappy implementation uses the same event names for events used to interact with modules outside the 3 classes being reimplemented. Events that Hadoop used to communicate internally between the job scheduler's 3 state machine classes were replaced with equivalent functionality implemented in a more rules-based style.
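The split between external and internal events can be sketched as follows. External modules subscribe to events by name, while the job's rule reads its tasks' state directly instead of reacting to internal "task completed" events. The EventBus class, the task_done list, and the use of JOB_COMMIT as the committer's event name are assumptions for illustration, not mappy's actual interfaces.

```python
from collections import defaultdict


class EventBus:
    """Minimal dispatcher: external modules subscribe by event name."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, name, handler):
        self.handlers[name].append(handler)

    def emit(self, name, payload):
        for handler in self.handlers[name]:
            handler(payload)


class Job:
    """Hypothetical job-level task that emits external events only."""

    def __init__(self, bus, num_tasks):
        self.bus = bus
        # Child task state is read directly; no internal events are exchanged.
        self.task_done = [False] * num_tasks
        self.commit_requested = False

    def applyRules(self):
        # Rule: once every task is done, ask the committer (an external module)
        # to commit the job, exactly once.
        if all(self.task_done) and not self.commit_requested:
            self.bus.emit("JOB_COMMIT", self)
            self.commit_requested = True


if __name__ == "__main__":
    bus = EventBus()
    bus.subscribe("JOB_COMMIT", lambda job: print("committer: commit requested"))
    job = Job(bus, num_tasks=2)
    job.task_done[:] = [True, True]
    job.applyRules()   # prints "committer: commit requested"
    job.applyRules()   # no output; the rule has already fired
```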
mappy's equivalence to the Hadoop implementation was verified by hand, and we have also built a mock MapReduce implementation around job.py to run the scheduler as an additional sanity check. The mock implementation provides the following:
- A "worker" that mocks the behavior of a Hadoop container: it simply accepts "work" and responds asynchronously after waiting for a certain amount of time.
- Mock RMContainerAllocator and CommitterEventHandler modules which, like their Hadoop counterparts, handle the events generated by the scheduler.
- A basic RPC system for communication.
- A "master" that runs the scheduler and other modules as a Hadoop MapReduce master would.
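A mock worker of the kind listed above needs little more than a timer. In this minimal sketch the RPC layer is replaced by a direct Python callback, and the MockWorker class, its submit method, and the default delay are hypothetical; mappy's worker.py talks to the master over its RPC system instead.

```python
import threading


class MockWorker:
    """Hypothetical stand-in for a container: accepts work, replies later."""

    def __init__(self, address, delay=0.5):
        self.address = address
        self.delay = delay  # seconds to "work" before replying

    def submit(self, task_id, reply):
        # Return immediately; call reply(task_id, address) from a timer thread
        # after self.delay seconds, mimicking an asynchronous response.
        timer = threading.Timer(self.delay, reply, args=(task_id, self.address))
        timer.daemon = True
        timer.start()
        return timer


if __name__ == "__main__":
    done = threading.Event()
    results = []

    def on_done(task_id, address):
        results.append((task_id, address))
        done.set()

    worker = MockWorker(("127.0.0.1", 8001), delay=0.1)
    worker.submit(0, on_done)
    done.wait(timeout=2)
    print(results)  # [(0, ('127.0.0.1', 8001))]
```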
mappy can be run by starting a master and any number of workers; they can all run on the same machine or be spread across a cluster of machines. The example commands below use a single machine.
By default, modules run in the foreground and print both RPC traffic and certain events to standard output, so each module should be run in its own terminal. To stop a module, press Ctrl-C.
To start a master, run master.py with the following command, which specifies its IP address, port number, and the number of tasks the job should have:
./master.py 127.0.0.1 8000 -t 3
To start a worker, run worker.py with the following command, which specifies its own IP address and port number followed by the master's IP address and port number:
./worker.py 127.0.0.1 8001 127.0.0.1 8000
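When several workers run on one machine, each needs its own port, and every worker is given the master's address last. The hypothetical helper below only builds the command lines shown above for one master and several workers; giving workers the ports directly after the master's is an assumption, and any free ports would do.

```python
def build_commands(host="127.0.0.1", master_port=8000, num_workers=2, num_tasks=3):
    """Hypothetical helper: argv lists for one master and num_workers workers."""
    master = ["./master.py", host, str(master_port), "-t", str(num_tasks)]
    # Assumption: workers take the ports directly after the master's.
    workers = [
        ["./worker.py", host, str(master_port + i), host, str(master_port)]
        for i in range(1, num_workers + 1)
    ]
    return master, workers


if __name__ == "__main__":
    master, workers = build_commands(num_workers=2)
    for argv in [master] + workers:
        print(" ".join(argv))
```

Each printed line can be run in its own terminal. Passing the lists to subprocess.Popen would also work, but the modules' interleaved output is easier to follow in separate terminals.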
The master runs the scheduler until the job's goal is reached, that is, until all tasks have run and been "committed". If any worker dies (for example, is killed with Ctrl-C) while the job is running, the scheduler reschedules the now-lost tasks. Once the job is complete, the master prints the list of tasks along with the worker that completed each one, for example:
Job Complete
<0: SUCCEEDED (u'127.0.0.1', 8001, 3721)>
<1: SUCCEEDED (u'127.0.0.1', 8001, 3721)>
<2: SUCCEEDED (u'127.0.0.1', 8001, 3721)>
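The rescheduling behavior can be pictured with a simplified model of the bookkeeping involved. This sketch is not mappy's scheduler: the Scheduler class and its methods are hypothetical, it only returns tasks that were still running on the dead worker to the pending queue, and its report omits the third field shown in the output above, whose meaning is not described here.

```python
class Scheduler:
    """Hypothetical bookkeeping for assigning tasks and recovering lost ones."""

    def __init__(self, num_tasks):
        self.pending = list(range(num_tasks))  # task ids waiting for a worker
        self.running = {}                      # task id -> worker address
        self.succeeded = {}                    # task id -> worker address

    def assign(self, worker):
        # Hand the oldest pending task to the worker, or None if nothing is left.
        if not self.pending:
            return None
        task = self.pending.pop(0)
        self.running[task] = worker
        return task

    def task_succeeded(self, task):
        self.succeeded[task] = self.running.pop(task)

    def worker_lost(self, worker):
        # Tasks running on the dead worker go back to the pending queue.
        lost = [t for t, w in self.running.items() if w == worker]
        for t in lost:
            del self.running[t]
            self.pending.append(t)
        return lost

    def done(self):
        return not self.pending and not self.running

    def report(self):
        return ["<%d: SUCCEEDED %r>" % (t, w) for t, w in sorted(self.succeeded.items())]


if __name__ == "__main__":
    a, b = ("127.0.0.1", 8001), ("127.0.0.1", 8002)
    s = Scheduler(3)
    s.assign(a)        # task 0 -> a
    s.assign(b)        # task 1 -> b
    s.worker_lost(b)   # task 1 returns to the pending queue
    s.task_succeeded(0)
    while not s.done():
        s.task_succeeded(s.assign(a))
    print("\n".join(s.report()))
```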