Java API
There are two ways to use the MUSCLE API: either by using the computational elements defined by the Multiscale Modeling Language, or by writing free-form code, where all orchestration and deadlock detection is your own responsibility. The former gives code that is easy to read and debug, while the latter makes it possible to exploit any parallelism in the model optimally. This page is split in two parts, one for each approach. With care, the two approaches can also be combined in a model, choosing the more appropriate form per submodel.
The computational elements that the Multiscale Modeling Language (MML) defines are:
- conduit, for sending messages;
- submodel, for modeling phenomena at certain scales;
- mapper, for dimensionless mappings of data;
- filter, for time and datatype filtering; and
- terminal, for terminating empty conduits.
These elements are all supported in the MUSCLE API. Of these elements, only submodels have an explicit concept of scale. In MUSCLE this is mapped to one temporal scale and multiple spatial scales. The temporal scale is used to determine the simulation time at the current iteration; see the MUSCLE configuration file documentation on how to set the scales. Conduits and parameters are used in the same way in MML-based and free-form code.
The conduit is the mechanism in the MUSCLE runtime environment to send data. In the API, the ConduitEntrance and ConduitExit are accessible. The ConduitEntrance is used to send data, while the ConduitExit receives data. These entrances and exits, as we call them, can be accessed in two ways: either they are created in the addPortals() method, or they are obtained on the fly in the code using the out() and in() methods.
The addPortals() method is called before the submodel starts, so the exits and entrances are stored as fields of your class. The conduit exit and entrance use Java generics to define what kind of data will be received or sent, both in the field declaration and in the addExit or addEntrance call. This allows for compile-time and run-time checking of data that is sent over the conduit.
ConduitExit<double[]> exitA;
ConduitEntrance<double[]> entranceA;
@Override
protected void addPortals() {
exitA = addExit("exitName", double[].class);
entranceA = addEntrance("entranceName", double[].class);
}
To send or receive, use the fields that were initialized in addPortals().
double[] dataA = exitA.receive();
entranceA.send(dataA);
If addPortals() is not overridden, it is possible to use the out() and in() methods to send or receive data. It is then not necessary to store the ConduitExit and ConduitEntrance as fields.
double[] dataA = (double[]) in("exitName").receive();
out("entranceName").send(dataA);
In this case, received data needs to be cast to double[] before it can be used, and for the send statement the Java compiler will give a warning that an unchecked conversion is being done. If the cast is not correct, a ClassCastException will be thrown by Java. If the exit or entrance name is not configured, an IllegalArgumentException will be thrown by MUSCLE.
The advantage of the first method is that it is type-safe, so there is no need to cast the data. Also, if the code and the configuration file do not match, this is detected immediately. The advantage of the second method is that it is less verbose and leads to smaller classes.
A ConduitExit is used for receiving in blocking mode. There are two receive methods: receive() and receiveObservation(). In the first case only the data is received. In the second, an observation is received, which contains the data but also the timestamp at which the data was sent, and the timestamp at which the next message will arrive. So,
double[] dataA = exitA.receive();
or
Observation<double[]> obsA = exitA.receiveObservation();
double[] dataA = obsA.getData();
Timestamp time = obsA.getTimestamp();
If receive is called and the submodel on the other end has stopped, a MUSCLEConduitExhaustedException is thrown. To prevent this, it is possible to first call exitA.hasNext(), which is also blocking but returns a boolean.
If many conduit exits are used and the order in which they are read is not important, it is possible to loop over multiple conduits and call the ready() method. This is non-blocking, and it returns true once hasNext() can return an answer without blocking.
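The polling pattern can be illustrated without MUSCLE itself. The block below is a minimal sketch, not MUSCLE code: plain java.util.concurrent queues stand in for conduit exits, and the hypothetical helper ready(queue) plays the role of ConduitExit.ready() by checking, without blocking, whether a message is waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Stand-in sketch: queues play the role of conduit exits (an assumption, not the MUSCLE API).
final class ReadyPollingSketch {
    // Hypothetical analogue of ConduitExit.ready(): true if a receive would not block.
    static <T> boolean ready(BlockingQueue<T> exit) {
        return exit.peek() != null;
    }

    // Receive exactly one message from every exit, in whatever order they become available.
    static <T> List<T> receiveInAnyOrder(List<BlockingQueue<T>> exits) {
        List<T> received = new ArrayList<>();
        List<BlockingQueue<T>> pending = new ArrayList<>(exits);
        while (!pending.isEmpty()) {
            boolean progressed = false;
            // Iterate backwards so that removing an element does not shift unvisited ones.
            for (int i = pending.size() - 1; i >= 0; i--) {
                if (ready(pending.get(i))) {
                    // poll() does not block; a message is known to be waiting.
                    received.add(pending.remove(i).poll());
                    progressed = true;
                }
            }
            if (!progressed) {
                Thread.yield(); // nothing ready yet: give other threads a chance to send
            }
        }
        return received;
    }
}
```

In real MUSCLE code the same loop would call ready() on each ConduitExit and then receive() on the ones that are ready.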
A ConduitEntrance has several different send functions. The most basic, send(data), sends the data and deduces the timestamp at which the data is sent by adding the delta T of the timescale to the previous timestamp. If the instance is dimensionless, or if otherwise required, you can explicitly set the timestamp of the data and the timestamp at which you will send the next message with send(data, time, nextTime):
Timestamp time = Timestamp.ZERO;
Timestamp nextTime = new Timestamp(2.0);
entranceA.send(data, time, nextTime);
Finally, a muscle.core.model.Observation object can also be sent and it contains the same information: data, a time and the next time.
Observation<double[]> obs = new Observation<double[]>(data, time, nextTime);
entranceA.send(obs);
Although the entrance is automatically closed once the instance is finished, it is also possible to close the conduit earlier with the close() method, if it is not going to be used again.
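To make the timestamp bookkeeping of the two send variants concrete, here is a small stand-in sketch. It is not the MUSCLE ConduitEntrance: the class name is hypothetical, times are plain doubles, the first message is assumed to be sent at the origin time, and an explicit nextTime is assumed to become the timestamp of the following plain send.

```java
// Stand-in sketch of timestamp bookkeeping; not the MUSCLE ConduitEntrance.
final class TimestampingEntranceSketch {
    private final double dt;   // time step (delta T) of the temporal scale
    private double nextTime;   // timestamp that the next plain send() will use

    TimestampingEntranceSketch(double originTime, double dt) {
        this.nextTime = originTime; // assumption: the first message carries the origin time
        this.dt = dt;
    }

    // Analogue of send(data): deduce the timestamps by adding dt to the previous one.
    // Returns {time, nextTime} of the message that would be sent.
    double[] send() {
        double time = nextTime;
        nextTime = time + dt;
        return new double[] {time, nextTime};
    }

    // Analogue of send(data, time, nextTime): timestamps are set explicitly.
    double[] send(double time, double explicitNextTime) {
        nextTime = explicitNextTime; // assumption: later plain sends continue from here
        return new double[] {time, explicitNextTime};
    }
}
```

With an origin of 0 and a dt of 0.5, two plain sends carry the timestamps 0 and 0.5 under these assumptions.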
Parameters that are defined in the configuration file are accessed through a range of get*Property() methods. The most basic form is
String parameter = getProperty("propName");
In this form the property is read as a string. It first tries to find the instance property by searching for "instanceName:propName" and then for a global property named "propName". All get*Property() methods will throw an IllegalArgumentException if the property does not exist; this can be prevented by first calling hasProperty().
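The lookup order described above can be sketched with a plain map. This is only an illustration of the rule (instance property first, then global property, otherwise an IllegalArgumentException); the class PropertyLookupSketch and its map-based storage are assumptions and do not reflect how MUSCLE stores its configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch of the property lookup order; not the MUSCLE implementation.
final class PropertyLookupSketch {
    private final String instanceName;
    private final Map<String, String> config;

    PropertyLookupSketch(String instanceName, Map<String, String> config) {
        this.instanceName = instanceName;
        this.config = new HashMap<>(config);
    }

    // True only if the property is defined specifically for this instance.
    boolean hasInstanceProperty(String name) {
        return config.containsKey(instanceName + ":" + name);
    }

    // True if the property is defined for this instance or globally.
    boolean hasProperty(String name) {
        return hasInstanceProperty(name) || config.containsKey(name);
    }

    // Instance property wins over the global one; a missing property is an error.
    String getProperty(String name) {
        String value = config.get(instanceName + ":" + name);
        if (value == null) {
            value = config.get(name);
        }
        if (value == null) {
            throw new IllegalArgumentException("Property '" + name + "' does not exist");
        }
        return value;
    }
}
```

For an instance named "w" with both "w:dt" and "dt" configured, getProperty("dt") in this sketch returns the value of "w:dt".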
If an instance property is specifically required, this can be checked by first calling
String parameter = null;
if (hasInstanceProperty("propName")) {
parameter = getProperty("propName");
}
If on the other hand only a global property is wanted, write
String parameter = getGlobalProperty("propName");
To get parameters of types other than String, use getPathProperty() for a File, getIntProperty() for an int, getDoubleProperty() for a double and getBoolProperty() for a boolean.
The philosophy of MUSCLE is that I/O should be done by the runtime system and not be coded directly, unless it is done with terminals. Therefore, rather than reading from or writing to a file directly, we advise using a FileSource or FileSink and attaching it to a conduit that the submodel reads from or writes to. In this way, if you decide that the input should not come from a file but from another submodel, the code in the submodel does not have to change.
Each submodel and terminal has its own temporary directory, which can be accessed with
File tmpPath = getTmpPath();
after which the path can be used to write files, for instance fileOut.dat.
FileWriter fw = new FileWriter(new File(tmpPath, "fileOut.dat"));
It is safe to write output in this directory, since it is only associated with the current submodel in the current run. The path is always printed at the beginning and end of the execution, and the base temporary path can be specified on the command line with the --tmp-path flag. Under the given temporary path, MUSCLE creates the directory hostname_date_time_pid, in which the results of the current run are stored. This way, a script can always contain the same temporary path but MUSCLE will use a new unique directory for each run. Whenever a submodel requests to use the temporary directory, a directory is created for that submodel alone, so that different submodels do not interfere with each other.
How to read files depends on which execution method is chosen. Middleware such as QCG can usually stage in files for a single job. If files are staged in this fashion (fileIn.dat for example), and --tmp-path points to the directory which contains the staged files, you can use
FileReader fr = new FileReader(new File(getTmpPath(), "../../fileIn.dat"));
as the two parent-directory steps lead from the submodel's own directory back to the --tmp-path directory.
To keep track of what the different submodels are doing, a logging facility is available. Simply call
log("some message");
and the message will be printed with timestamp and kernel name, and written to a log file "kernelName.log" in the temporary path of the kernel. For more control over the log levels, a Java log Level can be used:
log("something went very wrong; you will see this message even with the -q or --quiet flag.", Level.SEVERE);
log("unless you use the -v or --verbose flag, you will only see this message in the log file.", Level.FINE);
Finally, for full control over the logger and for error handling the Java Logger can also be accessed directly.
try {
doSomethingDangerous();
}
catch (Exception ex) {
getLogger().log(Level.SEVERE, "error message", ex);
}
In MML, submodels are governed by a Submodel Execution Loop. Interpreting this for MUSCLE, this looks like:
while (init_port.has_next()) {          // initial condition has a next message
    state, timeOrigin = init(t0)        // we are allowed to receive messages
    while (!willStop()) {
        intermediateObservation()       // we are allowed to send messages
        state = solvingStep()           // we are allowed to receive messages
    }
    finalObservation()                  // we are allowed to send messages
}
Reading this code step by step: a submodel can be restarted any number of times depending on the couplings, which is what the outer loop does. Each time a submodel is started, it has an initialization phase where it determines the initial state and what the simulation time of this initial state should be. It then enters a while loop that runs as long as some end condition is not met. In each iteration it sends some observation of the state and computes the next state. When the end condition is met, it is possible to do some cleaning up and to send a final observation. At the end of the submodel it decides whether it should restart.
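The control flow can be written out in plain Java to see the order in which the operators are called. The sketch below is not the muscle.core.kernel.Submodel class: SubmodelLoopSketch and CountingSubmodel are hypothetical names, the restart decision is modelled by a restartSubmodel() method evaluated at the end of each run, and the toy model simply stops after a fixed number of steps.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch of the Submodel Execution Loop; not the MUSCLE Submodel class.
abstract class SubmodelLoopSketch {
    final List<String> trace = new ArrayList<>(); // records the order of the calls

    protected abstract void init();                    // may receive messages
    protected abstract boolean willStop();             // end condition
    protected abstract void intermediateObservation(); // may send messages
    protected abstract void solvingStep();             // may receive messages
    protected abstract void finalObservation();        // may send messages
    protected abstract boolean restartSubmodel();      // run the whole loop again?

    final void run() {
        do {
            init();
            while (!willStop()) {
                intermediateObservation();
                solvingStep();
            }
            finalObservation();
        } while (restartSubmodel());
    }
}

// Toy submodel that performs a fixed number of solving steps and never restarts.
final class CountingSubmodel extends SubmodelLoopSketch {
    private final int steps;
    private int step;

    CountingSubmodel(int steps) { this.steps = steps; }

    @Override protected void init() { step = 0; trace.add("init"); }
    @Override protected boolean willStop() { return step >= steps; }
    @Override protected void intermediateObservation() { trace.add("Oi"); }
    @Override protected void solvingStep() { step++; trace.add("S"); }
    @Override protected void finalObservation() { trace.add("Of"); }
    @Override protected boolean restartSubmodel() { return false; }
}
```

Running new CountingSubmodel(2).run() records the calls init, Oi, S, Oi, S, Of in that order.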
In MUSCLE, a submodel is created by extending muscle.core.kernel.Submodel. willStop looks at all the messages sent and received, and compares the message with the highest simulation time with the total time in the timescale of the submodel. By default, a submodel uses its init_port only once, but restarting could be made to depend on whether one of the conduits will receive more messages. A micro model will likely need to restart several times to produce output for a macro model.
The other methods are empty by default, so they should be overridden to obtain a meaningful submodel. For example, if the model depends on some initial geometry which will be calculated by another submodel, the init() method could be implemented as follows:
int paramA;
int[] geometry;
@Override
protected Timestamp init(Timestamp prevOrigin) {
paramA = getIntProperty("paramName");
Observation<int[]> initialGeometry = in("geometry").receiveObservation();
geometry = initialGeometry.getData();
return initialGeometry.getTimestamp();
}
If no message is received in the init() function, the best way to return is return super.init(prevOrigin);. This will take the previous origin and return 0 if it was null, and prevOrigin plus the total time of the timescale if it was not null. It is not allowed to send messages during the initialization.
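The rule for the default origin is simple enough to state as code. The following sketch uses plain numbers instead of MUSCLE's Timestamp class, and the name OriginSketch.nextOrigin is hypothetical; it only mirrors the behaviour described above.

```java
// Stand-in sketch of the default time origin rule; not the MUSCLE implementation.
final class OriginSketch {
    // prevOrigin is null on the first start of the submodel.
    // totalTime is the total time of the submodel's temporal scale.
    static double nextOrigin(Double prevOrigin, double totalTime) {
        return prevOrigin == null ? 0.0 : prevOrigin + totalTime;
    }
}
```

A submodel with a total time of 10 would thus start at 0, restart at 10, then at 20, and so on, as long as it does not derive its origin from a received message.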
After initialization the submodel continues by first calling intermediateObservation() and then solvingStep(). In intermediateObservation the model may send messages, in solvingStep it may only receive messages. Other than that they are regular functions. Although intermediateObservation is not necessarily overridden, solvingStep should be overridden: it should contain the core of the code.
@Override
protected void intermediateObservation() {
double[] dens = calculateDensity(geometry);
out("density").send(dens);
}
@Override
protected void solvingStep() {
int[] changedLocations = (int[]) in("update").receive();
geometry = updateGeometry(changedLocations);
}
Finally, when this loop has iterated as often as the timescale says it should, the method finalObservation is called. Here any clean-up can be performed, and final messages may be sent.
@Override
protected void finalObservation() {
out("finalGeometry").send(geometry);
}
If restartSubmodel is overridden and returns true, init will be called again and the loop starts over. If state needs to be kept across restarts, this can be done by storing it in a field of the class.
In MML, a mapper is a computational element that may have multiple in- and outbound ports and may perform any mapping on the data received. In principle it should be stateless, but in MUSCLE this is not enforced. Also, a mapper should first receive on all its ports and then send on all its ports. This is only partially enforced in MUSCLE. There are two specializations of the mapper: the fan-in mapper, which receives on multiple ports but sends on one; and the fan-out mapper, which receives on a single port and sends on multiple ports.
A mapper is created in MUSCLE by extending muscle.core.kernel.Mapper or its subclasses muscle.core.kernel.FanInMapper and muscle.core.kernel.FanOutMapper. The mapper has the following loop:
init()
while (continueComputation()) {
    receiveAll()
    sendAll()
}
The default implementation of continueComputation() checks whether all incoming ports will receive a next message, and returns true only if this is the case. If some of the ports are not mandatory, it is possible to override continueComputation(). In init() any parameters may be read or initialization may be performed. In receiveAll() messages are received from the ports and in sendAll() messages are sent. The mapping may be performed in either of the two, whichever is convenient. Since the mapper is scaleless, the implementation should explicitly set the timestamps of the messages.
Observation<int[]> input;
@Override
protected void receiveAll() {
input = in("geometry").receiveObservation();
}
@Override
protected void sendAll() {
double[] geomDouble = convertToDouble(input.getData());
Observation<double[]> geomDoubleObs = input.copyWithData(geomDouble);
out("geometryDouble").send(geomDoubleObs);
out("geometryInt").send(input);
}
In the code above a geometry is received; one copy is passed on unaltered and the other is converted to double[] with some function. The convenience method copyWithData is used to create an observation with the same timestamps as the original, but with different data.
In the fan-out mapper, the receiveAll method is already defined, and the result of the single port is saved in the Observation value field. Conversely, in the fan-in mapper the implementation of receiveAll should store a single observation in the field Observation value, which the mapper will then send.
A conduit filter is like a mapper, but it is only applied to a single conduit. The implementation is also more lightweight, and in theory it is allowed to modify the timestamps of messages, or to drop messages altogether. To implement a filter, extend muscle.core.conduit.filter.AbstractFilter. This uses generics to indicate what values it should convert between. It is allowed to define a constructor which may take zero arguments or a single double. The only function that should be overridden is apply(), and this should call put() as many times as it wants to send a message. A simple multiplication filter would look as follows:
public class MultiplicationFilter extends muscle.core.conduit.filter.AbstractFilter<double[], double[]> {
double factor;
public MultiplicationFilter(double factor) {
this.factor = factor;
}
@Override
public void apply(Observation<double[]> obs) {
// Do not modify the original data of the submodel, so make a private copy and use the resulting data
// If you supply the datatype in advance (DOUBLE_ARR), the copy operation will be faster.
double[] data = obs.privateCopy(SerializableDatatype.DOUBLE_ARR).getData();
// perform multiplication
for (int i = 0; i < data.length; i++) {
data[i] *= factor;
}
// Create a new observation to send, with the same timestamps as the original
Observation<double[]> multObs = obs.copyWithData(data);
put(multObs);
}
}
The muscle.core.conduit.filter package contains some examples of filters which are ready to use; they are explained in the Configuration section.
A terminal, either a source or a sink, may generate or store values from a conduit. It can be used to perform I/O in a way that is transparent in MUSCLE, so instead of writing to a file, messages can be sent over a conduit and processed by a terminal. It can also be used to test a submodel, by studying it in isolation and exactly controlling what messages will be sent or received over the different conduits.
The Source class uses Java generics to specify what data type T will be generated. To extend a Source, it is necessary to implement the Observation<T> generate() and isEmpty() methods. In generate a message is generated, while isEmpty defines whether the Source will have another message. An example implementation, which would generate five random double arrays, is the following:
public class RandomDoubleSource extends Source<double[]> {
int iteration = 0;
@Override
public Observation<double[]> generate() {
iteration++;
double[] data = new double[10];
Random rand = new Random();
for (int i = 0; i < 10; i++) {
data[i] = rand.nextDouble();
}
return new Observation<double[]>(data, new Timestamp(iteration-1), new Timestamp(iteration));
}
@Override
public boolean isEmpty() {
return iteration >= 5; // stop after 5 iterations
}
}
A Sink only needs to implement the send(Observation<T>) method. A sink that would write a double array to the console could look as follows:
public class PrintDoubleSink extends Sink<double[]> {
@Override
public void send(Observation<double[]> msg) {
System.out.print("Received data at time " + msg.getTimestamp() + ": ");
System.out.println(Arrays.toString(msg.getData()));
}
}
With MUSCLE, a DoubleFileSink, a NullFileSink and a DoubleFileSource are provided in the muscle.core.conduit.terminal package. The abstract classes FileSource and FileSink are also available to extend. See the API documentation for more information.
In the free-form API all kernels extend muscle.core.kernel.Instance. At least the execute() method of Instance should be overridden. In this method any sequence of send and receive is possible, and the conceptual coupling between different submodels can be as small or large as wanted.
One notable difference with the MML API is that free-form code can quit a submodel more directly. If there should still be some synchronization with other submodels, the willStop() method can be used: it checks whether the conduits have sent messages with timestamps larger than the total time of the timescale of the current submodel, or larger than the "max_timesteps" property. An example submodel Sender with name "w" (see src/java/examples/simplejava for the full code), which sends data each iteration, would look as follows when using willStop():
import muscle.core.ConduitEntrance;
import muscle.core.kernel.Instance;
/**
a simple java example kernel which sends data
*/
public class Sender extends Instance {
private ConduitEntrance<double[]> entrance;
@Override
protected void addPortals() {
entrance = addEntrance("data", double[].class);
}
@Override
protected void execute() {
double[] dataA = new double[5];
while (!this.willStop()) {
// process data
for(int i = 0; i < dataA.length; i++) {
dataA[i] = i;
}
// send the data
entrance.send(dataA);
}
}
}
By changing the $env['max_timesteps'] and the w['dt'] properties, this submodel will send a different number of messages. For example, if max_timesteps is 4 and dt is 1, it will send 4 messages.
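The relation between dt, max_timesteps and the number of messages can be sketched as a simple counting loop. This is an assumption about how the stopping rule behaves, not MUSCLE's willStop() implementation: the hypothetical countMessages starts at time 0, advances the time by dt for every message, and stops once the time reaches max_timesteps.

```java
// Stand-in sketch of the stopping rule; not the MUSCLE willStop() implementation.
final class WillStopSketch {
    // Counts how many messages a Sender-like loop would send before stopping.
    static int countMessages(double dt, double maxTimesteps) {
        if (dt <= 0.0) {
            throw new IllegalArgumentException("dt must be positive");
        }
        double time = 0.0; // assumed time origin
        int sent = 0;
        while (time < maxTimesteps) { // analogue of !willStop()
            sent++;                   // analogue of entrance.send(dataA)
            time += dt;               // each send advances the time by dt
        }
        return sent;
    }
}
```

Under these assumptions, countMessages(1, 4) gives 4, matching the example above, and doubling dt to 2 halves the count to 2.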
To enhance performance, there are two alternative types of entrances: shared data entrances and asynchronous entrances. The first can be created with
public void addPortals() {
entrance = addSharedDataEntrance("entranceName", double[].class);
}
When using a shared data entrance, MUSCLE will try to not copy the data that is sent through the conduit. This means that any change that is done by a conduit filter or another submodel that receives the data, also affects the original data. Usually this is not desirable, since it violates the separation that different submodels have. Moreover, code shouldn't depend on the data actually being shared, since it only works if the sending and receiving submodel are run in the same MUSCLE instance, and there is no filter applied that copies the data in some way. A valid use case is when the sending submodel will not use the sent data in any way, since then another submodel may modify it as it pleases.
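The aliasing effect of shared data is the same as passing an array reference around in ordinary Java. The sketch below does not use MUSCLE; the two hypothetical deliver methods only contrast a conduit that copies the data with one that hands over the same array.

```java
// Stand-in sketch contrasting copied and shared delivery; not the MUSCLE conduit code.
final class SharedDataSketch {
    // Regular entrance: the receiver gets its own copy of the data.
    static double[] deliverCopied(double[] sent) {
        return sent.clone();
    }

    // Shared data entrance: the receiver gets the very same array object.
    static double[] deliverShared(double[] sent) {
        return sent;
    }
}
```

If the receiver writes to the array returned by deliverShared, the sender sees the change in its own array; with deliverCopied the sender's array is unaffected.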
The second alternative conduit entrance is the asynchronous conduit entrance, which is created with
entrance = addAsynchronousEntrance("entranceName", double[].class);
Whenever a message is sent through this type of entrance, the sending submodel does not have to wait until the receiving MUSCLE instance has received the data. This mode of operation slightly increases the latency and involves keeping an additional thread alive. There can be a performance gain if the submodel that sends the data continues its computations immediately afterwards and does not have to wait for input from another submodel. It may also speed up sending multiple messages shortly after each other. The gain will only be noticeable if data is sent across multiple MUSCLE instances.
Using the addAsynchronousEntrance call is equivalent to adding the conduit filter "thread" to the beginning of the conduit entrance filter stack, as explained in the configuration section.
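The idea of handing messages to a separate sending thread can be sketched with the standard library. This is not how MUSCLE implements its asynchronous entrance or "thread" filter; AsyncEntranceSketch is a hypothetical class in which a single-threaded executor forwards messages, in order, to a transport callback that stands in for the actual conduit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Stand-in sketch of an asynchronous entrance; not the MUSCLE implementation.
final class AsyncEntranceSketch<T> {
    // One extra thread that performs the (possibly slow) transfers in send order.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final Consumer<T> transport; // stands in for the real conduit

    AsyncEntranceSketch(Consumer<T> transport) {
        this.transport = transport;
    }

    // Returns immediately; the transfer happens on the sender thread.
    void send(T message) {
        sender.execute(() -> transport.accept(message));
    }

    // Stops accepting messages and waits for queued ones to be delivered.
    // Returns true if everything was delivered within the timeout.
    boolean close(long timeoutMillis) {
        sender.shutdown();
        try {
            return sender.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The submodel thread only pays for queueing the message, which is why the benefit shows up when the transfer itself is slow, for example across MUSCLE instances.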
So far, the documentation has assumed that a fixed timescale is used during execution. However, in many cases the timescale will depend on how the submodel dynamics evolve. If this is the case for one of the submodels, and you are using the willStop() method or extending the Submodel class, it is possible to override the getScale() method instead of specifying the scale in the configuration file.
@Override
public Scale getScale() {
// Retrieve the scale that was specified in the configuration file
Scale confScale = super.getScale();
// Determine the current delta t
Distance dt = new Distance(computeDt(confScale));
// Determine how long the model will keep running
Distance omegaT = new Distance(computeOmegaT(confScale));
// Determine the dx/dy
Distance dx = new Distance(computeDx(confScale));
Distance dy = new Distance(computeDy(confScale));
return new Scale(dt, omegaT, dx, dy);
}
Of course, the methods computeDt, etc. above should be implemented according to the submodel requirements. In the Submodel class this dynamic scale automatically propagates to the conduit entrances. In free-form code, however, it is better to either always call send with timestamps or call ConduitEntrance.setDt(Distance) whenever the timestep changes.