diff --git a/examples/test_example/fluence-sized-job.yaml b/examples/test_example/fluence-sized-job.yaml
new file mode 100644
index 0000000..a195d87
--- /dev/null
+++ b/examples/test_example/fluence-sized-job.yaml
@@ -0,0 +1,16 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: fluence-sized-job
+spec:
+  parallelism: 3
+  completions: 3
+  template:
+    spec:
+      schedulerName: fluence
+      containers:
+      - name: fluence-job
+        image: busybox
+        command: [echo, potato]
+      restartPolicy: Never
+  backoffLimit: 4
diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
index 72bda77..fa4593c 100644
--- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
+++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
@@ -125,60 +125,81 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		// to account for if the labels are different, do we take the smallest?
 		log.Info("PodGroup", "Status", fmt.Sprintf("WARNING: Pod group current MinMember %s does not match %d", pg.Spec.MinMember, size))
 	}
+	return r.updateStatus(ctx, pg, podList.Items)
-	// If we get here, we have a PodGroup with a set size and can inspect / update phase
-	pods := podList.Items
-	pgCopy := pg.DeepCopy()
+}
+func (r *PodGroupReconciler) updateStatus(
+	ctx context.Context,
+	pg *schedv1alpha1.PodGroup,
+	pods []v1.Pod,
+) (ctrl.Result, error) {
-	switch pgCopy.Status.Phase {
+	patch := client.MergeFrom(pg.DeepCopy())
+
+	switch pg.Status.Phase {
 	case "":
-		pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
+		pg.Status.Phase = schedv1alpha1.PodGroupPending
+		result, err := r.updateOwnerReferences(ctx, pg, &pods[0])
+		if result.Requeue || err != nil {
+			return result, err
+		}
+
 	case schedv1alpha1.PodGroupPending:
 		if len(pods) >= int(pg.Spec.MinMember) {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
-
-			// Always update owner references to be the first pod
-			// E.g., if a job owns it, ensure the group is deleted with it
-			updateOwnerReferences(pgCopy, &pods[0])
+			pg.Status.Phase = schedv1alpha1.PodGroupScheduling
+			result, err := r.updateOwnerReferences(ctx, pg, &pods[0])
+			if result.Requeue || err != nil {
+				return result, err
+			}
 		}
 	default:
 		// Get updated counts of running, succeeded, and failed pods
-		pgCopy.Status.Running, pgCopy.Status.Succeeded, pgCopy.Status.Failed = getCurrentPodStats(pods)
+		running, succeeded, failed := getCurrentPodStats(pods)
 
 		// If for some reason we weren't pending and now have fewer than min required, flip back to pending
 		if len(pods) < int(pg.Spec.MinMember) {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
+			pg.Status.Phase = schedv1alpha1.PodGroupPending
 			break
 		}
 
 		// A pod with succeeded + running STILL less than the minimum required is scheduling
-		if pgCopy.Status.Succeeded+pgCopy.Status.Running < pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
+		if succeeded+running < pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupScheduling
 		}
 
 		// A pod with succeeded + running >= the minimum required is running!
-		if pgCopy.Status.Succeeded+pgCopy.Status.Running >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupRunning
+		if succeeded+running >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupRunning
 		}
 
 		// We have non zero failed, and the total of failed, running amd succeeded > min member
 		// Final state of pod group is FAILED womp womp
-		if pgCopy.Status.Failed != 0 &&
-			pgCopy.Status.Failed+pgCopy.Status.Running+pgCopy.Status.Succeeded >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupFailed
+		if failed != 0 && failed+running+succeeded >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupFailed
 		}
 
 		// Finished! This is where we want to get :)
 		// TODO: ideally the owning higher level object deletion will delete here,
 		// but that won't always work for one of pods - need a new strategy
-		if pgCopy.Status.Succeeded >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupFinished
+		if succeeded >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupFinished
 		}
+		pg.Status.Running = running
+		pg.Status.Failed = failed
+		pg.Status.Succeeded = succeeded
 	}
 
-	// TODO need better handling here of cleanup, etc. This mostly handles status changes
-	return r.patchPodGroup(ctx, pg, pgCopy)
+	// Apply the patch to update, or delete if finished
+	// TODO would be better if owner references took here, so delete on owner deletion
+	var err error
+	if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed {
+		err = r.Delete(ctx, pg)
+	} else {
+		r.Status().Update(ctx, pg)
+		err = r.Patch(ctx, pg, patch)
+	}
+	return ctrl.Result{Requeue: true}, err
 }
 
 // newPodGroup creates a new podGroup object, capturing the creation time
@@ -273,21 +294,37 @@ func getCurrentPodStats(pods []v1.Pod) (int32, int32, int32) {
 
 // updateOwnerReferences ensures the group is always owned by the same entity that owns the pod
 // This ensures that, for example, a job that is wrapping pods is the owner.
-func updateOwnerReferences(pg *schedv1alpha1.PodGroup, pod *v1.Pod) {
+func (r *PodGroupReconciler) updateOwnerReferences(
+	ctx context.Context,
+	pg *schedv1alpha1.PodGroup,
+	pod *v1.Pod,
+) (ctrl.Result, error) {
 	// Case 1: The pod itself doesn't have owner references. YOLO
 	if len(pod.OwnerReferences) == 0 {
-		return
+		return ctrl.Result{}, nil
 	}
+
+	// Collect owner references for pod group
+	owners := []metav1.OwnerReference{}
 	var refs []string
 	for _, ownerRef := range pod.OwnerReferences {
 		refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name))
+		owners = append(owners, ownerRef)
 	}
+
+	patch := client.MergeFrom(pg.DeepCopy())
 	if len(refs) != 0 {
 		sort.Strings(refs)
 		pg.Status.OccupiedBy = strings.Join(refs, ",")
 	}
+	if len(owners) > 0 {
+		pg.ObjectMeta.OwnerReferences = owners
+	}
+	// Apply the patch to update the owner references and status
+	r.Status().Update(ctx, pg)
+	err := r.Patch(ctx, pg, patch)
+	return ctrl.Result{Requeue: true}, err
+
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -346,6 +383,8 @@ func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Obje
 	if apierrs.IsNotFound(err) {
 		r.log.Info("Pod: ", "Status", pod.Status.Phase, "Name", pod.Name, "Group", groupName, "Namespace", pod.Namespace, "Action", "Creating PodGroup")
+		//owner := r.getOwnerMetadata(pod)
+		// TODO should an owner be set here? Setting to a specific pod seems risky/wrong in case deleted.
 		err, _ := r.newPodGroup(ctx, groupName, pod.Namespace, int32(groupSize))
 		if err != nil {
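
Below is a minimal, self-contained sketch of the snapshot-then-patch pattern the new updateStatus and updateOwnerReferences helpers rely on: take a client.MergeFrom snapshot of the PodGroup before mutating it, then send only the delta back to the API server. The import paths, the markScheduling helper name, and the use of Status().Patch (rather than the Status().Update call in the diff) are illustrative assumptions, not part of this changeset; it also assumes the PodGroup CRD registers status as a subresource.

package controllers

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" // assumed import path
)

// markScheduling (hypothetical helper) flips a PodGroup to the Scheduling
// phase using a merge patch computed against a pre-mutation snapshot.
func markScheduling(ctx context.Context, c client.Client, pg *schedv1alpha1.PodGroup) error {
	// The snapshot must be taken before any field changes, so the resulting
	// patch contains exactly the fields mutated below.
	patch := client.MergeFrom(pg.DeepCopy())

	pg.Status.Phase = schedv1alpha1.PodGroupScheduling

	// Status is assumed to be a subresource, so it gets its own patch call;
	// the error is propagated instead of being dropped.
	if err := c.Status().Patch(ctx, pg, patch); err != nil {
		return err
	}
	// Patch the main resource (e.g., metadata such as owner references).
	return c.Patch(ctx, pg, patch)
}

Under these assumptions, the same pattern would also let the else branch in updateStatus surface the status-write error instead of discarding the r.Status().Update return value.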