Skip to content

Commit

Permalink
add support for deferred tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed Jun 21, 2024
1 parent fc9f24c commit ed0e929
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 0 deletions.
136 changes: 136 additions & 0 deletions pkg/engine/deferrer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package engine

import (
"context"
"sync"
"time"

"github.com/NVIDIA/knavigator/pkg/config"
"github.com/NVIDIA/knavigator/pkg/utils"
"github.com/go-logr/logr"
)

type executor interface {
RunTask(context.Context, *config.Task) error
}

type Deferrer struct {
mutex sync.Mutex
log logr.Logger
executor executor
queue *utils.ChronoQueue
next time.Time
timer *time.Timer
wg sync.WaitGroup
}

func NewDereffer(log logr.Logger, executor executor) *Deferrer {
d := &Deferrer{
log: log,
executor: executor,
queue: utils.NewChronoQueue(),
timer: time.NewTimer(0),
}

// disregard first timer event - cannot set timer without duration
<-d.timer.C

return d
}

func (d *Deferrer) AddTask(cfg *config.Task, offset time.Duration) {
d.mutex.Lock()
defer d.mutex.Unlock()

timestamp := time.Now().Add(offset)
d.log.Info("Deferrer added task", "type", cfg.Type, "ID", cfg.ID, "time", timestamp)

d.wg.Add(1)
d.queue.Add(cfg, timestamp)

if next := d.queue.Peek().Timestamp; next != d.next {
d.next = next
d.timer.Reset(time.Until(next))
d.log.Info("next event", "time", next.String())
}
}

func (d *Deferrer) GetTask() *config.Task {
d.mutex.Lock()
defer d.mutex.Unlock()

task := d.queue.Remove()

if d.queue.Len() > 0 {
d.next = d.queue.Peek().Timestamp
d.timer.Reset(time.Until(d.next))
d.log.Info("next event", "time", d.next.String())
} else {
d.next = time.Time{}
}

return task.Value.(*config.Task)
}

func (d *Deferrer) Start(ctx context.Context) {
go d.start(ctx)
}

func (d *Deferrer) start(ctx context.Context) {
defer d.timer.Stop()

for {
select {
case <-d.timer.C:
cfg := d.GetTask()
d.log.Info("Deferrer initiates task", "type", cfg.Type, "ID", cfg.ID)
err := d.executor.RunTask(ctx, cfg)
if err != nil {
d.log.Error(err, "failed to execute task", "type", cfg.Type, "ID", cfg.ID)
}
d.wg.Done()

case <-ctx.Done():
d.log.Info("Stop deferrer")
return
}
}
}

func (d *Deferrer) Wait(ctx context.Context, timeout time.Duration) error {
d.log.Info("Waiting for deferrer to complete task")
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

done := make(chan struct{})

go func() {
d.wg.Wait()
done <- struct{}{}
}()

select {
case <-done:
d.log.Info("Deferrer stopped")
return nil
case <-ctx.Done():
d.log.Info("Deferrer didn't stop in allocated time")
return ctx.Err()
}
}
55 changes: 55 additions & 0 deletions pkg/engine/deferrer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package engine

import (
"context"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/require"

"github.com/NVIDIA/knavigator/pkg/config"
)

type testExecutor struct {
log logr.Logger
}

func (exec *testExecutor) RunTask(_ context.Context, cfg *config.Task) error {
exec.log.Info("executing", "task", cfg.ID)
return nil
}

func TestDeferrer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

deferrer := NewDereffer(testLogger, &testExecutor{log: testLogger})
deferrer.Start(ctx)

deferrer.AddTask(&config.Task{ID: "t3"}, 3*time.Second)
deferrer.AddTask(&config.Task{ID: "t1"}, 1*time.Second)
deferrer.AddTask(&config.Task{ID: "t5"}, 5*time.Second)
deferrer.AddTask(&config.Task{ID: "t4"}, 4*time.Second)
deferrer.AddTask(&config.Task{ID: "t2"}, 2*time.Second)
deferrer.AddTask(&config.Task{ID: "t6"}, 6*time.Second)

err := deferrer.Wait(ctx, 8*time.Second)
require.NoError(t, err)
}
79 changes: 79 additions & 0 deletions pkg/utils/chrono_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
"container/heap"
"time"
)

type ChronoItem struct {
Value interface{}
Timestamp time.Time
}

// ChronoQueue is a priority queue that orders items based on their timestamp
type ChronoQueue []*ChronoItem

func NewChronoQueue() *ChronoQueue {
cq := &ChronoQueue{}
heap.Init(cq)
return cq
}

// Len returns the length of the heap
func (h ChronoQueue) Len() int { return len(h) }

// Less compares two items based on their timestamps
func (h ChronoQueue) Less(i, j int) bool {
return h[i].Timestamp.Before(h[j].Timestamp)
}

// Swap swaps two items in the heap
func (h ChronoQueue) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

// Push adds an item to the heap
func (h *ChronoQueue) Push(x interface{}) {
*h = append(*h, x.(*ChronoItem))
}

// Pop removes and returns the item with the earliest timestamp
func (h *ChronoQueue) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
return x
}

// Add is a convenient wrapper for the Push method
func (h *ChronoQueue) Add(val interface{}, timestamp time.Time) {
heap.Push(h, &ChronoItem{Value: val, Timestamp: timestamp})
}

// Remove is a convenient wrapper for the Pop method
func (h *ChronoQueue) Remove() *ChronoItem {
return heap.Pop(h).(*ChronoItem)
}

// Peek returns the item with the earliest timestamp, but does not remove it
func (h *ChronoQueue) Peek() *ChronoItem {
return (*h)[0]
}
47 changes: 47 additions & 0 deletions pkg/utils/chrono_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestChronoQueue(t *testing.T) {
cq := NewChronoQueue()

now := time.Now()
cq.Add("t3", now.Add(3*time.Second))
cq.Add("t1", now.Add(1*time.Second))
cq.Add("t5", now.Add(5*time.Second))
cq.Add("t4", now.Add(4*time.Second))
cq.Add("t2", now.Add(2*time.Second))
cq.Add("t6", now.Add(6*time.Second))

expexted := []string{"t1", "t2", "t3", "t4", "t5", "t6"}
actual := make([]string, 0, len(expexted))

for cq.Len() > 0 {
item := cq.Peek()
require.Equal(t, item, cq.Remove())
actual = append(actual, item.Value.(string))
}

require.Equal(t, expexted, actual)
}

0 comments on commit ed0e929

Please sign in to comment.