From ca3f41c3c430f630bca5826fadf58d77691643bb Mon Sep 17 00:00:00 2001 From: decfox Date: Wed, 22 Feb 2023 12:15:09 +0530 Subject: [PATCH 01/12] feat: expose NewSession in the C API --- internal/engine/session.go | 57 +++++++++++ internal/libooniengine/emitter.go | 25 +++++ internal/libooniengine/engine.h | 16 ++++ internal/libooniengine/logger.go | 86 +++++++++++++++++ internal/libooniengine/{engine.go => main.go} | 16 ++++ internal/libooniengine/model.go | 64 +++++++++++++ internal/libooniengine/session.go | 82 ++++++++++++++++ internal/libooniengine/task.go | 94 +++++++++++++++++++ 8 files changed, 440 insertions(+) create mode 100644 internal/libooniengine/emitter.go create mode 100644 internal/libooniengine/logger.go rename internal/libooniengine/{engine.go => main.go} (54%) create mode 100644 internal/libooniengine/model.go create mode 100644 internal/libooniengine/session.go create mode 100644 internal/libooniengine/task.go diff --git a/internal/engine/session.go b/internal/engine/session.go index 3169d711c..c4563711e 100644 --- a/internal/engine/session.go +++ b/internal/engine/session.go @@ -221,6 +221,63 @@ func NewSession(ctx context.Context, config SessionConfig) (*Session, error) { return sess, nil } +func NewSessionWithoutTunnel(ctx context.Context, config *SessionConfig) (*Session, error) { + if config.Logger == nil { + return nil, errors.New("Logger is empty") + } + if config.SoftwareName == "" { + return nil, errors.New("SoftwareName is empty") + } + if config.SoftwareVersion == "" { + return nil, errors.New("SoftwareVersion is empty") + } + if config.KVStore == nil { + config.KVStore = &kvstore.Memory{} + } + // Implementation note: if config.TempDir is empty, then Go will + // use the temporary directory on the current system. This should + // work on Desktop. We tested that it did also work on iOS, but + // we have also seen on 2020-06-10 that it does not work on Android. + tempDir, err := ioutil.TempDir(config.TempDir, "ooniengine") + if err != nil { + return nil, err + } + config.Logger.Infof( + "ooniprobe-engine/v%s %s dirty=%s %s", + version.Version, + runtimex.BuildInfo.VcsRevision, + runtimex.BuildInfo.VcsModified, + runtimex.BuildInfo.GoVersion, + ) + sess := &Session{ + availableProbeServices: config.AvailableProbeServices, + byteCounter: bytecounter.New(), + kvStore: config.KVStore, + logger: config.Logger, + queryProbeServicesCount: &atomic.Int64{}, + softwareName: config.SoftwareName, + softwareVersion: config.SoftwareVersion, + tempDir: tempDir, + torArgs: config.TorArgs, + torBinary: config.TorBinary, + tunnelDir: config.TunnelDir, + } + var proxyURL *url.URL = nil + sess.proxyURL = proxyURL + sess.resolver = &sessionresolver.Resolver{ + ByteCounter: sess.byteCounter, + KVStore: config.KVStore, + Logger: sess.logger, + ProxyURL: proxyURL, + } + txp := netxlite.NewHTTPTransportWithLoggerResolverAndOptionalProxyURL( + sess.logger, sess.resolver, sess.proxyURL, + ) + txp = bytecounter.WrapHTTPTransport(txp, sess.byteCounter) + sess.httpDefaultTransport = txp + return sess, nil +} + // TunnelDir returns the persistent directory used by tunnels. func (s *Session) TunnelDir() string { return s.tunnelDir diff --git a/internal/libooniengine/emitter.go b/internal/libooniengine/emitter.go new file mode 100644 index 000000000..73a75bd24 --- /dev/null +++ b/internal/libooniengine/emitter.go @@ -0,0 +1,25 @@ +package main + +// +// Emitter +// + +// taskEmitter implements taskMaybeEmitter. +type taskChanEmitter struct { + // out is the channel where we emit events. + out chan *goMessage +} + +var _ taskMaybeEmitter = &taskChanEmitter{} + +// maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent. +func (e *taskChanEmitter) maybeEmitEvent(name string, value event) { + ev := &goMessage{ + key: name, + value: value, + } + select { + case e.out <- ev: + default: // buffer full, discard this event + } +} diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 90f1b560e..a22a83207 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -7,6 +7,13 @@ /// C API for using the OONI engine. /// +#include + +/// OONITask is an asynchronous thread of execution managed by the OONI +/// engine that performs a background operation and emits meaningful +/// events such as, for example, the results of measurements. +typedef uintptr_t OONITask; + #ifdef __cplusplus extern "C" { #endif @@ -21,6 +28,15 @@ char *OONIEngineVersion(void); /// @param ptr a void pointer refering to the memory to be freed. void OONIENgineFreeMemory(void *ptr); +/// NewSession creates a new session with the given config. +/// +/// @param config a JSON string representing the configuration for the session. +/// +/// @return Zero on failure, nonzero on success. If the return value +/// is nonzero, a task is running. In such a case, the caller is +/// responsible to eventually dispose of the task using OONIEngineFree. +OONITask NewSession(char *config); + #ifdef __cplusplus } #endif diff --git a/internal/libooniengine/logger.go b/internal/libooniengine/logger.go new file mode 100644 index 000000000..d78929dae --- /dev/null +++ b/internal/libooniengine/logger.go @@ -0,0 +1,86 @@ +package main + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +type LogLevel int32 + +const ( + // The DEBUG log level. + logLevel_DEBUG LogLevel = 0 + // The INFO log level. + logLevel_INFO LogLevel = 1 + // The WARNING log level. + logLevel_WARNING LogLevel = 2 +) + +type LogEvent struct { + Level LogLevel `json:"Level,omitempty"` + Message string `json:"Message,omitempty"` +} + +// taskLogger implements model.Logger for tasks. +type taskLogger struct { + // emitter is used to emit log events. + emitter taskMaybeEmitter + + // verbose indicates whether verbose logging is enabled. + verbose bool +} + +// newLogger creates a new taskLogger instance using +// the [emitter] to emit log events. +func newTaskLogger(emitter taskMaybeEmitter) *taskLogger { + return &taskLogger{ + emitter: emitter, + verbose: false, + } +} + +var _ model.Logger = &taskLogger{} + +// Debugf implements model.Logger.Debugf. +func (tl *taskLogger) Debugf(format string, values ...any) { + if tl.verbose { + tl.emit(logLevel_DEBUG, fmt.Sprintf(format, values...)) + } +} + +// Debug implements model.Logger.Debug. +func (tl *taskLogger) Debug(message string) { + if tl.verbose { + tl.emit(logLevel_DEBUG, message) + } +} + +// Infof implements model.Logger.Infof. +func (tl *taskLogger) Infof(format string, values ...any) { + tl.emit(logLevel_INFO, fmt.Sprintf(format, values...)) +} + +// Info implements model.Logger.Info. +func (tl *taskLogger) Info(message string) { + tl.emit(logLevel_INFO, message) +} + +// Warnf implements model.Logger.Warnf. +func (tl *taskLogger) Warnf(format string, values ...any) { + tl.emit(logLevel_WARNING, fmt.Sprintf(format, values...)) +} + +// Warn implements model.Logger.Warn. +func (tl *taskLogger) Warn(message string) { + tl.emit(logLevel_WARNING, message) +} + +// emit emits a log message. +func (tl *taskLogger) emit(level LogLevel, message string) { + value := &LogEvent{ + Level: level, + Message: message, + } + tl.emitter.maybeEmitEvent("Log", value) +} diff --git a/internal/libooniengine/engine.go b/internal/libooniengine/main.go similarity index 54% rename from internal/libooniengine/engine.go rename to internal/libooniengine/main.go index 3a7fd496b..080f8fa2e 100644 --- a/internal/libooniengine/engine.go +++ b/internal/libooniengine/main.go @@ -10,11 +10,17 @@ package main import "C" import ( + "runtime/cgo" "unsafe" "github.com/ooni/probe-cli/v3/internal/version" ) +const ( + // invalidTaskHandle represents the invalid task handle. + invalidTaskHandle = 0 +) + //export OONIEngineVersion func OONIEngineVersion() *C.char { return C.CString(version.Version) @@ -25,6 +31,16 @@ func OONIEngineFreeMemory(ptr *C.void) { C.free(unsafe.Pointer(ptr)) } +//export NewSession +func NewSession(config *C.char) C.OONITask { + value := []byte(C.GoString(config)) + tp := startTask("NewSession", value) + if tp == nil { + return invalidTaskHandle + } + return C.OONITask(cgo.NewHandle(tp)) +} + func main() { // do nothing } diff --git a/internal/libooniengine/model.go b/internal/libooniengine/model.go new file mode 100644 index 000000000..e3616f49a --- /dev/null +++ b/internal/libooniengine/model.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "time" +) + +// event is any event that occurs. +type event interface{} + +// goMessage is the internal representation of OONIMessage +type goMessage struct { + // key is the event key. + key string + + // value is the value of the event. + value event +} + +// taskEventsBuffer is the buffer used for the task's event chan, which +// should guarantee enough buffering when the application is slow. +const taskEventsBuffer = 1024 + +// taskMaybeEmitter emits events, if possible. We use a buffered +// channel with a large buffer for collecting task events. We expect +// the application to always be able to drain the channel timely. Yet, +// if that's not the case, it is fine to discard events. This data +// type implement such a discard-if-reader is slow behaviour. +type taskMaybeEmitter interface { + // maybeEmitEvent emits an event if there's available buffer in the + // output channel and otherwise discards the event. + maybeEmitEvent(name string, value event) +} + +// taskRunner runs a given task. Any task that you can run from +// the application must implement this interface. +type taskRunner interface { + // Main runs the task to completion. + // + // Arguments: + // + // - ctx is the context for deadline/cancellation/timeout; + // + // - emitter is the emitter to emit events; + // + // - args contains unparsed, task-specific arguments. + main(ctx context.Context, emitter taskMaybeEmitter, args []byte) +} + +// taskAPI implements the OONI engine C API functions. We use this interface +// to enable easier testing of the code that manages the tasks lifecycle. +type taskAPI interface { + // waitForNextEvent implements OONITaskWaitForNextEvent. + waitForNextEvent(timeout time.Duration) *goMessage + + // isDone implements OONITaskIsDone. + isDone() bool + + // interrupt implements OONITaskInterrupt. + interrupt() +} + +// taskRegistry maps each task name to its implementation. +var taskRegistry = map[string]taskRunner{} diff --git a/internal/libooniengine/session.go b/internal/libooniengine/session.go new file mode 100644 index 000000000..9f629bc0d --- /dev/null +++ b/internal/libooniengine/session.go @@ -0,0 +1,82 @@ +package main + +import ( + "context" + "encoding/json" + "net/url" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/model" +) + +type sessConfig struct { + ProxyUrl string `json:"ProxyUrl,omitempty"` + StateDir string `json:"StateDir,omitempty"` + SoftwareName string `json:"SoftwareName,omitempty"` + SoftwareVersion string `json:"SoftwareVersion,omitempty"` + TempDir string `json:"TempDir,omitempty"` + TorArgs []string `json:"TorArgs,omitempty"` + TorBinary string `json:"TorBinary,omitempty"` + TunnelDir string `json:"TunnelDir,omitempty"` +} + +func init() { + taskRegistry["NewSession"] = &newSessionTaskRunner{} +} + +type newSessionTaskRunner struct{} + +var _ taskRunner = &newSessionTaskRunner{} + +func (tr *newSessionTaskRunner) main(ctx context.Context, + emitter taskMaybeEmitter, args []byte) { + logger := newTaskLogger(emitter) + var config *sessConfig + if err := json.Unmarshal(args, config); err != nil { + logger.Warnf("engine: cannot deserialize arguments: %s", err.Error()) + return + } + // TODO(DecFox): we are ignoring the session here but we want to use this for further tasks. + _, err := newSession(ctx, config, logger) + if err != nil { + logger.Warnf("engine: cannot create session: %s", err.Error()) + return + } +} + +// newSession creates a new *engine.Sessioncfg from the given config. +func newSession(ctx context.Context, config *sessConfig, + logger model.Logger) (*engine.Session, error) { + kvs, err := kvstore.NewFS(config.StateDir) + if err != nil { + return nil, err + } + // Note: while we are passing a proxyUrl here, we do not bootstrap any tunnels in + // this function. + proxyURL, err := parseProxyURL(config.ProxyUrl) + if err != nil { + return nil, err + } + cfg := &engine.SessionConfig{ + AvailableProbeServices: []model.OOAPIService{}, + KVStore: kvs, + Logger: logger, + ProxyURL: proxyURL, // nil if cfg.ProxyURL is "" + SoftwareName: config.SoftwareName, + SoftwareVersion: config.SoftwareVersion, + TempDir: config.TempDir, + TorArgs: config.TorArgs, + TorBinary: config.TorBinary, + TunnelDir: config.TunnelDir, + } + return engine.NewSessionWithoutTunnel(ctx, cfg) +} + +// parseProxyURL returns the proper proxy URL or nil if it's not cfgured. +func parseProxyURL(proxyURL string) (*url.URL, error) { + if proxyURL == "" { + return nil, nil + } + return url.Parse(proxyURL) +} diff --git a/internal/libooniengine/task.go b/internal/libooniengine/task.go new file mode 100644 index 000000000..1f01210b3 --- /dev/null +++ b/internal/libooniengine/task.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "log" + "sync/atomic" + "time" +) + +// startTask starts a given task. +func startTask(name string, args []byte) taskAPI { + ctx, cancel := context.WithCancel(context.Background()) + tp := &taskState{ + cancel: cancel, + done: &atomic.Int64{}, + events: make(chan *goMessage, taskEventsBuffer), + stopped: make(chan any), + } + go tp.main(ctx, name, args) + return tp +} + +// task implements taskAPI. +type taskState struct { + // cancel cancels the context used by this task. + cancel context.CancelFunc + + // done indicates that this task is done. + done *atomic.Int64 + + // events is the channel where we emit task events. + events chan *goMessage + + // stopped indicates that the task is done. + stopped chan any +} + +var _ taskAPI = &taskState{} + +// waitForNextEvent implements taskAPI.waitForNextEvent. +func (tp *taskState) waitForNextEvent(timeout time.Duration) *goMessage { + // Implementation note: we don't need to log any of these nil-returning conditions + // as they are not exceptional, rather they're part of normal usage. + ctx, cancel := contextForWaitForNextEvent(timeout) + defer cancel() + select { + case <-ctx.Done(): + return nil // timeout while blocking for reading + case ev := <-tp.events: + return ev // ordinary chan reading + case <-tp.stopped: + select { + case ev := <-tp.events: + return ev // still draining the chan + default: + tp.done.Add(1) // fully drained so we can flip "done" now + return nil + } + } +} + +// contextForWaitForNextEvent returns the suitable context +// for making the waitForNextEvent function time bounded. +func contextForWaitForNextEvent(timeo time.Duration) (context.Context, context.CancelFunc) { + ctx := context.Background() + if timeo < 0 { + return context.WithCancel(ctx) + } + return context.WithTimeout(ctx, timeo) +} + +// isDone implements taskAPI.isDone. +func (tp *taskState) isDone() bool { + return tp.done.Load() > 0 +} + +// interrupt implements taskAPI.interrupt. +func (tp *taskState) interrupt() { + tp.cancel() +} + +// main is the main function of the task. +func (tp *taskState) main(ctx context.Context, name string, args []byte) { + defer close(tp.stopped) // synchronize with caller + runner := taskRegistry[name] + if runner == nil { + log.Printf("OONITaskStart: unknown task name: %s", name) + return + } + emitter := &taskChanEmitter{ + out: tp.events, + } + runner.main(ctx, emitter, args) +} From 7dd9f5daf09f44937194f3d91d98a2e40cbcba0f Mon Sep 17 00:00:00 2001 From: decfox Date: Tue, 7 Mar 2023 21:44:50 +0530 Subject: [PATCH 02/12] feat: introduce geolocate --- internal/engine/session.go | 4 ++ internal/libooniengine/emitter.go | 10 ++-- internal/libooniengine/engine.h | 47 ++++++++++++++++-- internal/libooniengine/geolocate.go | 67 ++++++++++++++++++++++++++ internal/libooniengine/logger.go | 16 +++--- internal/libooniengine/main.go | 55 +++++++++++++++++++-- internal/libooniengine/model.go | 34 ++++++++----- internal/libooniengine/session.go | 75 +++++++++++++++++++++++------ internal/libooniengine/task.go | 35 +++++++++++--- internal/libooniengine/ticker.go | 27 +++++++++++ 10 files changed, 313 insertions(+), 57 deletions(-) create mode 100644 internal/libooniengine/geolocate.go create mode 100644 internal/libooniengine/ticker.go diff --git a/internal/engine/session.go b/internal/engine/session.go index c4563711e..cd98d0fb1 100644 --- a/internal/engine/session.go +++ b/internal/engine/session.go @@ -450,6 +450,10 @@ func (s *Session) MaybeLookupBackends() error { return s.MaybeLookupBackendsContext(context.Background()) } +func (s *Session) Resolver() *sessionresolver.Resolver { + return s.resolver +} + // ErrAlreadyUsingProxy indicates that we cannot create a tunnel with // a specific name because we already configured a proxy. var ErrAlreadyUsingProxy = errors.New( diff --git a/internal/libooniengine/emitter.go b/internal/libooniengine/emitter.go index 73a75bd24..ec3d2fde8 100644 --- a/internal/libooniengine/emitter.go +++ b/internal/libooniengine/emitter.go @@ -7,19 +7,15 @@ package main // taskEmitter implements taskMaybeEmitter. type taskChanEmitter struct { // out is the channel where we emit events. - out chan *goMessage + out chan *response } var _ taskMaybeEmitter = &taskChanEmitter{} // maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent. -func (e *taskChanEmitter) maybeEmitEvent(name string, value event) { - ev := &goMessage{ - key: name, - value: value, - } +func (e *taskChanEmitter) maybeEmitEvent(resp *response) { select { - case e.out <- ev: + case e.out <- resp: default: // buffer full, discard this event } } diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index a22a83207..3a115eeb0 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -26,16 +26,53 @@ char *OONIEngineVersion(void); /// OONIEngineFreeMemory frees the memory allocated by the engine. /// /// @param ptr a void pointer refering to the memory to be freed. -void OONIENgineFreeMemory(void *ptr); +void OONIEngineFreeMemory(void *ptr); -/// NewSession creates a new session with the given config. +/// OONIEngineCall starts a new OONITask using the given [req]. /// -/// @param config a JSON string representing the configuration for the session. +/// @param req A JSON string, owned by the caller, that +/// contains the configuration for the task to start. /// /// @return Zero on failure, nonzero on success. If the return value /// is nonzero, a task is running. In such a case, the caller is -/// responsible to eventually dispose of the task using OONIEngineFree. -OONITask NewSession(char *config); +/// responsible to eventually dispose of the task using OONIEngineFreeMemory. +OONITask OONIEngineCall(char *req); + +/// OONIEngineWaitForNextEvent awaits on the [task] event queue until +/// a new event is available or the given [timeout] expires. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @param timeout Timeout in milliseconds. If the timeout is zero +/// or negative, this function would potentially block forever. +/// +/// @return A NULL pointer on failure, non-NULL otherwise. If the return +/// value is non-NULL, the caller takes ownership of the OONIMessage +/// pointer and MUST free it using OONIMessageFree when done using it. +/// +/// This function will return NULL: +/// +/// 1. when the timeout expires; +/// +/// 2. if [task] is done; +/// +/// 3. if [task] is zero or does not refer to a valid task; +/// +/// 4. if we cannot protobuf serialize the message; +/// +/// 5. possibly because of other unknown internal errors. +/// +/// In short, you cannot reliably determine whether a task is done by +/// checking whether this function has returned NULL. +char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); + +/// OONIEngineInterrupt tells [task] to stop ASAP. +/// +/// @param task Task handle returned by OONIEngineCall. If task is zero +/// or does not refer to a valid task, this function will just do nothing. +void OONIEngineInterrupt(OONITask task); + +// OONITask #ifdef __cplusplus } diff --git a/internal/libooniengine/geolocate.go b/internal/libooniengine/geolocate.go new file mode 100644 index 000000000..c49fe434e --- /dev/null +++ b/internal/libooniengine/geolocate.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + + "github.com/ooni/probe-cli/v3/internal/geolocate" +) + +func init() { + taskRegistry["Geolocate"] = &geolocateTaskRunner{} +} + +// geolocateOptions contains the request arguments for the Geolocate task. +type geolocateOptions struct { + SessionId int64 `json:",omitempty"` +} + +// geolocateResponse is the response for the Geolocate task. +type geolocateResponse struct { + ASN uint `json:",omitempty"` + CountryCode string `json:",omitempty"` + NetworkName string `json:",omitempty"` + ProbeIP string `json:",omitempty"` + ResolverASN uint `json:",omitempty"` + ResolverIP string `json:",omitempty"` + ResolverNetworkName string `json:",omitempty"` + Error string `json:",omitempty"` +} + +type geolocateTaskRunner struct{} + +var _ taskRunner = &geolocateTaskRunner{} + +// main implements taskRunner.main +func (tr *geolocateTaskRunner) main(ctx context.Context, + emitter taskMaybeEmitter, req *request, resp *response) { + logger := newTaskLogger(emitter) + sessionId := req.Geolocate.SessionId + sess := mapSession[sessionId] + if sess == nil { + logger.Warnf("session: %s", errInvalidSessionId.Error()) + resp.Geolocate.Error = errInvalidSessionId.Error() + return + } + gt := geolocate.NewTask(geolocate.Config{ + Logger: sess.Logger(), + Resolver: sess.Resolver(), + UserAgent: sess.UserAgent(), + }) + results, err := gt.Run(ctx) + if err != nil { + logger.Warnf("geolocate: %s", err.Error()) + resp.Geolocate.Error = err.Error() + return + } + resp = &response{ + Geolocate: geolocateResponse{ + ASN: results.ASN, + CountryCode: results.CountryCode, + NetworkName: results.NetworkName, + ProbeIP: results.ProbeIP, + ResolverASN: results.ResolverASN, + ResolverIP: results.ResolverIP, + ResolverNetworkName: results.ResolverNetworkName, + }, + } +} diff --git a/internal/libooniengine/logger.go b/internal/libooniengine/logger.go index d78929dae..fb3a92baf 100644 --- a/internal/libooniengine/logger.go +++ b/internal/libooniengine/logger.go @@ -17,9 +17,9 @@ const ( logLevel_WARNING LogLevel = 2 ) -type LogEvent struct { - Level LogLevel `json:"Level,omitempty"` - Message string `json:"Message,omitempty"` +type logResponse struct { + Level LogLevel `json:",omitempty"` + Message string `json:",omitempty"` } // taskLogger implements model.Logger for tasks. @@ -78,9 +78,11 @@ func (tl *taskLogger) Warn(message string) { // emit emits a log message. func (tl *taskLogger) emit(level LogLevel, message string) { - value := &LogEvent{ - Level: level, - Message: message, + logResp := &response{ + Logger: logResponse{ + Level: level, + Message: message, + }, } - tl.emitter.maybeEmitEvent("Log", value) + tl.emitter.maybeEmitEvent(logResp) } diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 080f8fa2e..6f7da5cd0 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -10,7 +10,10 @@ package main import "C" import ( + "encoding/json" + "log" "runtime/cgo" + "time" "unsafe" "github.com/ooni/probe-cli/v3/internal/version" @@ -21,6 +24,27 @@ const ( invalidTaskHandle = 0 ) +// parse converts a JSON request string to the concrete Go type. +func parse(req *C.char) (*request, error) { + var out *request + err := json.Unmarshal([]byte(C.GoString(req)), out) + if err != nil { + return nil, err + } + return out, nil +} + +// serialize serializes a OONI response to a JSON string accessible to C code. +func serialize(resp *response) *C.char { + out, err := json.Marshal(resp) + if err != nil { + log.Printf("serializeMessage: cannot serialize message: %s", err.Error()) + return C.CString("") + } + + return C.CString(string(out)) +} + //export OONIEngineVersion func OONIEngineVersion() *C.char { return C.CString(version.Version) @@ -31,16 +55,39 @@ func OONIEngineFreeMemory(ptr *C.void) { C.free(unsafe.Pointer(ptr)) } -//export NewSession -func NewSession(config *C.char) C.OONITask { - value := []byte(C.GoString(config)) - tp := startTask("NewSession", value) +//export OONIEngineCall +func OONIEngineCall(req *C.char) C.OONITask { + r, err := parse(req) + if err != nil { + log.Printf("OONIEngineCall: %s", err.Error()) + return invalidTaskHandle + } + taskName, err := resolveTask(r) + if err != nil { + log.Printf("OONIEngineCall: %s", err.Error()) + return invalidTaskHandle + } + tp := startTask(taskName, r) if tp == nil { + log.Printf("OONITaskStart: startTask return NULL") return invalidTaskHandle } return C.OONITask(cgo.NewHandle(tp)) } +//export OONIEngineWaitForNextEvent +func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { + tp := cgo.Handle(task).Value().(taskAPI) + ev := tp.waitForNextEvent(time.Duration(timeout) * time.Millisecond) + return serialize(ev) +} + +//export OONIEngineInterrupt +func OONIEngineInterrupt(task C.OONITask) { + tp := cgo.Handle(task).Value().(taskAPI) + tp.interrupt() +} + func main() { // do nothing } diff --git a/internal/libooniengine/model.go b/internal/libooniengine/model.go index e3616f49a..979f1a072 100644 --- a/internal/libooniengine/model.go +++ b/internal/libooniengine/model.go @@ -5,16 +5,21 @@ import ( "time" ) -// event is any event that occurs. -type event interface{} - -// goMessage is the internal representation of OONIMessage -type goMessage struct { - // key is the event key. - key string +// request is the OONI request containing task-specific arguments. +type request struct { + NewSession newSessionOptions `json:",omitempty"` + Geolocate geolocateOptions `json:",omitempty"` + DeleteSession deleteSessionOptions `json:",omitempty"` +} - // value is the value of the event. - value event +// response is the OONI response to serialize before sending. +type response struct { + NewSession newSessionResponse `json:",omitempty"` + Geolocate geolocateResponse `json:",omitempty"` + Logger logResponse `json:",omitempty"` + Ticker tickerResponse `json:",omitempty"` + DeleteSession deleteSessionResponse `json:",omitempty"` + Error string `json:",omitempty"` } // taskEventsBuffer is the buffer used for the task's event chan, which @@ -29,7 +34,7 @@ const taskEventsBuffer = 1024 type taskMaybeEmitter interface { // maybeEmitEvent emits an event if there's available buffer in the // output channel and otherwise discards the event. - maybeEmitEvent(name string, value event) + maybeEmitEvent(resp *response) } // taskRunner runs a given task. Any task that you can run from @@ -43,15 +48,18 @@ type taskRunner interface { // // - emitter is the emitter to emit events; // - // - args contains unparsed, task-specific arguments. - main(ctx context.Context, emitter taskMaybeEmitter, args []byte) + // - req is the parsed request containing task specific arguments. + // + // - resp is the response to emit after the task is complete. Note that + // this an implicit response and only indicates the final response of the task. + main(ctx context.Context, emitter taskMaybeEmitter, req *request, resp *response) } // taskAPI implements the OONI engine C API functions. We use this interface // to enable easier testing of the code that manages the tasks lifecycle. type taskAPI interface { // waitForNextEvent implements OONITaskWaitForNextEvent. - waitForNextEvent(timeout time.Duration) *goMessage + waitForNextEvent(timeout time.Duration) *response // isDone implements OONITaskIsDone. isDone() bool diff --git a/internal/libooniengine/session.go b/internal/libooniengine/session.go index 9f629bc0d..adb99f2e2 100644 --- a/internal/libooniengine/session.go +++ b/internal/libooniengine/session.go @@ -2,15 +2,29 @@ package main import ( "context" - "encoding/json" + "errors" "net/url" + "sync" "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/model" ) -type sessConfig struct { +var ( + errInvalidSessionId = errors.New("passed Session ID does not exist") + mapSession map[int64]*engine.Session + idx int64 = 1 + mu sync.Mutex +) + +func init() { + taskRegistry["NewSession"] = &newSessionTaskRunner{} + taskRegistry["DeleteSession"] = &deleteSessionTask{} +} + +// newSessionOptions contains the request arguments for the NewSession task. +type newSessionOptions struct { ProxyUrl string `json:"ProxyUrl,omitempty"` StateDir string `json:"StateDir,omitempty"` SoftwareName string `json:"SoftwareName,omitempty"` @@ -21,32 +35,36 @@ type sessConfig struct { TunnelDir string `json:"TunnelDir,omitempty"` } -func init() { - taskRegistry["NewSession"] = &newSessionTaskRunner{} +// newSessionResponse is the response for the NewSession task. +type newSessionResponse struct { + SessionId int64 `json:",omitempty"` + Error string `json:",omitempty"` } type newSessionTaskRunner struct{} var _ taskRunner = &newSessionTaskRunner{} +// main implements taskRunner.main func (tr *newSessionTaskRunner) main(ctx context.Context, - emitter taskMaybeEmitter, args []byte) { + emitter taskMaybeEmitter, req *request, resp *response) { logger := newTaskLogger(emitter) - var config *sessConfig - if err := json.Unmarshal(args, config); err != nil { - logger.Warnf("engine: cannot deserialize arguments: %s", err.Error()) - return - } - // TODO(DecFox): we are ignoring the session here but we want to use this for further tasks. - _, err := newSession(ctx, config, logger) + config := req.NewSession + sess, err := newSession(ctx, &config, logger) if err != nil { + resp.NewSession.Error = err.Error() logger.Warnf("engine: cannot create session: %s", err.Error()) return } + mu.Lock() + defer mu.Unlock() + resp.NewSession.SessionId = idx + mapSession[idx] = sess + idx++ } // newSession creates a new *engine.Sessioncfg from the given config. -func newSession(ctx context.Context, config *sessConfig, +func newSession(ctx context.Context, config *newSessionOptions, logger model.Logger) (*engine.Session, error) { kvs, err := kvstore.NewFS(config.StateDir) if err != nil { @@ -73,10 +91,39 @@ func newSession(ctx context.Context, config *sessConfig, return engine.NewSessionWithoutTunnel(ctx, cfg) } -// parseProxyURL returns the proper proxy URL or nil if it's not cfgured. +// parseProxyURL returns the proper proxy URL or nil if it's not configured. func parseProxyURL(proxyURL string) (*url.URL, error) { if proxyURL == "" { return nil, nil } return url.Parse(proxyURL) } + +// deleteSessionOptions contains the request arguments for the DeleteSession task. +type deleteSessionOptions struct { + SessionId int64 `json:",omitempty"` +} + +// deleteSessionResponse is the response for the DeleteSession task. +type deleteSessionResponse struct { + Error string `json:",omitempty"` +} + +type deleteSessionTask struct{} + +var _ taskRunner = &deleteSessionTask{} + +// main implements taskRunner.main +func (tr *deleteSessionTask) main(ctx context.Context, + emitter taskMaybeEmitter, req *request, resp *response) { + sessionId := req.DeleteSession.SessionId + // TODO(DecFox): add check to ensure we have a valid sessionId + sess := mapSession[sessionId] + if sess == nil { + resp.DeleteSession.Error = errInvalidSessionId.Error() + return + } + mu.Lock() + defer mu.Unlock() + mapSession[sessionId] = nil +} diff --git a/internal/libooniengine/task.go b/internal/libooniengine/task.go index 1f01210b3..466a71c2c 100644 --- a/internal/libooniengine/task.go +++ b/internal/libooniengine/task.go @@ -2,21 +2,39 @@ package main import ( "context" + "errors" "log" + "reflect" "sync/atomic" "time" ) +var ( + errInvalidRequest = errors.New("input request has no valid task name") +) + +// resolveTask resolves the task name to perform from the parsed request. +func resolveTask(req *request) (string, error) { + r := reflect.ValueOf(req) + t := r.Type() + for i := 0; i < r.NumField(); i++ { + if !r.Field(i).IsNil() { + return t.Field(i).Name, nil + } + } + return "", errInvalidRequest +} + // startTask starts a given task. -func startTask(name string, args []byte) taskAPI { +func startTask(name string, req *request) taskAPI { ctx, cancel := context.WithCancel(context.Background()) tp := &taskState{ cancel: cancel, done: &atomic.Int64{}, - events: make(chan *goMessage, taskEventsBuffer), + events: make(chan *response, taskEventsBuffer), stopped: make(chan any), } - go tp.main(ctx, name, args) + go tp.main(ctx, name, req) return tp } @@ -29,7 +47,7 @@ type taskState struct { done *atomic.Int64 // events is the channel where we emit task events. - events chan *goMessage + events chan *response // stopped indicates that the task is done. stopped chan any @@ -38,7 +56,7 @@ type taskState struct { var _ taskAPI = &taskState{} // waitForNextEvent implements taskAPI.waitForNextEvent. -func (tp *taskState) waitForNextEvent(timeout time.Duration) *goMessage { +func (tp *taskState) waitForNextEvent(timeout time.Duration) *response { // Implementation note: we don't need to log any of these nil-returning conditions // as they are not exceptional, rather they're part of normal usage. ctx, cancel := contextForWaitForNextEvent(timeout) @@ -80,8 +98,9 @@ func (tp *taskState) interrupt() { } // main is the main function of the task. -func (tp *taskState) main(ctx context.Context, name string, args []byte) { +func (tp *taskState) main(ctx context.Context, name string, req *request) { defer close(tp.stopped) // synchronize with caller + var resp *response runner := taskRegistry[name] if runner == nil { log.Printf("OONITaskStart: unknown task name: %s", name) @@ -90,5 +109,7 @@ func (tp *taskState) main(ctx context.Context, name string, args []byte) { emitter := &taskChanEmitter{ out: tp.events, } - runner.main(ctx, emitter, args) + defer emitter.maybeEmitEvent(resp) + go runTicker(ctx, tp.stopped, emitter, req, time.Now()) + runner.main(ctx, emitter, req, resp) } diff --git a/internal/libooniengine/ticker.go b/internal/libooniengine/ticker.go new file mode 100644 index 000000000..10f99ed6d --- /dev/null +++ b/internal/libooniengine/ticker.go @@ -0,0 +1,27 @@ +package main + +import ( + "context" + "time" +) + +type tickerResponse struct { + ElapsedTime float64 `json:",omitempty"` +} + +// runTicker emits a ticker event every second. It is a subtask +// associated with all tasks. +func runTicker(ctx context.Context, close chan any, + emitter taskMaybeEmitter, req *request, start time.Time) { + var resp *response + ticker := time.NewTicker(time.Second) + for { + select { + case <-close: + return + case <-ticker.C: + resp.Ticker.ElapsedTime = time.Since(start).Seconds() + emitter.maybeEmitEvent(resp) + } + } +} From 468b3132bb2d242261309634059dc8b2f0afe1f7 Mon Sep 17 00:00:00 2001 From: decfox Date: Mon, 20 Mar 2023 09:31:39 +0530 Subject: [PATCH 03/12] refactor: dry out code to only include running tasks --- internal/engine/session.go | 61 -------------- internal/libooniengine/engine.h | 34 +++++--- internal/libooniengine/geolocate.go | 64 +------------- internal/libooniengine/logger.go | 12 +-- internal/libooniengine/main.go | 21 ++++- internal/libooniengine/model.go | 4 +- internal/libooniengine/session.go | 124 +--------------------------- internal/libooniengine/task.go | 10 ++- internal/libooniengine/ticker.go | 27 ------ 9 files changed, 66 insertions(+), 291 deletions(-) delete mode 100644 internal/libooniengine/ticker.go diff --git a/internal/engine/session.go b/internal/engine/session.go index cd98d0fb1..3169d711c 100644 --- a/internal/engine/session.go +++ b/internal/engine/session.go @@ -221,63 +221,6 @@ func NewSession(ctx context.Context, config SessionConfig) (*Session, error) { return sess, nil } -func NewSessionWithoutTunnel(ctx context.Context, config *SessionConfig) (*Session, error) { - if config.Logger == nil { - return nil, errors.New("Logger is empty") - } - if config.SoftwareName == "" { - return nil, errors.New("SoftwareName is empty") - } - if config.SoftwareVersion == "" { - return nil, errors.New("SoftwareVersion is empty") - } - if config.KVStore == nil { - config.KVStore = &kvstore.Memory{} - } - // Implementation note: if config.TempDir is empty, then Go will - // use the temporary directory on the current system. This should - // work on Desktop. We tested that it did also work on iOS, but - // we have also seen on 2020-06-10 that it does not work on Android. - tempDir, err := ioutil.TempDir(config.TempDir, "ooniengine") - if err != nil { - return nil, err - } - config.Logger.Infof( - "ooniprobe-engine/v%s %s dirty=%s %s", - version.Version, - runtimex.BuildInfo.VcsRevision, - runtimex.BuildInfo.VcsModified, - runtimex.BuildInfo.GoVersion, - ) - sess := &Session{ - availableProbeServices: config.AvailableProbeServices, - byteCounter: bytecounter.New(), - kvStore: config.KVStore, - logger: config.Logger, - queryProbeServicesCount: &atomic.Int64{}, - softwareName: config.SoftwareName, - softwareVersion: config.SoftwareVersion, - tempDir: tempDir, - torArgs: config.TorArgs, - torBinary: config.TorBinary, - tunnelDir: config.TunnelDir, - } - var proxyURL *url.URL = nil - sess.proxyURL = proxyURL - sess.resolver = &sessionresolver.Resolver{ - ByteCounter: sess.byteCounter, - KVStore: config.KVStore, - Logger: sess.logger, - ProxyURL: proxyURL, - } - txp := netxlite.NewHTTPTransportWithLoggerResolverAndOptionalProxyURL( - sess.logger, sess.resolver, sess.proxyURL, - ) - txp = bytecounter.WrapHTTPTransport(txp, sess.byteCounter) - sess.httpDefaultTransport = txp - return sess, nil -} - // TunnelDir returns the persistent directory used by tunnels. func (s *Session) TunnelDir() string { return s.tunnelDir @@ -450,10 +393,6 @@ func (s *Session) MaybeLookupBackends() error { return s.MaybeLookupBackendsContext(context.Background()) } -func (s *Session) Resolver() *sessionresolver.Resolver { - return s.resolver -} - // ErrAlreadyUsingProxy indicates that we cannot create a tunnel with // a specific name because we already configured a proxy. var ErrAlreadyUsingProxy = errors.New( diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 3a115eeb0..0b0866c79 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -30,12 +30,12 @@ void OONIEngineFreeMemory(void *ptr); /// OONIEngineCall starts a new OONITask using the given [req]. /// -/// @param req A JSON string, owned by the caller, that -/// contains the configuration for the task to start. +/// @param req A JSON string, owned by the caller, that contains +/// the configuration for the task to start. /// /// @return Zero on failure, nonzero on success. If the return value /// is nonzero, a task is running. In such a case, the caller is -/// responsible to eventually dispose of the task using OONIEngineFreeMemory. +/// responsible to eventually dispose of the task using OONIEngineFreeTask. OONITask OONIEngineCall(char *req); /// OONIEngineWaitForNextEvent awaits on the [task] event queue until @@ -47,10 +47,10 @@ OONITask OONIEngineCall(char *req); /// or negative, this function would potentially block forever. /// /// @return A NULL pointer on failure, non-NULL otherwise. If the return -/// value is non-NULL, the caller takes ownership of the OONIMessage -/// pointer and MUST free it using OONIMessageFree when done using it. +/// value is non-NULL, the caller takes ownership of the char pointer +/// and MUST free it using OONIEngineFreeMemory when done using it. /// -/// This function will return NULL: +/// This function will return an empty string: /// /// 1. when the timeout expires; /// @@ -58,21 +58,35 @@ OONITask OONIEngineCall(char *req); /// /// 3. if [task] is zero or does not refer to a valid task; /// -/// 4. if we cannot protobuf serialize the message; +/// 4. if we cannot JSON serialize the message; /// /// 5. possibly because of other unknown internal errors. /// /// In short, you cannot reliably determine whether a task is done by -/// checking whether this function has returned NULL. +/// checking whether this function has returned an empty string. char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); +/// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A taks is +/// done when it has finished running _and_ its events queue has been drained. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @return Nonzero if the task exists and either is still running or has some +/// unread events inside its events queue, zero otherwise. +uint8_t OONIEngineTaskIsDone(OONITask task); + /// OONIEngineInterrupt tells [task] to stop ASAP. /// /// @param task Task handle returned by OONIEngineCall. If task is zero /// or does not refer to a valid task, this function will just do nothing. -void OONIEngineInterrupt(OONITask task); +void OONIEngineInterruptTask(OONITask task); -// OONITask +/// OONIEngineFreeTask free the memory associated with [task]. If the task is still running, this +/// function will also interrupt it and drain its events queue. +/// +/// @param task Task handle returned by OONIEngineCall. If task is zero +/// or does not refer to a valid task, this function will just do nothing. +void OONIEngineFreeTask(OONITask task); #ifdef __cplusplus } diff --git a/internal/libooniengine/geolocate.go b/internal/libooniengine/geolocate.go index c49fe434e..66bf7bbd6 100644 --- a/internal/libooniengine/geolocate.go +++ b/internal/libooniengine/geolocate.go @@ -1,67 +1,7 @@ package main -import ( - "context" - - "github.com/ooni/probe-cli/v3/internal/geolocate" -) - -func init() { - taskRegistry["Geolocate"] = &geolocateTaskRunner{} -} - // geolocateOptions contains the request arguments for the Geolocate task. -type geolocateOptions struct { - SessionId int64 `json:",omitempty"` -} +type geolocateOptions struct{} // geolocateResponse is the response for the Geolocate task. -type geolocateResponse struct { - ASN uint `json:",omitempty"` - CountryCode string `json:",omitempty"` - NetworkName string `json:",omitempty"` - ProbeIP string `json:",omitempty"` - ResolverASN uint `json:",omitempty"` - ResolverIP string `json:",omitempty"` - ResolverNetworkName string `json:",omitempty"` - Error string `json:",omitempty"` -} - -type geolocateTaskRunner struct{} - -var _ taskRunner = &geolocateTaskRunner{} - -// main implements taskRunner.main -func (tr *geolocateTaskRunner) main(ctx context.Context, - emitter taskMaybeEmitter, req *request, resp *response) { - logger := newTaskLogger(emitter) - sessionId := req.Geolocate.SessionId - sess := mapSession[sessionId] - if sess == nil { - logger.Warnf("session: %s", errInvalidSessionId.Error()) - resp.Geolocate.Error = errInvalidSessionId.Error() - return - } - gt := geolocate.NewTask(geolocate.Config{ - Logger: sess.Logger(), - Resolver: sess.Resolver(), - UserAgent: sess.UserAgent(), - }) - results, err := gt.Run(ctx) - if err != nil { - logger.Warnf("geolocate: %s", err.Error()) - resp.Geolocate.Error = err.Error() - return - } - resp = &response{ - Geolocate: geolocateResponse{ - ASN: results.ASN, - CountryCode: results.CountryCode, - NetworkName: results.NetworkName, - ProbeIP: results.ProbeIP, - ResolverASN: results.ResolverASN, - ResolverIP: results.ResolverIP, - ResolverNetworkName: results.ResolverNetworkName, - }, - } -} +type geolocateResponse struct{} diff --git a/internal/libooniengine/logger.go b/internal/libooniengine/logger.go index fb3a92baf..7e00f6ca6 100644 --- a/internal/libooniengine/logger.go +++ b/internal/libooniengine/logger.go @@ -6,15 +6,15 @@ import ( "github.com/ooni/probe-cli/v3/internal/model" ) -type LogLevel int32 +type LogLevel string const ( // The DEBUG log level. - logLevel_DEBUG LogLevel = 0 + logLevel_DEBUG LogLevel = "DEBUG" // The INFO log level. - logLevel_INFO LogLevel = 1 + logLevel_INFO LogLevel = "INFO" // The WARNING log level. - logLevel_WARNING LogLevel = 2 + logLevel_WARNING LogLevel = "WARNING" ) type logResponse struct { @@ -33,10 +33,10 @@ type taskLogger struct { // newLogger creates a new taskLogger instance using // the [emitter] to emit log events. -func newTaskLogger(emitter taskMaybeEmitter) *taskLogger { +func newTaskLogger(emitter taskMaybeEmitter, verbose bool) *taskLogger { return &taskLogger{ emitter: emitter, - verbose: false, + verbose: verbose, } } diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 6f7da5cd0..4c179e5fe 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -27,8 +27,7 @@ const ( // parse converts a JSON request string to the concrete Go type. func parse(req *C.char) (*request, error) { var out *request - err := json.Unmarshal([]byte(C.GoString(req)), out) - if err != nil { + if err := json.Unmarshal([]byte(C.GoString(req)), out); err != nil { return nil, err } return out, nil @@ -41,7 +40,6 @@ func serialize(resp *response) *C.char { log.Printf("serializeMessage: cannot serialize message: %s", err.Error()) return C.CString("") } - return C.CString(string(out)) } @@ -82,12 +80,29 @@ func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { return serialize(ev) } +//export OONIEngineTaskIsDone +func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { + tp := cgo.Handle(task).Value().(taskAPI) + if !tp.isDone() { + out++ + } + return +} + //export OONIEngineInterrupt func OONIEngineInterrupt(task C.OONITask) { tp := cgo.Handle(task).Value().(taskAPI) tp.interrupt() } +//export OONIEngineFreeTask +func OONIEngineFreeTask(task C.OONITask) { + handle := cgo.Handle(task) + tp := handle.Value().(taskAPI) + handle.Delete() + tp.free() +} + func main() { // do nothing } diff --git a/internal/libooniengine/model.go b/internal/libooniengine/model.go index 979f1a072..bfacedbe5 100644 --- a/internal/libooniengine/model.go +++ b/internal/libooniengine/model.go @@ -17,7 +17,6 @@ type response struct { NewSession newSessionResponse `json:",omitempty"` Geolocate geolocateResponse `json:",omitempty"` Logger logResponse `json:",omitempty"` - Ticker tickerResponse `json:",omitempty"` DeleteSession deleteSessionResponse `json:",omitempty"` Error string `json:",omitempty"` } @@ -66,6 +65,9 @@ type taskAPI interface { // interrupt implements OONITaskInterrupt. interrupt() + + // free implements OONITaskFree + free() } // taskRegistry maps each task name to its implementation. diff --git a/internal/libooniengine/session.go b/internal/libooniengine/session.go index adb99f2e2..b9cd9b2b4 100644 --- a/internal/libooniengine/session.go +++ b/internal/libooniengine/session.go @@ -1,129 +1,13 @@ package main -import ( - "context" - "errors" - "net/url" - "sync" - - "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/kvstore" - "github.com/ooni/probe-cli/v3/internal/model" -) - -var ( - errInvalidSessionId = errors.New("passed Session ID does not exist") - mapSession map[int64]*engine.Session - idx int64 = 1 - mu sync.Mutex -) - -func init() { - taskRegistry["NewSession"] = &newSessionTaskRunner{} - taskRegistry["DeleteSession"] = &deleteSessionTask{} -} - // newSessionOptions contains the request arguments for the NewSession task. -type newSessionOptions struct { - ProxyUrl string `json:"ProxyUrl,omitempty"` - StateDir string `json:"StateDir,omitempty"` - SoftwareName string `json:"SoftwareName,omitempty"` - SoftwareVersion string `json:"SoftwareVersion,omitempty"` - TempDir string `json:"TempDir,omitempty"` - TorArgs []string `json:"TorArgs,omitempty"` - TorBinary string `json:"TorBinary,omitempty"` - TunnelDir string `json:"TunnelDir,omitempty"` -} +type newSessionOptions struct{} // newSessionResponse is the response for the NewSession task. -type newSessionResponse struct { - SessionId int64 `json:",omitempty"` - Error string `json:",omitempty"` -} - -type newSessionTaskRunner struct{} - -var _ taskRunner = &newSessionTaskRunner{} - -// main implements taskRunner.main -func (tr *newSessionTaskRunner) main(ctx context.Context, - emitter taskMaybeEmitter, req *request, resp *response) { - logger := newTaskLogger(emitter) - config := req.NewSession - sess, err := newSession(ctx, &config, logger) - if err != nil { - resp.NewSession.Error = err.Error() - logger.Warnf("engine: cannot create session: %s", err.Error()) - return - } - mu.Lock() - defer mu.Unlock() - resp.NewSession.SessionId = idx - mapSession[idx] = sess - idx++ -} - -// newSession creates a new *engine.Sessioncfg from the given config. -func newSession(ctx context.Context, config *newSessionOptions, - logger model.Logger) (*engine.Session, error) { - kvs, err := kvstore.NewFS(config.StateDir) - if err != nil { - return nil, err - } - // Note: while we are passing a proxyUrl here, we do not bootstrap any tunnels in - // this function. - proxyURL, err := parseProxyURL(config.ProxyUrl) - if err != nil { - return nil, err - } - cfg := &engine.SessionConfig{ - AvailableProbeServices: []model.OOAPIService{}, - KVStore: kvs, - Logger: logger, - ProxyURL: proxyURL, // nil if cfg.ProxyURL is "" - SoftwareName: config.SoftwareName, - SoftwareVersion: config.SoftwareVersion, - TempDir: config.TempDir, - TorArgs: config.TorArgs, - TorBinary: config.TorBinary, - TunnelDir: config.TunnelDir, - } - return engine.NewSessionWithoutTunnel(ctx, cfg) -} - -// parseProxyURL returns the proper proxy URL or nil if it's not configured. -func parseProxyURL(proxyURL string) (*url.URL, error) { - if proxyURL == "" { - return nil, nil - } - return url.Parse(proxyURL) -} +type newSessionResponse struct{} // deleteSessionOptions contains the request arguments for the DeleteSession task. -type deleteSessionOptions struct { - SessionId int64 `json:",omitempty"` -} +type deleteSessionOptions struct{} // deleteSessionResponse is the response for the DeleteSession task. -type deleteSessionResponse struct { - Error string `json:",omitempty"` -} - -type deleteSessionTask struct{} - -var _ taskRunner = &deleteSessionTask{} - -// main implements taskRunner.main -func (tr *deleteSessionTask) main(ctx context.Context, - emitter taskMaybeEmitter, req *request, resp *response) { - sessionId := req.DeleteSession.SessionId - // TODO(DecFox): add check to ensure we have a valid sessionId - sess := mapSession[sessionId] - if sess == nil { - resp.DeleteSession.Error = errInvalidSessionId.Error() - return - } - mu.Lock() - defer mu.Unlock() - mapSession[sessionId] = nil -} +type deleteSessionResponse struct{} diff --git a/internal/libooniengine/task.go b/internal/libooniengine/task.go index 466a71c2c..52d856d6e 100644 --- a/internal/libooniengine/task.go +++ b/internal/libooniengine/task.go @@ -97,6 +97,15 @@ func (tp *taskState) interrupt() { tp.cancel() } +// free implements taskAPI.free +func (tp *taskState) free() { + tp.interrupt() + for !tp.isDone() { + const blockForever = -1 + _ = tp.waitForNextEvent(blockForever) + } +} + // main is the main function of the task. func (tp *taskState) main(ctx context.Context, name string, req *request) { defer close(tp.stopped) // synchronize with caller @@ -110,6 +119,5 @@ func (tp *taskState) main(ctx context.Context, name string, req *request) { out: tp.events, } defer emitter.maybeEmitEvent(resp) - go runTicker(ctx, tp.stopped, emitter, req, time.Now()) runner.main(ctx, emitter, req, resp) } diff --git a/internal/libooniengine/ticker.go b/internal/libooniengine/ticker.go deleted file mode 100644 index 10f99ed6d..000000000 --- a/internal/libooniengine/ticker.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "context" - "time" -) - -type tickerResponse struct { - ElapsedTime float64 `json:",omitempty"` -} - -// runTicker emits a ticker event every second. It is a subtask -// associated with all tasks. -func runTicker(ctx context.Context, close chan any, - emitter taskMaybeEmitter, req *request, start time.Time) { - var resp *response - ticker := time.NewTicker(time.Second) - for { - select { - case <-close: - return - case <-ticker.C: - resp.Ticker.ElapsedTime = time.Since(start).Seconds() - emitter.maybeEmitEvent(resp) - } - } -} From 95469ed79fd485e084a3d208147374db4c422caa Mon Sep 17 00:00:00 2001 From: decfox Date: Mon, 20 Mar 2023 09:34:34 +0530 Subject: [PATCH 04/12] changes from code review --- internal/libooniengine/logger.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/libooniengine/logger.go b/internal/libooniengine/logger.go index 7e00f6ca6..960eece4f 100644 --- a/internal/libooniengine/logger.go +++ b/internal/libooniengine/logger.go @@ -11,8 +11,10 @@ type LogLevel string const ( // The DEBUG log level. logLevel_DEBUG LogLevel = "DEBUG" + // The INFO log level. logLevel_INFO LogLevel = "INFO" + // The WARNING log level. logLevel_WARNING LogLevel = "WARNING" ) From 542c4e3274a182fa0d93acd754135cd84d5a1776 Mon Sep 17 00:00:00 2001 From: decfox Date: Tue, 21 Mar 2023 08:46:57 +0530 Subject: [PATCH 05/12] feat: add mock task for integration testing --- internal/libooniengine/model.go | 2 ++ internal/libooniengine/test.go | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 internal/libooniengine/test.go diff --git a/internal/libooniengine/model.go b/internal/libooniengine/model.go index bfacedbe5..34750fc60 100644 --- a/internal/libooniengine/model.go +++ b/internal/libooniengine/model.go @@ -10,6 +10,7 @@ type request struct { NewSession newSessionOptions `json:",omitempty"` Geolocate geolocateOptions `json:",omitempty"` DeleteSession deleteSessionOptions `json:",omitempty"` + Test testOptions `json:",omitempty"` } // response is the OONI response to serialize before sending. @@ -18,6 +19,7 @@ type response struct { Geolocate geolocateResponse `json:",omitempty"` Logger logResponse `json:",omitempty"` DeleteSession deleteSessionResponse `json:",omitempty"` + Test testResponse `json:",omitempty"` Error string `json:",omitempty"` } diff --git a/internal/libooniengine/test.go b/internal/libooniengine/test.go new file mode 100644 index 000000000..4259337b8 --- /dev/null +++ b/internal/libooniengine/test.go @@ -0,0 +1,46 @@ +package main + +// +// test is a mock task to mimic the request-response API for the FFI consumer. +// + +import ( + "context" + "errors" +) + +func init() { + taskRegistry["Test"] = &testTaskRunner{} +} + +var ( + errTestDisabled = errors.New("request argument for test disabled") +) + +// testOptions contains the request options for the Test task. +type testOptions struct { + Test bool `json:",omitempty"` +} + +// testResponse is the response for the Test task. +type testResponse struct { + Response string `json:",omitempty"` + Error string `json:"omitempty"` +} + +type testTaskRunner struct{} + +var _ taskRunner = &testTaskRunner{} + +// main implements taskRunner.main +func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, + req *request, res *response) { + logger := newTaskLogger(emitter, false) + if !req.Test.Test { + logger.Warnf("task_runner: %s", errTestDisabled.Error()) + res.Test.Error = errTestDisabled.Error() + return + } + logger.Info("task_runner: a log event for the Test task") + res.Test.Response = "test success" +} From 719109a60a5016094c8919d67a5eec728cc0fd18 Mon Sep 17 00:00:00 2001 From: decfox Date: Sat, 25 Mar 2023 10:52:13 +0530 Subject: [PATCH 06/12] feat: separate Go and C code --- internal/libooniengine/main.go | 27 ++++++++------- internal/{libooniengine => motor}/emitter.go | 6 ++-- .../{libooniengine => motor}/geolocate.go | 6 +++- internal/{libooniengine => motor}/logger.go | 4 +-- internal/{libooniengine => motor}/model.go | 20 +++++------ internal/{libooniengine => motor}/session.go | 6 +++- internal/{libooniengine => motor}/task.go | 34 +++++++++---------- internal/{libooniengine => motor}/test.go | 4 +-- 8 files changed, 58 insertions(+), 49 deletions(-) rename internal/{libooniengine => motor}/emitter.go (77%) rename internal/{libooniengine => motor}/geolocate.go (70%) rename internal/{libooniengine => motor}/logger.go (98%) rename internal/{libooniengine => motor}/model.go (89%) rename internal/{libooniengine => motor}/session.go (82%) rename internal/{libooniengine => motor}/task.go (78%) rename internal/{libooniengine => motor}/test.go (95%) diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 4c179e5fe..85166be13 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -16,6 +16,7 @@ import ( "time" "unsafe" + "github.com/ooni/probe-cli/v3/internal/motor" "github.com/ooni/probe-cli/v3/internal/version" ) @@ -25,8 +26,8 @@ const ( ) // parse converts a JSON request string to the concrete Go type. -func parse(req *C.char) (*request, error) { - var out *request +func parse(req *C.char) (*motor.Request, error) { + var out *motor.Request if err := json.Unmarshal([]byte(C.GoString(req)), out); err != nil { return nil, err } @@ -34,7 +35,7 @@ func parse(req *C.char) (*request, error) { } // serialize serializes a OONI response to a JSON string accessible to C code. -func serialize(resp *response) *C.char { +func serialize(resp *motor.Response) *C.char { out, err := json.Marshal(resp) if err != nil { log.Printf("serializeMessage: cannot serialize message: %s", err.Error()) @@ -60,12 +61,12 @@ func OONIEngineCall(req *C.char) C.OONITask { log.Printf("OONIEngineCall: %s", err.Error()) return invalidTaskHandle } - taskName, err := resolveTask(r) + taskName, err := motor.ResolveTask(r) if err != nil { log.Printf("OONIEngineCall: %s", err.Error()) return invalidTaskHandle } - tp := startTask(taskName, r) + tp := motor.StartTask(taskName, r) if tp == nil { log.Printf("OONITaskStart: startTask return NULL") return invalidTaskHandle @@ -75,15 +76,15 @@ func OONIEngineCall(req *C.char) C.OONITask { //export OONIEngineWaitForNextEvent func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { - tp := cgo.Handle(task).Value().(taskAPI) - ev := tp.waitForNextEvent(time.Duration(timeout) * time.Millisecond) + tp := cgo.Handle(task).Value().(motor.TaskAPI) + ev := tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond) return serialize(ev) } //export OONIEngineTaskIsDone func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { - tp := cgo.Handle(task).Value().(taskAPI) - if !tp.isDone() { + tp := cgo.Handle(task).Value().(motor.TaskAPI) + if !tp.IsDone() { out++ } return @@ -91,16 +92,16 @@ func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { //export OONIEngineInterrupt func OONIEngineInterrupt(task C.OONITask) { - tp := cgo.Handle(task).Value().(taskAPI) - tp.interrupt() + tp := cgo.Handle(task).Value().(motor.TaskAPI) + tp.Interrupt() } //export OONIEngineFreeTask func OONIEngineFreeTask(task C.OONITask) { handle := cgo.Handle(task) - tp := handle.Value().(taskAPI) + tp := handle.Value().(motor.TaskAPI) handle.Delete() - tp.free() + tp.Free() } func main() { diff --git a/internal/libooniengine/emitter.go b/internal/motor/emitter.go similarity index 77% rename from internal/libooniengine/emitter.go rename to internal/motor/emitter.go index ec3d2fde8..213c14c75 100644 --- a/internal/libooniengine/emitter.go +++ b/internal/motor/emitter.go @@ -1,4 +1,4 @@ -package main +package motor // // Emitter @@ -7,13 +7,13 @@ package main // taskEmitter implements taskMaybeEmitter. type taskChanEmitter struct { // out is the channel where we emit events. - out chan *response + out chan *Response } var _ taskMaybeEmitter = &taskChanEmitter{} // maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent. -func (e *taskChanEmitter) maybeEmitEvent(resp *response) { +func (e *taskChanEmitter) maybeEmitEvent(resp *Response) { select { case e.out <- resp: default: // buffer full, discard this event diff --git a/internal/libooniengine/geolocate.go b/internal/motor/geolocate.go similarity index 70% rename from internal/libooniengine/geolocate.go rename to internal/motor/geolocate.go index 66bf7bbd6..d0188c0b7 100644 --- a/internal/libooniengine/geolocate.go +++ b/internal/motor/geolocate.go @@ -1,4 +1,8 @@ -package main +package motor + +// +// geolocate handles the geolocate task for a passed session. +// // geolocateOptions contains the request arguments for the Geolocate task. type geolocateOptions struct{} diff --git a/internal/libooniengine/logger.go b/internal/motor/logger.go similarity index 98% rename from internal/libooniengine/logger.go rename to internal/motor/logger.go index 960eece4f..833913241 100644 --- a/internal/libooniengine/logger.go +++ b/internal/motor/logger.go @@ -1,4 +1,4 @@ -package main +package motor import ( "fmt" @@ -80,7 +80,7 @@ func (tl *taskLogger) Warn(message string) { // emit emits a log message. func (tl *taskLogger) emit(level LogLevel, message string) { - logResp := &response{ + logResp := &Response{ Logger: logResponse{ Level: level, Message: message, diff --git a/internal/libooniengine/model.go b/internal/motor/model.go similarity index 89% rename from internal/libooniengine/model.go rename to internal/motor/model.go index 34750fc60..a462cddd9 100644 --- a/internal/libooniengine/model.go +++ b/internal/motor/model.go @@ -1,4 +1,4 @@ -package main +package motor import ( "context" @@ -6,7 +6,7 @@ import ( ) // request is the OONI request containing task-specific arguments. -type request struct { +type Request struct { NewSession newSessionOptions `json:",omitempty"` Geolocate geolocateOptions `json:",omitempty"` DeleteSession deleteSessionOptions `json:",omitempty"` @@ -14,7 +14,7 @@ type request struct { } // response is the OONI response to serialize before sending. -type response struct { +type Response struct { NewSession newSessionResponse `json:",omitempty"` Geolocate geolocateResponse `json:",omitempty"` Logger logResponse `json:",omitempty"` @@ -35,7 +35,7 @@ const taskEventsBuffer = 1024 type taskMaybeEmitter interface { // maybeEmitEvent emits an event if there's available buffer in the // output channel and otherwise discards the event. - maybeEmitEvent(resp *response) + maybeEmitEvent(resp *Response) } // taskRunner runs a given task. Any task that you can run from @@ -53,23 +53,23 @@ type taskRunner interface { // // - resp is the response to emit after the task is complete. Note that // this an implicit response and only indicates the final response of the task. - main(ctx context.Context, emitter taskMaybeEmitter, req *request, resp *response) + main(ctx context.Context, emitter taskMaybeEmitter, req *Request, resp *Response) } // taskAPI implements the OONI engine C API functions. We use this interface // to enable easier testing of the code that manages the tasks lifecycle. -type taskAPI interface { +type TaskAPI interface { // waitForNextEvent implements OONITaskWaitForNextEvent. - waitForNextEvent(timeout time.Duration) *response + WaitForNextEvent(timeout time.Duration) *Response // isDone implements OONITaskIsDone. - isDone() bool + IsDone() bool // interrupt implements OONITaskInterrupt. - interrupt() + Interrupt() // free implements OONITaskFree - free() + Free() } // taskRegistry maps each task name to its implementation. diff --git a/internal/libooniengine/session.go b/internal/motor/session.go similarity index 82% rename from internal/libooniengine/session.go rename to internal/motor/session.go index b9cd9b2b4..4e0dd4541 100644 --- a/internal/libooniengine/session.go +++ b/internal/motor/session.go @@ -1,4 +1,8 @@ -package main +package motor + +// +// session handles the bootstrap and delete tasks for OONI session. +// // newSessionOptions contains the request arguments for the NewSession task. type newSessionOptions struct{} diff --git a/internal/libooniengine/task.go b/internal/motor/task.go similarity index 78% rename from internal/libooniengine/task.go rename to internal/motor/task.go index 52d856d6e..c63ed8054 100644 --- a/internal/libooniengine/task.go +++ b/internal/motor/task.go @@ -1,4 +1,4 @@ -package main +package motor import ( "context" @@ -13,8 +13,8 @@ var ( errInvalidRequest = errors.New("input request has no valid task name") ) -// resolveTask resolves the task name to perform from the parsed request. -func resolveTask(req *request) (string, error) { +// ResolveTask resolves the task name to perform from the parsed request. +func ResolveTask(req *Request) (string, error) { r := reflect.ValueOf(req) t := r.Type() for i := 0; i < r.NumField(); i++ { @@ -26,15 +26,15 @@ func resolveTask(req *request) (string, error) { } // startTask starts a given task. -func startTask(name string, req *request) taskAPI { +func StartTask(name string, req *Request) TaskAPI { ctx, cancel := context.WithCancel(context.Background()) tp := &taskState{ cancel: cancel, done: &atomic.Int64{}, - events: make(chan *response, taskEventsBuffer), + events: make(chan *Response, taskEventsBuffer), stopped: make(chan any), } - go tp.main(ctx, name, req) + go tp.Main(ctx, name, req) return tp } @@ -47,16 +47,16 @@ type taskState struct { done *atomic.Int64 // events is the channel where we emit task events. - events chan *response + events chan *Response // stopped indicates that the task is done. stopped chan any } -var _ taskAPI = &taskState{} +var _ TaskAPI = &taskState{} // waitForNextEvent implements taskAPI.waitForNextEvent. -func (tp *taskState) waitForNextEvent(timeout time.Duration) *response { +func (tp *taskState) WaitForNextEvent(timeout time.Duration) *Response { // Implementation note: we don't need to log any of these nil-returning conditions // as they are not exceptional, rather they're part of normal usage. ctx, cancel := contextForWaitForNextEvent(timeout) @@ -88,28 +88,28 @@ func contextForWaitForNextEvent(timeo time.Duration) (context.Context, context.C } // isDone implements taskAPI.isDone. -func (tp *taskState) isDone() bool { +func (tp *taskState) IsDone() bool { return tp.done.Load() > 0 } // interrupt implements taskAPI.interrupt. -func (tp *taskState) interrupt() { +func (tp *taskState) Interrupt() { tp.cancel() } // free implements taskAPI.free -func (tp *taskState) free() { - tp.interrupt() - for !tp.isDone() { +func (tp *taskState) Free() { + tp.Interrupt() + for !tp.IsDone() { const blockForever = -1 - _ = tp.waitForNextEvent(blockForever) + _ = tp.WaitForNextEvent(blockForever) } } // main is the main function of the task. -func (tp *taskState) main(ctx context.Context, name string, req *request) { +func (tp *taskState) Main(ctx context.Context, name string, req *Request) { defer close(tp.stopped) // synchronize with caller - var resp *response + var resp *Response runner := taskRegistry[name] if runner == nil { log.Printf("OONITaskStart: unknown task name: %s", name) diff --git a/internal/libooniengine/test.go b/internal/motor/test.go similarity index 95% rename from internal/libooniengine/test.go rename to internal/motor/test.go index 4259337b8..6967e1700 100644 --- a/internal/libooniengine/test.go +++ b/internal/motor/test.go @@ -1,4 +1,4 @@ -package main +package motor // // test is a mock task to mimic the request-response API for the FFI consumer. @@ -34,7 +34,7 @@ var _ taskRunner = &testTaskRunner{} // main implements taskRunner.main func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, - req *request, res *response) { + req *Request, res *Response) { logger := newTaskLogger(emitter, false) if !req.Test.Test { logger.Warnf("task_runner: %s", errTestDisabled.Error()) From c73486fe75390d7a806d72b78543a87f31f5e114 Mon Sep 17 00:00:00 2001 From: decfox Date: Sat, 25 Mar 2023 12:13:15 +0530 Subject: [PATCH 07/12] feat: introduce TaskAPI.GetResult() --- internal/libooniengine/engine.h | 28 ++++++++++++++++++++++- internal/libooniengine/main.go | 11 ++++++++-- internal/motor/model.go | 5 ++++- internal/motor/task.go | 39 +++++++++++++++++++++++---------- internal/motor/test.go | 3 ++- 5 files changed, 69 insertions(+), 17 deletions(-) diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 0b0866c79..44ceec436 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -66,6 +66,32 @@ OONITask OONIEngineCall(char *req); /// checking whether this function has returned an empty string. char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); +/// OONIEngineTaskGetResult awaits on the result queue until the final +/// result is available or the given [timeout] expires. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @param timeout Timeout in milliseconds. If the timeout is zero +/// or negative, this function would potentially block forever. +/// +/// @return A NULL pointer on failure, non-NULL otherwise. If the return +/// value is non-NULL, the caller takes ownership of the char pointer +/// and MUST free it using OONIEngineFreeMemory when done using it. +/// +/// This function will return an empty string: +/// +/// 1. when the timeout expires; +/// +/// 2. if [task] is zero or does not refer to a valid task; +/// +/// 3. if we cannot JSON serialize the message; +/// +/// 4. possibly because of other unknown internal errors. +/// +/// In short, you cannot reliably determine whether a task is done by +/// checking whether this function has returned an empty string. +char *OONIEngineTaskGetResult(OONITask task, int32_t timeout); + /// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A taks is /// done when it has finished running _and_ its events queue has been drained. /// @@ -75,7 +101,7 @@ char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); /// unread events inside its events queue, zero otherwise. uint8_t OONIEngineTaskIsDone(OONITask task); -/// OONIEngineInterrupt tells [task] to stop ASAP. +/// OONIEngineInterruptTask tells [task] to stop ASAP. /// /// @param task Task handle returned by OONIEngineCall. If task is zero /// or does not refer to a valid task, this function will just do nothing. diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 85166be13..f0260a466 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -81,6 +81,13 @@ func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { return serialize(ev) } +//export OONIEngineTaskGetResult +func OONIEngineTaskGetResult(task C.OONITask, timeout C.int32_t) *C.char { + tp := cgo.Handle(task).Value().(motor.TaskAPI) + result := tp.GetResult(time.Duration(timeout) * time.Millisecond) + return serialize(result) +} + //export OONIEngineTaskIsDone func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { tp := cgo.Handle(task).Value().(motor.TaskAPI) @@ -90,8 +97,8 @@ func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { return } -//export OONIEngineInterrupt -func OONIEngineInterrupt(task C.OONITask) { +//export OONIEngineInterruptTask +func OONIEngineInterruptTask(task C.OONITask) { tp := cgo.Handle(task).Value().(motor.TaskAPI) tp.Interrupt() } diff --git a/internal/motor/model.go b/internal/motor/model.go index a462cddd9..ef67983d0 100644 --- a/internal/motor/model.go +++ b/internal/motor/model.go @@ -53,7 +53,7 @@ type taskRunner interface { // // - resp is the response to emit after the task is complete. Note that // this an implicit response and only indicates the final response of the task. - main(ctx context.Context, emitter taskMaybeEmitter, req *Request, resp *Response) + main(ctx context.Context, emitter taskMaybeEmitter, req *Request) *Response } // taskAPI implements the OONI engine C API functions. We use this interface @@ -62,6 +62,9 @@ type TaskAPI interface { // waitForNextEvent implements OONITaskWaitForNextEvent. WaitForNextEvent(timeout time.Duration) *Response + // GetResult implements OONITaskGetResult + GetResult(timeout time.Duration) *Response + // isDone implements OONITaskIsDone. IsDone() bool diff --git a/internal/motor/task.go b/internal/motor/task.go index c63ed8054..564bb6f75 100644 --- a/internal/motor/task.go +++ b/internal/motor/task.go @@ -32,9 +32,10 @@ func StartTask(name string, req *Request) TaskAPI { cancel: cancel, done: &atomic.Int64{}, events: make(chan *Response, taskEventsBuffer), + result: make(chan *Response, 1), stopped: make(chan any), } - go tp.Main(ctx, name, req) + go tp.main(ctx, name, req) return tp } @@ -49,13 +50,16 @@ type taskState struct { // events is the channel where we emit task events. events chan *Response + // result is the channel where we emit the final result. + result chan *Response + // stopped indicates that the task is done. stopped chan any } var _ TaskAPI = &taskState{} -// waitForNextEvent implements taskAPI.waitForNextEvent. +// WaitForNextEvent implements TaskAPI.WaitForNextEvent. func (tp *taskState) WaitForNextEvent(timeout time.Duration) *Response { // Implementation note: we don't need to log any of these nil-returning conditions // as they are not exceptional, rather they're part of normal usage. @@ -77,27 +81,39 @@ func (tp *taskState) WaitForNextEvent(timeout time.Duration) *Response { } } +// Result implements TaskAPI.Result +func (tp *taskState) GetResult(timeout time.Duration) *Response { + ctx, cancel := contextForWaitForNextEvent(timeout) + defer cancel() + select { + case <-ctx.Done(): + return nil // timeout while blocking for read + case ev := <-tp.result: + return ev // block for read till we receive a result + } +} + // contextForWaitForNextEvent returns the suitable context // for making the waitForNextEvent function time bounded. -func contextForWaitForNextEvent(timeo time.Duration) (context.Context, context.CancelFunc) { +func contextForWaitForNextEvent(timeout time.Duration) (context.Context, context.CancelFunc) { ctx := context.Background() - if timeo < 0 { + if timeout < 0 { return context.WithCancel(ctx) } - return context.WithTimeout(ctx, timeo) + return context.WithTimeout(ctx, timeout) } -// isDone implements taskAPI.isDone. +// IsDone implements TaskAPI.IsDone. func (tp *taskState) IsDone() bool { return tp.done.Load() > 0 } -// interrupt implements taskAPI.interrupt. +// Interrupt implements TaskAPI.Interrupt. func (tp *taskState) Interrupt() { tp.cancel() } -// free implements taskAPI.free +// Free implements TaskAPI.Free func (tp *taskState) Free() { tp.Interrupt() for !tp.IsDone() { @@ -107,9 +123,8 @@ func (tp *taskState) Free() { } // main is the main function of the task. -func (tp *taskState) Main(ctx context.Context, name string, req *Request) { +func (tp *taskState) main(ctx context.Context, name string, req *Request) { defer close(tp.stopped) // synchronize with caller - var resp *Response runner := taskRegistry[name] if runner == nil { log.Printf("OONITaskStart: unknown task name: %s", name) @@ -118,6 +133,6 @@ func (tp *taskState) Main(ctx context.Context, name string, req *Request) { emitter := &taskChanEmitter{ out: tp.events, } - defer emitter.maybeEmitEvent(resp) - runner.main(ctx, emitter, req, resp) + resp := runner.main(ctx, emitter, req) + tp.result <- resp // emit response to result channel } diff --git a/internal/motor/test.go b/internal/motor/test.go index 6967e1700..f616a389e 100644 --- a/internal/motor/test.go +++ b/internal/motor/test.go @@ -34,7 +34,7 @@ var _ taskRunner = &testTaskRunner{} // main implements taskRunner.main func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, - req *Request, res *Response) { + req *Request) (res *Response) { logger := newTaskLogger(emitter, false) if !req.Test.Test { logger.Warnf("task_runner: %s", errTestDisabled.Error()) @@ -43,4 +43,5 @@ func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, } logger.Info("task_runner: a log event for the Test task") res.Test.Response = "test success" + return } From 000abe5cfd6f403e4fd508ebbcac2e1086cc445d Mon Sep 17 00:00:00 2001 From: decfox Date: Sat, 25 Mar 2023 12:19:25 +0530 Subject: [PATCH 08/12] refactor: remove redundant comments --- internal/motor/model.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/motor/model.go b/internal/motor/model.go index ef67983d0..8ace8abf4 100644 --- a/internal/motor/model.go +++ b/internal/motor/model.go @@ -50,9 +50,6 @@ type taskRunner interface { // - emitter is the emitter to emit events; // // - req is the parsed request containing task specific arguments. - // - // - resp is the response to emit after the task is complete. Note that - // this an implicit response and only indicates the final response of the task. main(ctx context.Context, emitter taskMaybeEmitter, req *Request) *Response } From 9b904d8c03eba0352ce5e2a9b10defb794cd89dd Mon Sep 17 00:00:00 2001 From: decfox Date: Fri, 31 Mar 2023 11:29:06 +0530 Subject: [PATCH 09/12] changes from code review --- internal/libooniengine/engine.h | 50 ++++++++++++------------ internal/libooniengine/main.go | 69 +++++++++++++++++++++++++-------- internal/motor/emitter.go | 2 +- internal/motor/geolocate.go | 11 ------ internal/motor/logger.go | 25 ++++++------ internal/motor/model.go | 25 +++++------- internal/motor/session.go | 17 -------- internal/motor/task.go | 49 ++++++----------------- internal/motor/test.go | 18 ++++++--- 9 files changed, 124 insertions(+), 142 deletions(-) delete mode 100644 internal/motor/geolocate.go delete mode 100644 internal/motor/session.go diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 44ceec436..80a8bf41a 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -10,7 +10,8 @@ #include /// OONITask is an asynchronous thread of execution managed by the OONI -/// engine that performs a background operation and emits meaningful +/// engine that performs a background operation and emits interim outpus +/// like logs and progress and results of the operation with meaningful /// events such as, for example, the results of measurements. typedef uintptr_t OONITask; @@ -28,7 +29,7 @@ char *OONIEngineVersion(void); /// @param ptr a void pointer refering to the memory to be freed. void OONIEngineFreeMemory(void *ptr); -/// OONIEngineCall starts a new OONITask using the given [req]. +/// OONIEngineCall starts a new OONITask using the given @p req. /// /// @param req A JSON string, owned by the caller, that contains /// the configuration for the task to start. @@ -38,25 +39,26 @@ void OONIEngineFreeMemory(void *ptr); /// responsible to eventually dispose of the task using OONIEngineFreeTask. OONITask OONIEngineCall(char *req); -/// OONIEngineWaitForNextEvent awaits on the [task] event queue until -/// a new event is available or the given [timeout] expires. +/// OONIEngineWaitForNextEvent awaits on the @p task event queue until +/// a new event is available or the given @p timeout expires. /// /// @param task Task handle returned by OONIEngineCall. /// /// @param timeout Timeout in milliseconds. If the timeout is zero /// or negative, this function would potentially block forever. /// -/// @return A NULL pointer on failure, non-NULL otherwise. If the return -/// value is non-NULL, the caller takes ownership of the char pointer -/// and MUST free it using OONIEngineFreeMemory when done using it. +/// @return A NULL pointer on failure, non-NULL JSON string otherwise. +/// If the return value is non-NULL, the caller takes ownership of the +/// char pointer and MUST free it using OONIEngineFreeMemory when done +/// using it. /// /// This function will return an empty string: /// /// 1. when the timeout expires; /// -/// 2. if [task] is done; +/// 2. if @p task is done; /// -/// 3. if [task] is zero or does not refer to a valid task; +/// 3. if @p task is zero or does not refer to a valid task; /// /// 4. if we cannot JSON serialize the message; /// @@ -67,32 +69,28 @@ OONITask OONIEngineCall(char *req); char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); /// OONIEngineTaskGetResult awaits on the result queue until the final -/// result is available or the given [timeout] expires. +/// result is available. /// /// @param task Task handle returned by OONIEngineCall. /// -/// @param timeout Timeout in milliseconds. If the timeout is zero -/// or negative, this function would potentially block forever. -/// -/// @return A NULL pointer on failure, non-NULL otherwise. If the return -/// value is non-NULL, the caller takes ownership of the char pointer -/// and MUST free it using OONIEngineFreeMemory when done using it. +/// @return A NULL pointer on failure, non-NULL JSON string otherwise. +/// If the return value is non-NULL, the caller takes ownership of the +/// char pointer and MUST free it using OONIEngineFreeMemory when done +/// using it. /// -/// This function will return an empty string: -/// -/// 1. when the timeout expires; +/// This function will return a NULL pointer: /// -/// 2. if [task] is zero or does not refer to a valid task; +/// 1. if @p task is zero or does not refer to a valid task; /// -/// 3. if we cannot JSON serialize the message; +/// 2. if we cannot JSON serialize the message; /// -/// 4. possibly because of other unknown internal errors. +/// 3. possibly because of other unknown internal errors. /// /// In short, you cannot reliably determine whether a task is done by /// checking whether this function has returned an empty string. char *OONIEngineTaskGetResult(OONITask task, int32_t timeout); -/// OONIEngineTaskIsDone returns whether the task identified by [taskID] is done. A taks is +/// OONIEngineTaskIsDone returns whether @p task is done. A task is /// done when it has finished running _and_ its events queue has been drained. /// /// @param task Task handle returned by OONIEngineCall. @@ -101,14 +99,14 @@ char *OONIEngineTaskGetResult(OONITask task, int32_t timeout); /// unread events inside its events queue, zero otherwise. uint8_t OONIEngineTaskIsDone(OONITask task); -/// OONIEngineInterruptTask tells [task] to stop ASAP. +/// OONIEngineInterruptTask tells @p task to stop ASAP. /// /// @param task Task handle returned by OONIEngineCall. If task is zero /// or does not refer to a valid task, this function will just do nothing. void OONIEngineInterruptTask(OONITask task); -/// OONIEngineFreeTask free the memory associated with [task]. If the task is still running, this -/// function will also interrupt it and drain its events queue. +/// OONIEngineFreeTask frees the memory associated with @p task. If the task is +/// still running, this function will also interrupt it. /// /// @param task Task handle returned by OONIEngineCall. If task is zero /// or does not refer to a valid task, this function will just do nothing. diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index f0260a466..7cb13858c 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -27,8 +27,8 @@ const ( // parse converts a JSON request string to the concrete Go type. func parse(req *C.char) (*motor.Request, error) { - var out *motor.Request - if err := json.Unmarshal([]byte(C.GoString(req)), out); err != nil { + out := &motor.Request{} + if err := json.Unmarshal([]byte(C.GoString(req)), &out); err != nil { return nil, err } return out, nil @@ -36,14 +36,34 @@ func parse(req *C.char) (*motor.Request, error) { // serialize serializes a OONI response to a JSON string accessible to C code. func serialize(resp *motor.Response) *C.char { + if resp == nil { + return nil + } out, err := json.Marshal(resp) if err != nil { log.Printf("serializeMessage: cannot serialize message: %s", err.Error()) - return C.CString("") + return nil } return C.CString(string(out)) } +// getTaskHandle checks if the task handle is valid and returns the corresponding TaskAPI. +func getTaskHandle(task C.OONITask) (tp motor.TaskAPI) { + handle := cgo.Handle(task) + defer func() { + if r := recover(); r != nil { + handle.Delete() + tp = nil // return a nil TaskAPI when handle.Value() panics + } + }() + val := handle.Value() // this can panic if handle is invalid + tp, ok := val.(motor.TaskAPI) + if !ok { + handle.Delete() + } + return +} + //export OONIEngineVersion func OONIEngineVersion() *C.char { return C.CString(version.Version) @@ -61,14 +81,9 @@ func OONIEngineCall(req *C.char) C.OONITask { log.Printf("OONIEngineCall: %s", err.Error()) return invalidTaskHandle } - taskName, err := motor.ResolveTask(r) - if err != nil { - log.Printf("OONIEngineCall: %s", err.Error()) - return invalidTaskHandle - } - tp := motor.StartTask(taskName, r) + tp := motor.StartTask(r) if tp == nil { - log.Printf("OONITaskStart: startTask return NULL") + log.Printf("OONITaskStart: startTask returned NULL") return invalidTaskHandle } return C.OONITask(cgo.NewHandle(tp)) @@ -76,21 +91,30 @@ func OONIEngineCall(req *C.char) C.OONITask { //export OONIEngineWaitForNextEvent func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { - tp := cgo.Handle(task).Value().(motor.TaskAPI) + tp := getTaskHandle(task) + if tp == nil { + return nil + } ev := tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond) return serialize(ev) } //export OONIEngineTaskGetResult func OONIEngineTaskGetResult(task C.OONITask, timeout C.int32_t) *C.char { - tp := cgo.Handle(task).Value().(motor.TaskAPI) - result := tp.GetResult(time.Duration(timeout) * time.Millisecond) + tp := getTaskHandle(task) + if tp == nil { + return nil + } + result := tp.Result() return serialize(result) } //export OONIEngineTaskIsDone func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { - tp := cgo.Handle(task).Value().(motor.TaskAPI) + tp := getTaskHandle(task) + if tp == nil { + return + } if !tp.IsDone() { out++ } @@ -99,16 +123,27 @@ func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { //export OONIEngineInterruptTask func OONIEngineInterruptTask(task C.OONITask) { - tp := cgo.Handle(task).Value().(motor.TaskAPI) + tp := getTaskHandle(task) + if tp == nil { + return + } tp.Interrupt() } //export OONIEngineFreeTask func OONIEngineFreeTask(task C.OONITask) { handle := cgo.Handle(task) - tp := handle.Value().(motor.TaskAPI) + defer func() { + if r := recover(); r != nil { + handle.Delete() + } + }() + val := handle.Value() // this can panic if handle is invalid + tp, ok := val.(motor.TaskAPI) + if ok { + tp.Interrupt() + } handle.Delete() - tp.Free() } func main() { diff --git a/internal/motor/emitter.go b/internal/motor/emitter.go index 213c14c75..5af08c554 100644 --- a/internal/motor/emitter.go +++ b/internal/motor/emitter.go @@ -4,7 +4,7 @@ package motor // Emitter // -// taskEmitter implements taskMaybeEmitter. +// taskChanEmitter implements taskMaybeEmitter. type taskChanEmitter struct { // out is the channel where we emit events. out chan *Response diff --git a/internal/motor/geolocate.go b/internal/motor/geolocate.go deleted file mode 100644 index d0188c0b7..000000000 --- a/internal/motor/geolocate.go +++ /dev/null @@ -1,11 +0,0 @@ -package motor - -// -// geolocate handles the geolocate task for a passed session. -// - -// geolocateOptions contains the request arguments for the Geolocate task. -type geolocateOptions struct{} - -// geolocateResponse is the response for the Geolocate task. -type geolocateResponse struct{} diff --git a/internal/motor/logger.go b/internal/motor/logger.go index 833913241..9c2a04ea6 100644 --- a/internal/motor/logger.go +++ b/internal/motor/logger.go @@ -10,16 +10,17 @@ type LogLevel string const ( // The DEBUG log level. - logLevel_DEBUG LogLevel = "DEBUG" + logDebug LogLevel = "DEBUG" // The INFO log level. - logLevel_INFO LogLevel = "INFO" + logInfo LogLevel = "INFO" // The WARNING log level. - logLevel_WARNING LogLevel = "WARNING" + logWarning LogLevel = "WARNING" ) -type logResponse struct { +// LogResponse is the response for any logging task. +type LogResponse struct { Level LogLevel `json:",omitempty"` Message string `json:",omitempty"` } @@ -34,7 +35,7 @@ type taskLogger struct { } // newLogger creates a new taskLogger instance using -// the [emitter] to emit log events. +// the emitter to emit log events. func newTaskLogger(emitter taskMaybeEmitter, verbose bool) *taskLogger { return &taskLogger{ emitter: emitter, @@ -47,41 +48,41 @@ var _ model.Logger = &taskLogger{} // Debugf implements model.Logger.Debugf. func (tl *taskLogger) Debugf(format string, values ...any) { if tl.verbose { - tl.emit(logLevel_DEBUG, fmt.Sprintf(format, values...)) + tl.emit(logDebug, fmt.Sprintf(format, values...)) } } // Debug implements model.Logger.Debug. func (tl *taskLogger) Debug(message string) { if tl.verbose { - tl.emit(logLevel_DEBUG, message) + tl.emit(logDebug, message) } } // Infof implements model.Logger.Infof. func (tl *taskLogger) Infof(format string, values ...any) { - tl.emit(logLevel_INFO, fmt.Sprintf(format, values...)) + tl.emit(logInfo, fmt.Sprintf(format, values...)) } // Info implements model.Logger.Info. func (tl *taskLogger) Info(message string) { - tl.emit(logLevel_INFO, message) + tl.emit(logInfo, message) } // Warnf implements model.Logger.Warnf. func (tl *taskLogger) Warnf(format string, values ...any) { - tl.emit(logLevel_WARNING, fmt.Sprintf(format, values...)) + tl.emit(logWarning, fmt.Sprintf(format, values...)) } // Warn implements model.Logger.Warn. func (tl *taskLogger) Warn(message string) { - tl.emit(logLevel_WARNING, message) + tl.emit(logWarning, message) } // emit emits a log message. func (tl *taskLogger) emit(level LogLevel, message string) { logResp := &Response{ - Logger: logResponse{ + Logger: LogResponse{ Level: level, Message: message, }, diff --git a/internal/motor/model.go b/internal/motor/model.go index 8ace8abf4..ad748455b 100644 --- a/internal/motor/model.go +++ b/internal/motor/model.go @@ -2,25 +2,21 @@ package motor import ( "context" + "encoding/json" "time" ) -// request is the OONI request containing task-specific arguments. +// request is the OONI request containing task name and arguments. type Request struct { - NewSession newSessionOptions `json:",omitempty"` - Geolocate geolocateOptions `json:",omitempty"` - DeleteSession deleteSessionOptions `json:",omitempty"` - Test testOptions `json:",omitempty"` + Name string `json:",omitempty"` + Arguments json.RawMessage `json:",omitempty"` } // response is the OONI response to serialize before sending. type Response struct { - NewSession newSessionResponse `json:",omitempty"` - Geolocate geolocateResponse `json:",omitempty"` - Logger logResponse `json:",omitempty"` - DeleteSession deleteSessionResponse `json:",omitempty"` - Test testResponse `json:",omitempty"` - Error string `json:",omitempty"` + Logger LogResponse `json:",omitempty"` + Test testResponse `json:",omitempty"` + Error string `json:",omitempty"` } // taskEventsBuffer is the buffer used for the task's event chan, which @@ -50,7 +46,7 @@ type taskRunner interface { // - emitter is the emitter to emit events; // // - req is the parsed request containing task specific arguments. - main(ctx context.Context, emitter taskMaybeEmitter, req *Request) *Response + main(ctx context.Context, emitter taskMaybeEmitter, req *Request, resp *Response) } // taskAPI implements the OONI engine C API functions. We use this interface @@ -60,16 +56,13 @@ type TaskAPI interface { WaitForNextEvent(timeout time.Duration) *Response // GetResult implements OONITaskGetResult - GetResult(timeout time.Duration) *Response + Result() *Response // isDone implements OONITaskIsDone. IsDone() bool // interrupt implements OONITaskInterrupt. Interrupt() - - // free implements OONITaskFree - Free() } // taskRegistry maps each task name to its implementation. diff --git a/internal/motor/session.go b/internal/motor/session.go deleted file mode 100644 index 4e0dd4541..000000000 --- a/internal/motor/session.go +++ /dev/null @@ -1,17 +0,0 @@ -package motor - -// -// session handles the bootstrap and delete tasks for OONI session. -// - -// newSessionOptions contains the request arguments for the NewSession task. -type newSessionOptions struct{} - -// newSessionResponse is the response for the NewSession task. -type newSessionResponse struct{} - -// deleteSessionOptions contains the request arguments for the DeleteSession task. -type deleteSessionOptions struct{} - -// deleteSessionResponse is the response for the DeleteSession task. -type deleteSessionResponse struct{} diff --git a/internal/motor/task.go b/internal/motor/task.go index 564bb6f75..c4d214a15 100644 --- a/internal/motor/task.go +++ b/internal/motor/task.go @@ -4,7 +4,6 @@ import ( "context" "errors" "log" - "reflect" "sync/atomic" "time" ) @@ -13,20 +12,8 @@ var ( errInvalidRequest = errors.New("input request has no valid task name") ) -// ResolveTask resolves the task name to perform from the parsed request. -func ResolveTask(req *Request) (string, error) { - r := reflect.ValueOf(req) - t := r.Type() - for i := 0; i < r.NumField(); i++ { - if !r.Field(i).IsNil() { - return t.Field(i).Name, nil - } - } - return "", errInvalidRequest -} - // startTask starts a given task. -func StartTask(name string, req *Request) TaskAPI { +func StartTask(req *Request) TaskAPI { ctx, cancel := context.WithCancel(context.Background()) tp := &taskState{ cancel: cancel, @@ -35,7 +22,7 @@ func StartTask(name string, req *Request) TaskAPI { result: make(chan *Response, 1), stopped: make(chan any), } - go tp.main(ctx, name, req) + go tp.main(ctx, req) return tp } @@ -82,15 +69,8 @@ func (tp *taskState) WaitForNextEvent(timeout time.Duration) *Response { } // Result implements TaskAPI.Result -func (tp *taskState) GetResult(timeout time.Duration) *Response { - ctx, cancel := contextForWaitForNextEvent(timeout) - defer cancel() - select { - case <-ctx.Done(): - return nil // timeout while blocking for read - case ev := <-tp.result: - return ev // block for read till we receive a result - } +func (tp *taskState) Result() *Response { + return <-tp.result } // contextForWaitForNextEvent returns the suitable context @@ -113,26 +93,21 @@ func (tp *taskState) Interrupt() { tp.cancel() } -// Free implements TaskAPI.Free -func (tp *taskState) Free() { - tp.Interrupt() - for !tp.IsDone() { - const blockForever = -1 - _ = tp.WaitForNextEvent(blockForever) - } -} - // main is the main function of the task. -func (tp *taskState) main(ctx context.Context, name string, req *Request) { +func (tp *taskState) main(ctx context.Context, req *Request) { defer close(tp.stopped) // synchronize with caller - runner := taskRegistry[name] + taskName := req.Name + resp := &Response{} + runner := taskRegistry[taskName] if runner == nil { - log.Printf("OONITaskStart: unknown task name: %s", name) + log.Printf("OONITaskStart: unknown task name: %s", taskName) + resp.Error = errInvalidRequest.Error() + tp.result <- resp return } emitter := &taskChanEmitter{ out: tp.events, } - resp := runner.main(ctx, emitter, req) + runner.main(ctx, emitter, req, resp) tp.result <- resp // emit response to result channel } diff --git a/internal/motor/test.go b/internal/motor/test.go index f616a389e..a6b64be53 100644 --- a/internal/motor/test.go +++ b/internal/motor/test.go @@ -6,6 +6,7 @@ package motor import ( "context" + "encoding/json" "errors" ) @@ -15,6 +16,8 @@ func init() { var ( errTestDisabled = errors.New("request argument for test disabled") + + errParseFailed = errors.New("unable to parse task arguments") ) // testOptions contains the request options for the Test task. @@ -34,14 +37,19 @@ var _ taskRunner = &testTaskRunner{} // main implements taskRunner.main func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, - req *Request) (res *Response) { + req *Request, resp *Response) { logger := newTaskLogger(emitter, false) - if !req.Test.Test { + args := &testOptions{} + if err := json.Unmarshal(req.Arguments, args); err != nil { + logger.Warn("task_runner: %s") + resp.Test.Error = errParseFailed.Error() + return + } + if !args.Test { logger.Warnf("task_runner: %s", errTestDisabled.Error()) - res.Test.Error = errTestDisabled.Error() + resp.Test.Error = errTestDisabled.Error() return } logger.Info("task_runner: a log event for the Test task") - res.Test.Response = "test success" - return + resp.Test.Response = "test success" } From d0a225a0e7c0ae55ff86c29464fed6d0af68aa16 Mon Sep 17 00:00:00 2001 From: decfox Date: Fri, 31 Mar 2023 11:30:32 +0530 Subject: [PATCH 10/12] x --- internal/libooniengine/engine.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 80a8bf41a..8474675f4 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -52,7 +52,7 @@ OONITask OONIEngineCall(char *req); /// char pointer and MUST free it using OONIEngineFreeMemory when done /// using it. /// -/// This function will return an empty string: +/// This function will return a NULL pointer: /// /// 1. when the timeout expires; /// From 62ca51bdd494f424997df8f79594a5d120cb8540 Mon Sep 17 00:00:00 2001 From: decfox Date: Fri, 31 Mar 2023 23:29:52 +0530 Subject: [PATCH 11/12] refactor: handle non-positive timeout explicitly --- internal/libooniengine/engine.h | 2 +- internal/libooniengine/main.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 8474675f4..8f4cc4a00 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -88,7 +88,7 @@ char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); /// /// In short, you cannot reliably determine whether a task is done by /// checking whether this function has returned an empty string. -char *OONIEngineTaskGetResult(OONITask task, int32_t timeout); +char *OONIEngineTaskGetResult(OONITask task); /// OONIEngineTaskIsDone returns whether @p task is done. A task is /// done when it has finished running _and_ its events queue has been drained. diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 7cb13858c..79a846573 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -95,12 +95,17 @@ func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { if tp == nil { return nil } - ev := tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond) + var ev *motor.Response + if timeout <= 0 { + ev = tp.WaitForNextEvent(time.Duration(timeout)) + } else { + ev = tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond) + } return serialize(ev) } //export OONIEngineTaskGetResult -func OONIEngineTaskGetResult(task C.OONITask, timeout C.int32_t) *C.char { +func OONIEngineTaskGetResult(task C.OONITask) *C.char { tp := getTaskHandle(task) if tp == nil { return nil From e38a2749c8e2fe774b378301369af4bcd4f54bfd Mon Sep 17 00:00:00 2001 From: decfox Date: Thu, 6 Apr 2023 10:04:20 +0530 Subject: [PATCH 12/12] feat: introduce custom handle --- internal/libooniengine/engine.h | 2 +- internal/libooniengine/handle.go | 90 ++++++++++++++++++++++++++++++++ internal/libooniengine/main.go | 46 +++++----------- 3 files changed, 104 insertions(+), 34 deletions(-) create mode 100644 internal/libooniengine/handle.go diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 8f4cc4a00..66c417bfd 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -13,7 +13,7 @@ /// engine that performs a background operation and emits interim outpus /// like logs and progress and results of the operation with meaningful /// events such as, for example, the results of measurements. -typedef uintptr_t OONITask; +typedef intptr_t OONITask; #ifdef __cplusplus extern "C" { diff --git a/internal/libooniengine/handle.go b/internal/libooniengine/handle.go new file mode 100644 index 000000000..12cc88d16 --- /dev/null +++ b/internal/libooniengine/handle.go @@ -0,0 +1,90 @@ +package main + +// +// Handle mimics cgo.Handle but uses a intptr +// + +//#include +// +//#include "engine.h" +import "C" + +import ( + "errors" + "log" + "sync" + + "github.com/ooni/probe-cli/v3/internal/motor" +) + +var ( + handler Handler + + // errHandleMisuse indicates that an invalid handle was misused + errHandleMisuse = errors.New("misuse of a invalid handle") + + // errHandleSpaceExceeded + errHandleSpaceExceeded = errors.New("ran out of handle space") +) + +func init() { + handler = Handler{ + handles: make(map[Handle]interface{}), + } +} + +type Handle C.intptr_t + +// Handler handles the entirety of handle operations. +type Handler struct { + handles map[Handle]interface{} + handleIdx Handle + mu sync.Mutex +} + +// newHandle returns a handle for a given value +// if we run out of handle space, a zero handle is returned. +func (h *Handler) newHandle(v any) (Handle, error) { + h.mu.Lock() + defer h.mu.Unlock() + ptr := C.intptr_t(h.handleIdx) + newId := ptr + 1 + if newId < 0 { + return Handle(0), errHandleSpaceExceeded + } + h.handleIdx = Handle(newId) + h.handles[h.handleIdx] = v + return h.handleIdx, nil +} + +// delete invalidates a handle +func (h *Handler) delete(hd Handle) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.handles, hd) +} + +// value returns the associated go value for a valid handle +func (h *Handler) value(hd Handle) (any, error) { + v, ok := h.handles[hd] + if !ok { + return nil, errHandleMisuse + } + return v, nil +} + +// getTaskHandle checks if the task handle is valid and returns the corresponding TaskAPI. +func (h *Handler) getTaskHandle(task C.OONITask) (tp motor.TaskAPI) { + hd := Handle(task) + val, err := h.value(hd) + if err != nil { + log.Printf("getTaskHandle: %s", err.Error()) + return + } + tp, ok := val.(motor.TaskAPI) + if !ok { + log.Printf("getTaskHandle: invalid type assertion") + return + } + return +} diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go index 79a846573..ef40808be 100644 --- a/internal/libooniengine/main.go +++ b/internal/libooniengine/main.go @@ -12,7 +12,6 @@ import "C" import ( "encoding/json" "log" - "runtime/cgo" "time" "unsafe" @@ -47,23 +46,6 @@ func serialize(resp *motor.Response) *C.char { return C.CString(string(out)) } -// getTaskHandle checks if the task handle is valid and returns the corresponding TaskAPI. -func getTaskHandle(task C.OONITask) (tp motor.TaskAPI) { - handle := cgo.Handle(task) - defer func() { - if r := recover(); r != nil { - handle.Delete() - tp = nil // return a nil TaskAPI when handle.Value() panics - } - }() - val := handle.Value() // this can panic if handle is invalid - tp, ok := val.(motor.TaskAPI) - if !ok { - handle.Delete() - } - return -} - //export OONIEngineVersion func OONIEngineVersion() *C.char { return C.CString(version.Version) @@ -86,12 +68,17 @@ func OONIEngineCall(req *C.char) C.OONITask { log.Printf("OONITaskStart: startTask returned NULL") return invalidTaskHandle } - return C.OONITask(cgo.NewHandle(tp)) + handle, err := handler.newHandle(tp) + if err != nil { + log.Printf("OONITaskStart: %s", err.Error()) + return invalidTaskHandle + } + return C.OONITask(handle) } //export OONIEngineWaitForNextEvent func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { - tp := getTaskHandle(task) + tp := handler.getTaskHandle(task) if tp == nil { return nil } @@ -106,7 +93,7 @@ func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { //export OONIEngineTaskGetResult func OONIEngineTaskGetResult(task C.OONITask) *C.char { - tp := getTaskHandle(task) + tp := handler.getTaskHandle(task) if tp == nil { return nil } @@ -116,7 +103,7 @@ func OONIEngineTaskGetResult(task C.OONITask) *C.char { //export OONIEngineTaskIsDone func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { - tp := getTaskHandle(task) + tp := handler.getTaskHandle(task) if tp == nil { return } @@ -128,7 +115,7 @@ func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { //export OONIEngineInterruptTask func OONIEngineInterruptTask(task C.OONITask) { - tp := getTaskHandle(task) + tp := handler.getTaskHandle(task) if tp == nil { return } @@ -137,18 +124,11 @@ func OONIEngineInterruptTask(task C.OONITask) { //export OONIEngineFreeTask func OONIEngineFreeTask(task C.OONITask) { - handle := cgo.Handle(task) - defer func() { - if r := recover(); r != nil { - handle.Delete() - } - }() - val := handle.Value() // this can panic if handle is invalid - tp, ok := val.(motor.TaskAPI) - if ok { + tp := handler.getTaskHandle(task) + if tp != nil { tp.Interrupt() } - handle.Delete() + handler.delete(Handle(task)) } func main() {