feat: podgroup deletion when finished/failed
Problem: Since the PodGroup controller creates the PodGroup, it
should delete it as well.
Solution: Ideally I wanted to attach an owner reference, meaning
that the top level job (that also owns the pod) would be owner
of the PodGroup. But that does not seem to take - either because
the controller is the owner or the field is read only for k8s.
For the time being, I decided to delete the PodGroup when the
group is determined to be Finished/Failed, which happens when
the relevant pod counts equal or exceed MinMember. Granted that
MinMember == size, I think this should be OK with fluence, and
we might need to consider other approaches if/when the min size
is smaller than the total size (because fluence might still see
a pod in the queue and try to schedule again). I think what we
might do in that case is just update MinMember for the group, so
if fluence schedules again it will be for the smaller size. But
I am not sure about that either! TBA. The important thing now is
that the pod group cleans itself up!

Signed-off-by: vsoch <[email protected]>
vsoch committed Feb 19, 2024
1 parent 962d035 commit 84f3697
Showing 2 changed files with 80 additions and 25 deletions.
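Note: the commit message above describes an owner-reference approach that "does not seem to take". Below is a minimal sketch of what that would look like with controller-runtime, assuming the controllerutil helpers and the scheduler-plugins PodGroup API; the helper name setJobOwner, the client/scheme wiring, and the import path for the PodGroup types are illustrative and not part of this change.

// Sketch only (not part of this commit): resolve the Job that owns the pod and
// make it the controller owner of the PodGroup, so that deleting the Job
// garbage-collects the group.
package controllers

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// setJobOwner walks the pod's owner references, fetches the owning Job, and sets
// it as the PodGroup's controller owner.
func setJobOwner(ctx context.Context, c client.Client, scheme *runtime.Scheme, pg *schedv1alpha1.PodGroup, pod *v1.Pod) error {
	for _, ref := range pod.OwnerReferences {
		if ref.Kind != "Job" {
			continue
		}
		job := &batchv1.Job{}
		if err := c.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: pod.Namespace}, job); err != nil {
			return err
		}
		// SetControllerReference fails if another controller already owns the
		// group - possibly one reason the reference "does not take" in practice.
		if err := controllerutil.SetControllerReference(job, pg, scheme); err != nil {
			return err
		}
		return c.Update(ctx, pg)
	}
	return nil
}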
16 changes: 16 additions & 0 deletions examples/test_example/fluence-sized-job.yaml
@@ -0,0 +1,16 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: fluence-sized-job
+spec:
+  parallelism: 3
+  completions: 3
+  template:
+    spec:
+      schedulerName: fluence
+      containers:
+      - name: fluence-job
+        image: busybox
+        command: [echo, potato]
+      restartPolicy: Never
+  backoffLimit: 4
89 changes: 64 additions & 25 deletions sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
@@ -125,60 +125,81 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		// to account for if the labels are different, do we take the smallest?
 		log.Info("PodGroup", "Status", fmt.Sprintf("WARNING: Pod group current MinMember %s does not match %d", pg.Spec.MinMember, size))
 	}
+	return r.updateStatus(ctx, pg, podList.Items)

-	// If we get here, we have a PodGroup with a set size and can inspect / update phase
-	pods := podList.Items
-	pgCopy := pg.DeepCopy()
+}
+func (r *PodGroupReconciler) updateStatus(
+	ctx context.Context,
+	pg *schedv1alpha1.PodGroup,
+	pods []v1.Pod,
+) (ctrl.Result, error) {

-	switch pgCopy.Status.Phase {
+	patch := client.MergeFrom(pg.DeepCopy())

+	switch pg.Status.Phase {
 	case "":
-		pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
+		pg.Status.Phase = schedv1alpha1.PodGroupPending
+		result, err := r.updateOwnerReferences(ctx, pg, &pods[0])
+		if result.Requeue || err != nil {
+			return result, err
+		}

 	case schedv1alpha1.PodGroupPending:
 		if len(pods) >= int(pg.Spec.MinMember) {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling

-			// Always update owner references to be the first pod
-			// E.g., if a job owns it, ensure the group is deleted with it
-			updateOwnerReferences(pgCopy, &pods[0])
+			pg.Status.Phase = schedv1alpha1.PodGroupScheduling
+			result, err := r.updateOwnerReferences(ctx, pg, &pods[0])
+			if result.Requeue || err != nil {
+				return result, err
+			}
 		}
 	default:

 		// Get updated counts of running, succeeded, and failed pods
-		pgCopy.Status.Running, pgCopy.Status.Succeeded, pgCopy.Status.Failed = getCurrentPodStats(pods)
+		running, succeeded, failed := getCurrentPodStats(pods)

 		// If for some reason we weren't pending and now have fewer than min required, flip back to pending
 		if len(pods) < int(pg.Spec.MinMember) {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
+			pg.Status.Phase = schedv1alpha1.PodGroupPending
 			break
 		}

 		// A pod with succeeded + running STILL less than the minimum required is scheduling
-		if pgCopy.Status.Succeeded+pgCopy.Status.Running < pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
+		if succeeded+running < pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupScheduling
 		}

 		// A pod with succeeded + running >= the minimum required is running!
-		if pgCopy.Status.Succeeded+pgCopy.Status.Running >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupRunning
+		if succeeded+running >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupRunning
 		}

 		// We have non zero failed, and the total of failed, running amd succeeded > min member
 		// Final state of pod group is FAILED womp womp
-		if pgCopy.Status.Failed != 0 &&
-			pgCopy.Status.Failed+pgCopy.Status.Running+pgCopy.Status.Succeeded >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupFailed
+		if failed != 0 && failed+running+succeeded >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupFailed
 		}

 		// Finished! This is where we want to get :)
+		// TODO: ideally the owning higher level object deletion will delete here,
+		// but that won't always work for one of pods - need a new strategy
-		if pgCopy.Status.Succeeded >= pg.Spec.MinMember {
-			pgCopy.Status.Phase = schedv1alpha1.PodGroupFinished
+		if succeeded >= pg.Spec.MinMember {
+			pg.Status.Phase = schedv1alpha1.PodGroupFinished
 		}
+		pg.Status.Running = running
+		pg.Status.Failed = failed
+		pg.Status.Succeeded = succeeded
 	}

-	// TODO need better handling here of cleanup, etc. This mostly handles status changes
-	return r.patchPodGroup(ctx, pg, pgCopy)
+	// Apply the patch to update, or delete if finished
+	// TODO would be better if owner references took here, so delete on owner deletion
+	var err error
+	if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed {
+		err = r.Delete(ctx, pg)
+	} else {
+		r.Status().Update(ctx, pg)
+		err = r.Patch(ctx, pg, patch)
+	}
+	return ctrl.Result{Requeue: true}, err
 }

// newPodGroup creates a new podGroup object, capturing the creation time
@@ -273,21 +294,37 @@ func getCurrentPodStats(pods []v1.Pod) (int32, int32, int32) {

 // updateOwnerReferences ensures the group is always owned by the same entity that owns the pod
 // This ensures that, for example, a job that is wrapping pods is the owner.
-func updateOwnerReferences(pg *schedv1alpha1.PodGroup, pod *v1.Pod) {
+func (r *PodGroupReconciler) updateOwnerReferences(
+	ctx context.Context,
+	pg *schedv1alpha1.PodGroup,
+	pod *v1.Pod,
+) (ctrl.Result, error) {

 	// Case 1: The pod itself doesn't have owner references. YOLO
 	if len(pod.OwnerReferences) == 0 {
-		return
+		return ctrl.Result{}, nil
 	}

+	// Collect owner references for pod group
+	owners := []metav1.OwnerReference{}
 	var refs []string
 	for _, ownerRef := range pod.OwnerReferences {
 		refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name))
+		owners = append(owners, ownerRef)
 	}
+	patch := client.MergeFrom(pg.DeepCopy())
 	if len(refs) != 0 {
 		sort.Strings(refs)
 		pg.Status.OccupiedBy = strings.Join(refs, ",")
 	}
+	if len(owners) > 0 {
+		pg.ObjectMeta.OwnerReferences = owners
+	}
+	// Apply the patch to update the size
+	r.Status().Update(ctx, pg)
+	err := r.Patch(ctx, pg, patch)
+	return ctrl.Result{Requeue: true}, err

 }

// SetupWithManager sets up the controller with the Manager.
@@ -346,6 +383,8 @@ func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Obje
 	if apierrs.IsNotFound(err) {
 		r.log.Info("Pod: ", "Status", pod.Status.Phase, "Name", pod.Name, "Group", groupName, "Namespace", pod.Namespace, "Action", "Creating PodGroup")

+		//owner := r.getOwnerMetadata(pod)

 		// TODO should an owner be set here? Setting to a specific pod seems risky/wrong in case deleted.
 		err, _ := r.newPodGroup(ctx, groupName, pod.Namespace, int32(groupSize))
 		if err != nil {
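Note: the commit message also floats updating the group's minimum size if fluence might schedule the group again after some pods finish. A rough sketch of that idea follows; it is not implemented by this commit, the helper name shrinkMinMember and its wiring are illustrative, the import path for the PodGroup types is assumed, and it presumes finished pods should no longer count toward the gang.

// Sketch only (not part of this commit): shrink MinMember as pods succeed, so a
// later scheduling pass only waits for the remaining pods.
package controllers

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// shrinkMinMember lowers the group's MinMember by the number of pods that have
// already succeeded, never going below one, and patches the spec.
func shrinkMinMember(ctx context.Context, c client.Client, pg *schedv1alpha1.PodGroup, succeeded int32) error {
	patch := client.MergeFrom(pg.DeepCopy())
	remaining := pg.Spec.MinMember - succeeded
	if remaining < 1 {
		remaining = 1
	}
	if remaining == pg.Spec.MinMember {
		return nil
	}
	pg.Spec.MinMember = remaining
	return c.Patch(ctx, pg, patch)
}

Whether shrinking MinMember would be safe alongside fluence's own view of the group size is exactly the open question the commit message flags as TBA.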
