Skip to content

Commit

Permalink
Merge pull request #13 from konnase/feat/job-create-event
Browse files Browse the repository at this point in the history
Feat/job create event
  • Loading branch information
EastInsure authored Oct 12, 2021
2 parents 0982f17 + a7889c3 commit c261fc6
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name: Release
on: [push]

env:
version: v0.2.0
version: v0.2.1

jobs:
docker:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

# di-operator version
VERSION ?= v0.2.0
VERSION ?= v0.2.1
MASTER_VERSION := $(VERSION)

COMMIT_SHORT_SHA=$(shell git log -n 1 | head -n 1 | sed -e 's/^commit //' | head -c 8)
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/dijob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ type DIJobSpec struct {
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`

// Coordinator defines the coordinator of distributed DIJob.
// For serial DIJob, only coordinator is needed.
// +kubebuilder:validation:Required
Coordinator CoordinatorSpec `json:"coordinator"`

// +kubebuilder:validation:Optional
Collector CollectorSpec `json:"collector,"`

// +kubebuilder:validation:Optional
Learner LearnerSpec `json:"learner,"`
}

Expand Down
5 changes: 2 additions & 3 deletions config/crd/bases/diengine.opendilab.org_dijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6614,7 +6614,8 @@ spec:
- template
type: object
coordinator:
description: CoordinatorSpec defines the desired state of coordinators
description: Coordinator defines the coordinator of distributed DIJob.
For serial DIJob, only coordinator is needed.
properties:
template:
description: PodTemplateSpec describes the data a pod should have
Expand Down Expand Up @@ -21179,9 +21180,7 @@ spec:
type: object
type: array
required:
- collector
- coordinator
- learner
type: object
status:
description: DIJobStatus defines the observed state of DIJob
Expand Down
10 changes: 4 additions & 6 deletions config/di-manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7617,7 +7617,7 @@ spec:
- template
type: object
coordinator:
description: CoordinatorSpec defines the desired state of coordinators
description: Coordinator defines the coordinator of distributed DIJob. For serial DIJob, only coordinator is needed.
properties:
template:
description: PodTemplateSpec describes the data a pod should have when created from a template
Expand Down Expand Up @@ -16018,9 +16018,7 @@ spec:
type: object
type: array
required:
- collector
- coordinator
- learner
type: object
status:
description: DIJobStatus defines the observed state of DIJob
Expand Down Expand Up @@ -16321,7 +16319,7 @@ spec:
- --leader-elect
command:
- /di-operator
image: opendilab/di-operator:v0.2.0
image: opendilab/di-operator:v0.2.1
imagePullPolicy: Always
livenessProbe:
httpGet:
Expand Down Expand Up @@ -16372,7 +16370,7 @@ spec:
- --lease-lock-name=di-server
command:
- /di-server
image: opendilab/di-server:v0.2.0
image: opendilab/di-server:v0.2.1
imagePullPolicy: Always
livenessProbe:
httpGet:
Expand Down Expand Up @@ -16412,7 +16410,7 @@ spec:
containers:
- command:
- /di-webhook
image: opendilab/di-webhook:v0.2.0
image: opendilab/di-webhook:v0.2.1
imagePullPolicy: Always
livenessProbe:
httpGet:
Expand Down
2 changes: 1 addition & 1 deletion config/manager/di_operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
- /di-operator
args:
- "--server-address=http://di-server.di-system:8080"
image: opendilab/di-operator:v0.2.0
image: opendilab/di-operator:v0.2.1
imagePullPolicy: Always
name: manager
securityContext:
Expand Down
2 changes: 1 addition & 1 deletion config/manager/di_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spec:
- "--leader-elect"
- "--lease-lock-namespace=di-system"
- "--lease-lock-name=di-server"
image: opendilab/di-server:v0.2.0
image: opendilab/di-server:v0.2.1
imagePullPolicy: Always
name: server
securityContext:
Expand Down
2 changes: 1 addition & 1 deletion config/manager/di_webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
containers:
- command:
- /di-webhook
image: opendilab/di-webhook:v0.2.0
image: opendilab/di-webhook:v0.2.1
imagePullPolicy: Always
name: webhook
securityContext:
Expand Down
6 changes: 3 additions & 3 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ kind: Kustomization
images:
- name: opendilab/di-operator
newName: opendilab/di-operator
newTag: v0.2.0
newTag: v0.2.1
- name: opendilab/di-server
newName: opendilab/di-server
newTag: v0.2.0
newTag: v0.2.1
- name: opendilab/di-webhook
newName: opendilab/di-webhook
newTag: v0.2.0
newTag: v0.2.1
98 changes: 93 additions & 5 deletions controllers/dijob.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
Expand Down Expand Up @@ -56,14 +65,93 @@ func (r *DIJobReconciler) reconcileReplicas(ctx context.Context, job *div1alpha1
if err := r.createPodAndService(ctx, job, coorpod, coorsvc); err != nil {
return err
}
}
return nil
}

// update job status
msg := fmt.Sprintf("DIJob %s created", job.Name)
if err := r.updateJobPhase(ctx, job, div1alpha1.JobCreated, DIJobCreatedReason, msg); err != nil {
return err
// addDIJob is the event handler responsible for handling job add events
func (r *DIJobReconciler) addDIJob(obj client.Object) {
log := r.Log.WithValues("dijob", diutil.NamespacedName(obj.GetNamespace(), obj.GetName()))
job, ok := obj.(*div1alpha1.DIJob)
if !ok {
log.Error(fmt.Errorf("failed to convert object DIJob: %s/%s", obj.GetNamespace(), obj.GetName()), "")
r.markIncorrectJobFailed(obj)
return
}

oldStatus := job.Status.DeepCopy()
// update job status
msg := fmt.Sprintf("DIJob %s created", job.Name)
if err := r.updateJobPhase(context.Background(), job, div1alpha1.JobCreated, DIJobCreatedReason, msg); err != nil {
log.Error(err, "failed to update job status")
return
}

log.Info(fmt.Sprintf("DIJob %s/%s created", job.Namespace, job.Name))

if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := r.updateDIJobStatusInCluster(context.Background(), job); err != nil {
log.Error(err, fmt.Sprintf("failed to update DIJob %s/%s status", job.Namespace, job.Name))
}
}
return nil
}

func (r *DIJobReconciler) markIncorrectJobFailed(obj client.Object) {
log := r.Log.WithValues("dijob", diutil.NamespacedName(obj.GetNamespace(), obj.GetName()))

// create dynamic client
config := ctrl.GetConfigOrDie()
dclient, err := dynamic.NewForConfig(config)
if err != nil {
log.Error(err, "failed to create dynamic client")
return
}

// dynamic client for dijobs
dijobRes := schema.GroupVersionResource{
Group: div1alpha1.GroupVersion.Group,
Version: div1alpha1.GroupVersion.Version,
Resource: "dijobs",
}

// build status
failedConvertDIJob := fmt.Sprintf("failed to convert type %T to v1alpha1.DIJob", obj)
status := div1alpha1.DIJobStatus{
Phase: div1alpha1.JobFailed,
Conditions: []div1alpha1.DIJobCondition{
{
Type: div1alpha1.JobFailed,
Status: corev1.ConditionTrue,
Message: failedConvertDIJob,
},
},
}

statusMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&status)
if err != nil {
log.Error(err, "failed to convert status to unstructured")
return
}

// get dijob
un, err := dclient.Resource(dijobRes).Namespace(obj.GetNamespace()).Get(context.Background(), obj.GetName(), metav1.GetOptions{})
if err != nil {
log.Error(err, "failed to get dijob")
}

// set and update status
unstructured.SetNestedField(un.Object, statusMap, "status")

var updateErr error
for i := 0; i < statusUpdateRetries; i++ {
_, updateErr = dclient.Resource(dijobRes).Namespace(obj.GetNamespace()).UpdateStatus(context.Background(), un, metav1.UpdateOptions{})
if updateErr == nil {
break
}
}
if updateErr != nil {
log.Error(updateErr, "failed to update job status")
}
}

func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *div1alpha1.DIJob,
Expand Down
30 changes: 30 additions & 0 deletions controllers/dijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand Down Expand Up @@ -182,6 +184,13 @@ func (r *DIJobReconciler) deletePodsAndServices(ctx context.Context, job *div1al
func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&div1alpha1.DIJob{}).
Watches(
&source.Kind{Type: &div1alpha1.DIJob{}},
&DIJobEventHandler{
r,
},
builder.Predicates{},
).
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestForOwner{
Expand All @@ -199,3 +208,24 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
).
Complete(r)
}

type DIJobEventHandler struct {
r *DIJobReconciler
}

// Create implements EventHandler
func (e *DIJobEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
e.r.addDIJob(evt.Object)
}

// Update implements EventHandler
func (e *DIJobEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
}

// Delete implements EventHandler
func (e *DIJobEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
}

// Generic implements EventHandler
func (e *DIJobEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
38 changes: 34 additions & 4 deletions controllers/dijob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ var _ = Describe("DIJob Controller", func() {
ctx := context.Background()
jobTmpl := testutil.NewDIJob()
dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
checkCoordinatorCreated(ctx, dijob)

By("Checking the created DIJob is in Created state")
checkDIJobPhase(ctx, k8sClient, jobKey, div1alpha1.JobCreated)

replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}

By("Update coordinator to Running")
err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodRunning)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -84,6 +88,8 @@ var _ = Describe("DIJob Controller", func() {
ctx := context.Background()
jobTmpl := testutil.NewDIJob()
dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
checkCoordinatorCreated(ctx, dijob)

replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}

Expand Down Expand Up @@ -114,6 +120,25 @@ var _ = Describe("DIJob Controller", func() {
Expect(err).NotTo(HaveOccurred())
}
})

It("Should be marked as Created when submitted", func() {
By("Create a DIJob")
var err error
ctx := context.Background()
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits = make(corev1.ResourceList)
jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] =
resource.MustParse("1m")

dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)

By("Checking the created DIJob is in Created state")
checkDIJobPhase(ctx, k8sClient, jobKey, div1alpha1.JobCreated)

By("Cleaning up")
err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
Expect(err).NotTo(HaveOccurred())
})
})
Context("When creating a DIJob with collectors and learners", func() {
It("Should record collector and learner status to job status", func() {
Expand Down Expand Up @@ -161,6 +186,8 @@ var _ = Describe("DIJob Controller", func() {
ctx := context.Background()
jobTmpl := testutil.NewDIJob()
dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
checkCoordinatorCreated(ctx, dijob)

replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}

Expand Down Expand Up @@ -245,6 +272,7 @@ var _ = Describe("DIJob Controller", func() {
ctx := context.Background()
jobTmpl := testutil.NewDIJob()
dijob, _ := createDIJob(ctx, k8sClient, jobTmpl)
checkCoordinatorCreated(ctx, dijob)

// build owner reference
ownRefer := diutil.NewOwnerReference(div1alpha1.GroupVersion.String(), div1alpha1.KindDIJob, dijob.Name, dijob.UID, true)
Expand Down Expand Up @@ -294,16 +322,18 @@ func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div1alpha1
return err == nil
}, timeout, interval).Should(BeTrue())

return createdDIjob, key
}

func checkCoordinatorCreated(ctx context.Context, dijob div1alpha1.DIJob) {
By("Checking coordinator are created")
replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
var pod corev1.Pod
podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
Eventually(func() bool {
err = k8sClient.Get(ctx, podKey, &pod)
err := k8sClient.Get(ctx, podKey, &pod)
return err == nil
}, timeout, interval).Should(BeTrue())

return createdDIjob, key
}

func createAndUpdatePodPhase(
Expand Down
Loading

0 comments on commit c261fc6

Please sign in to comment.