Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libooniengine): add support for running tasks #1112

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
30 changes: 0 additions & 30 deletions internal/libooniengine/engine.go

This file was deleted.

95 changes: 94 additions & 1 deletion internal/libooniengine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
/// C API for using the OONI engine.
///

#include <stdint.h>

/// OONITask is an asynchronous thread of execution managed by the OONI
/// engine that performs a background operation and emits meaningful
/// events such as, for example, the results of measurements.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description should be updated to mention that we emit interim outputs such as logs and progress as well as the result of the operation.

typedef uintptr_t OONITask;

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -19,7 +26,93 @@ char *OONIEngineVersion(void);
/// OONIEngineFreeMemory frees the memory allocated by the engine.
///
/// @param ptr a void pointer refering to the memory to be freed.
void OONIENgineFreeMemory(void *ptr);
void OONIEngineFreeMemory(void *ptr);

/// OONIEngineCall starts a new OONITask using the given [req].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I wrote the prototype, I used [req] but it seems the right syntax when using Doxygen is @p req. Do you mind updating the documentation in this file?

///
/// @param req A JSON string, owned by the caller, that contains
/// the configuration for the task to start.
///
/// @return Zero on failure, nonzero on success. If the return value
/// is nonzero, a task is running. In such a case, the caller is
/// responsible to eventually dispose of the task using OONIEngineFreeTask.
OONITask OONIEngineCall(char *req);

/// OONIEngineWaitForNextEvent awaits on the [task] event queue until
/// a new event is available or the given [timeout] expires.
///
/// @param task Task handle returned by OONIEngineCall.
///
/// @param timeout Timeout in milliseconds. If the timeout is zero
/// or negative, this function would potentially block forever.
///
/// @return A NULL pointer on failure, non-NULL otherwise. If the return
/// value is non-NULL, the caller takes ownership of the char pointer
/// and MUST free it using OONIEngineFreeMemory when done using it.
///
/// This function will return an empty string:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused by this documentation. It strikes me as bad to return NULL or an empty string or a valid string and it would be better for the return value to be either NULL or a valid JSON string. I think we should double check the implementation to be sure it actually does what is documented or whether the documentation was just inconsistent with the actual implementation. (TODO(@bassosimone))

///
/// 1. when the timeout expires;
///
/// 2. if [task] is done;
///
/// 3. if [task] is zero or does not refer to a valid task;
///
/// 4. if we cannot JSON serialize the message;
///
/// 5. possibly because of other unknown internal errors.
///
/// In short, you cannot reliably determine whether a task is done by
/// checking whether this function has returned an empty string.
char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout);

/// OONIEngineTaskGetResult awaits on the result queue until the final
/// result is available or the given [timeout] expires.
///
/// @param task Task handle returned by OONIEngineCall.
///
/// @param timeout Timeout in milliseconds. If the timeout is zero
/// or negative, this function would potentially block forever.
///
/// @return A NULL pointer on failure, non-NULL otherwise. If the return
/// value is non-NULL, the caller takes ownership of the char pointer
/// and MUST free it using OONIEngineFreeMemory when done using it.
///
/// This function will return an empty string:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a timeout for this function call. It should be called after the task is done, and there should be a result once this happens. So, we can simplify the life of the caller by removing support for the timeout.

///
/// 1. when the timeout expires;
///
/// 2. if [task] is zero or does not refer to a valid task;
///
/// 3. if we cannot JSON serialize the message;
///
/// 4. possibly because of other unknown internal errors.
///
/// In short, you cannot reliably determine whether a task is done by
/// checking whether this function has returned an empty string.
char *OONIEngineTaskGetResult(OONITask task, int32_t timeout);

/// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A taks is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A taks is
/// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A task is

/// done when it has finished running _and_ its events queue has been drained.
///
/// @param task Task handle returned by OONIEngineCall.
///
/// @return Nonzero if the task exists and either is still running or has some
/// unread events inside its events queue, zero otherwise.
uint8_t OONIEngineTaskIsDone(OONITask task);

/// OONIEngineInterruptTask tells [task] to stop ASAP.
///
/// @param task Task handle returned by OONIEngineCall. If task is zero
/// or does not refer to a valid task, this function will just do nothing.
void OONIEngineInterruptTask(OONITask task);

/// OONIEngineFreeTask free the memory associated with [task]. If the task is still running, this
/// function will also interrupt it and drain its events queue.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether we need to drain the event queue, given that the channels should be nonblocking.

///
/// @param task Task handle returned by OONIEngineCall. If task is zero
/// or does not refer to a valid task, this function will just do nothing.
void OONIEngineFreeTask(OONITask task);

#ifdef __cplusplus
}
Expand Down
116 changes: 116 additions & 0 deletions internal/libooniengine/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

//
// C API
//

//#include <stdlib.h>
//
//#include "engine.h"
import "C"

import (
"encoding/json"
"log"
"runtime/cgo"
"time"
"unsafe"

"github.com/ooni/probe-cli/v3/internal/motor"
"github.com/ooni/probe-cli/v3/internal/version"
)

const (
// invalidTaskHandle represents the invalid task handle.
invalidTaskHandle = 0
)

// parse converts a JSON request string to the concrete Go type.
func parse(req *C.char) (*motor.Request, error) {
var out *motor.Request
if err := json.Unmarshal([]byte(C.GoString(req)), out); err != nil {
return nil, err
}
return out, nil
}

// serialize serializes a OONI response to a JSON string accessible to C code.
func serialize(resp *motor.Response) *C.char {
out, err := json.Marshal(resp)
if err != nil {
log.Printf("serializeMessage: cannot serialize message: %s", err.Error())
return C.CString("")
}
return C.CString(string(out))
}

//export OONIEngineVersion
func OONIEngineVersion() *C.char {
return C.CString(version.Version)
}

//export OONIEngineFreeMemory
func OONIEngineFreeMemory(ptr *C.void) {
C.free(unsafe.Pointer(ptr))
}

//export OONIEngineCall
func OONIEngineCall(req *C.char) C.OONITask {
r, err := parse(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think parse should also handle the case where req is nil.

if err != nil {
log.Printf("OONIEngineCall: %s", err.Error())
return invalidTaskHandle
}
taskName, err := motor.ResolveTask(r)
if err != nil {
log.Printf("OONIEngineCall: %s", err.Error())
return invalidTaskHandle
}
tp := motor.StartTask(taskName, r)
if tp == nil {
log.Printf("OONITaskStart: startTask return NULL")
return invalidTaskHandle
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be motor.StartTask(r). That is, I don't think we should have two distinct function calls for ResolveTask and StartTask, but a single function call that does both.

return C.OONITask(cgo.NewHandle(tp))
}

//export OONIEngineWaitForNextEvent
func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char {
tp := cgo.Handle(task).Value().(motor.TaskAPI)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if the task is an invalid handle?

ev := tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should explicitly handle the case where the timeout is zero or negative.

return serialize(ev)
}

//export OONIEngineTaskGetResult
func OONIEngineTaskGetResult(task C.OONITask, timeout C.int32_t) *C.char {
tp := cgo.Handle(task).Value().(motor.TaskAPI)
result := tp.GetResult(time.Duration(timeout) * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to call this GetResult and it could just be Result

return serialize(result)
}

//export OONIEngineTaskIsDone
func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) {
tp := cgo.Handle(task).Value().(motor.TaskAPI)
if !tp.IsDone() {
out++
}
return
}

//export OONIEngineInterruptTask
func OONIEngineInterruptTask(task C.OONITask) {
tp := cgo.Handle(task).Value().(motor.TaskAPI)
tp.Interrupt()
}

//export OONIEngineFreeTask
func OONIEngineFreeTask(task C.OONITask) {
handle := cgo.Handle(task)
tp := handle.Value().(motor.TaskAPI)
handle.Delete()
tp.Free()
}

func main() {
// do nothing
}
21 changes: 21 additions & 0 deletions internal/motor/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package motor

//
// Emitter
//

// taskEmitter implements taskMaybeEmitter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beware that you need to change the docstring here

type taskChanEmitter struct {
// out is the channel where we emit events.
out chan *Response
}

var _ taskMaybeEmitter = &taskChanEmitter{}

// maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent.
func (e *taskChanEmitter) maybeEmitEvent(resp *Response) {
select {
case e.out <- resp:
default: // buffer full, discard this event
}
}
11 changes: 11 additions & 0 deletions internal/motor/geolocate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package motor

//
// geolocate handles the geolocate task for a passed session.
//

// geolocateOptions contains the request arguments for the Geolocate task.
type geolocateOptions struct{}

// geolocateResponse is the response for the Geolocate task.
type geolocateResponse struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need geolocate for now?

90 changes: 90 additions & 0 deletions internal/motor/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package motor

import (
"fmt"

"github.com/ooni/probe-cli/v3/internal/model"
)

type LogLevel string

const (
// The DEBUG log level.
logLevel_DEBUG LogLevel = "DEBUG"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can those just be logDebug, logInfo and logWarning?


// The INFO log level.
logLevel_INFO LogLevel = "INFO"

// The WARNING log level.
logLevel_WARNING LogLevel = "WARNING"
)

type logResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this structure should be public

Level LogLevel `json:",omitempty"`
Message string `json:",omitempty"`
}

// taskLogger implements model.Logger for tasks.
type taskLogger struct {
// emitter is used to emit log events.
emitter taskMaybeEmitter

// verbose indicates whether verbose logging is enabled.
verbose bool
}

// newLogger creates a new taskLogger instance using
// the [emitter] to emit log events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the [foo] syntax only works for types and not for parameters. I didn't know when I wrote the original PoC that you used as a starting point.

func newTaskLogger(emitter taskMaybeEmitter, verbose bool) *taskLogger {
return &taskLogger{
emitter: emitter,
verbose: verbose,
}
}

var _ model.Logger = &taskLogger{}

// Debugf implements model.Logger.Debugf.
func (tl *taskLogger) Debugf(format string, values ...any) {
if tl.verbose {
tl.emit(logLevel_DEBUG, fmt.Sprintf(format, values...))
}
}

// Debug implements model.Logger.Debug.
func (tl *taskLogger) Debug(message string) {
if tl.verbose {
tl.emit(logLevel_DEBUG, message)
}
}

// Infof implements model.Logger.Infof.
func (tl *taskLogger) Infof(format string, values ...any) {
tl.emit(logLevel_INFO, fmt.Sprintf(format, values...))
}

// Info implements model.Logger.Info.
func (tl *taskLogger) Info(message string) {
tl.emit(logLevel_INFO, message)
}

// Warnf implements model.Logger.Warnf.
func (tl *taskLogger) Warnf(format string, values ...any) {
tl.emit(logLevel_WARNING, fmt.Sprintf(format, values...))
}

// Warn implements model.Logger.Warn.
func (tl *taskLogger) Warn(message string) {
tl.emit(logLevel_WARNING, message)
}

// emit emits a log message.
func (tl *taskLogger) emit(level LogLevel, message string) {
logResp := &Response{
Logger: logResponse{
Level: level,
Message: message,
},
}
tl.emitter.maybeEmitEvent(logResp)
}
Loading