From 9a611093dd55490e4745313ac3332188748812bb Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Thu, 20 Jun 2024 08:41:02 -0700 Subject: [PATCH] add support for deferred tasks Signed-off-by: Dmitry Shmulevich --- pkg/engine/deferrer.go | 136 +++++++++++++++++++++++++++++++++ pkg/engine/deferrer_test.go | 56 ++++++++++++++ pkg/utils/chrono_queue.go | 79 +++++++++++++++++++ pkg/utils/chrono_queue_test.go | 47 ++++++++++++ 4 files changed, 318 insertions(+) create mode 100644 pkg/engine/deferrer.go create mode 100644 pkg/engine/deferrer_test.go create mode 100644 pkg/utils/chrono_queue.go create mode 100644 pkg/utils/chrono_queue_test.go diff --git a/pkg/engine/deferrer.go b/pkg/engine/deferrer.go new file mode 100644 index 0000000..62c3ba5 --- /dev/null +++ b/pkg/engine/deferrer.go @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package engine + +import ( + "context" + "sync" + "time" + + "github.com/NVIDIA/knavigator/pkg/config" + "github.com/NVIDIA/knavigator/pkg/utils" + "github.com/go-logr/logr" +) + +type executor interface { + RunTask(context.Context, *config.Task) error +} + +type Deferrer struct { + mutex sync.Mutex + log logr.Logger + executor executor + queue *utils.ChronoQueue + next time.Time + timer *time.Timer + wg sync.WaitGroup +} + +func NewDereffer(log logr.Logger, executor executor) *Deferrer { + d := &Deferrer{ + log: log, + executor: executor, + queue: utils.NewChronoQueue(), + timer: time.NewTimer(0), + } + + // disregard first timer event - cannot set timer without duration + <-d.timer.C + + return d +} + +func (d *Deferrer) AddTask(cfg *config.Task, offset time.Duration) { + d.mutex.Lock() + defer d.mutex.Unlock() + + timestamp := time.Now().Add(offset) + d.log.Info("Deferrer added task", "type", cfg.Type, "ID", cfg.ID, "time", timestamp) + + d.wg.Add(1) + d.queue.Add(cfg, timestamp) + + if next := d.queue.Peek().Timestamp; next != d.next { + d.next = next + d.timer.Reset(time.Until(next)) + d.log.Info("next event", "time", next.String()) + } +} + +func (d *Deferrer) GetTask() *config.Task { + d.mutex.Lock() + defer d.mutex.Unlock() + + task := d.queue.Remove() + + if d.queue.Len() > 0 { + d.next = d.queue.Peek().Timestamp + d.timer.Reset(time.Until(d.next)) + d.log.Info("next event", "time", d.next.String()) + } else { + d.next = time.Time{} + } + + return task.Value.(*config.Task) +} + +func (d *Deferrer) Start(ctx context.Context) { + go d.start(ctx) +} + +func (d *Deferrer) start(ctx context.Context) { + defer d.timer.Stop() + + for { + select { + case <-d.timer.C: + cfg := d.GetTask() + d.log.Info("Deferrer initiates task", "type", cfg.Type, "ID", cfg.ID) + err := d.executor.RunTask(ctx, cfg) + if err != nil { + d.log.Error(err, "failed to execute task", "type", cfg.Type, "ID", cfg.ID) + } + d.wg.Done() + + case <-ctx.Done(): + d.log.Info("Stop deferrer") + return + } + } +} + +func (d *Deferrer) Wait(ctx context.Context, timeout time.Duration) error { + d.log.Info("Waiting for deferrer to complete task") + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + done := make(chan struct{}) + + go func() { + d.wg.Wait() + done <- struct{}{} + }() + + select { + case <-done: + d.log.Info("Deferrer stopped") + return nil + case <-ctx.Done(): + d.log.Info("Deferrer didn't stop in allocated time") + return ctx.Err() + } +} diff --git a/pkg/engine/deferrer_test.go b/pkg/engine/deferrer_test.go new file mode 100644 index 0000000..0440aac --- /dev/null +++ b/pkg/engine/deferrer_test.go @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package engine + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/NVIDIA/knavigator/pkg/config" +) + +type testExecutor struct { + tasks []string +} + +func (exec *testExecutor) RunTask(_ context.Context, cfg *config.Task) error { + exec.tasks = append(exec.tasks, cfg.ID) + return nil +} + +func TestDeferrer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exec := &testExecutor{tasks: []string{}} + deferrer := NewDereffer(testLogger, exec) + deferrer.Start(ctx) + + deferrer.AddTask(&config.Task{ID: "t3"}, 3*time.Second) + deferrer.AddTask(&config.Task{ID: "t1"}, 1*time.Second) + deferrer.AddTask(&config.Task{ID: "t5"}, 5*time.Second) + deferrer.AddTask(&config.Task{ID: "t4"}, 4*time.Second) + deferrer.AddTask(&config.Task{ID: "t2"}, 2*time.Second) + deferrer.AddTask(&config.Task{ID: "t6"}, 6*time.Second) + + err := deferrer.Wait(ctx, 8*time.Second) + require.NoError(t, err) + require.Equal(t, []string{"t1", "t2", "t3", "t4", "t5", "t6"}, exec.tasks) +} diff --git a/pkg/utils/chrono_queue.go b/pkg/utils/chrono_queue.go new file mode 100644 index 0000000..7afad75 --- /dev/null +++ b/pkg/utils/chrono_queue.go @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "container/heap" + "time" +) + +type ChronoItem struct { + Value interface{} + Timestamp time.Time +} + +// ChronoQueue is a priority queue that orders items based on their timestamp +type ChronoQueue []*ChronoItem + +func NewChronoQueue() *ChronoQueue { + cq := &ChronoQueue{} + heap.Init(cq) + return cq +} + +// Len returns the length of the heap +func (h ChronoQueue) Len() int { return len(h) } + +// Less compares two items based on their timestamps +func (h ChronoQueue) Less(i, j int) bool { + return h[i].Timestamp.Before(h[j].Timestamp) +} + +// Swap swaps two items in the heap +func (h ChronoQueue) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +// Push adds an item to the heap +func (h *ChronoQueue) Push(x interface{}) { + *h = append(*h, x.(*ChronoItem)) +} + +// Pop removes and returns the item with the earliest timestamp +func (h *ChronoQueue) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil + *h = old[0 : n-1] + return x +} + +// Add is a convenient wrapper for the Push method +func (h *ChronoQueue) Add(val interface{}, timestamp time.Time) { + heap.Push(h, &ChronoItem{Value: val, Timestamp: timestamp}) +} + +// Remove is a convenient wrapper for the Pop method +func (h *ChronoQueue) Remove() *ChronoItem { + return heap.Pop(h).(*ChronoItem) +} + +// Peek returns the item with the earliest timestamp, but does not remove it +func (h *ChronoQueue) Peek() *ChronoItem { + return (*h)[0] +} diff --git a/pkg/utils/chrono_queue_test.go b/pkg/utils/chrono_queue_test.go new file mode 100644 index 0000000..6fd783c --- /dev/null +++ b/pkg/utils/chrono_queue_test.go @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestChronoQueue(t *testing.T) { + cq := NewChronoQueue() + + now := time.Now() + cq.Add("t3", now.Add(3*time.Second)) + cq.Add("t1", now.Add(1*time.Second)) + cq.Add("t5", now.Add(5*time.Second)) + cq.Add("t4", now.Add(4*time.Second)) + cq.Add("t2", now.Add(2*time.Second)) + cq.Add("t6", now.Add(6*time.Second)) + + expexted := []string{"t1", "t2", "t3", "t4", "t5", "t6"} + actual := make([]string, 0, len(expexted)) + + for cq.Len() > 0 { + item := cq.Peek() + require.Equal(t, item, cq.Remove()) + actual = append(actual, item.Value.(string)) + } + + require.Equal(t, expexted, actual) +}