fix: response to review comments
This set of changes includes the following:

1. Renaming short variable names to longer, more understandable ones.
2. Not using Status.ScheduleStartTime for the pod start time, but instead adding a new field; the previous field was there for a different purpose.
3. Creating named identifiers for resource types that can be shared in the jgf module and by others that use the same relations / vertex types (a brief sketch follows the change summary below).
4. Removing comments that are not necessary.
5. Changing JGF types from int to int64 where warranted.
6. Fixing spelling mistakes, etc.
7. Removing the need to write the jobspec to a temporary file (we just need the string).

The JGF and utils modules need a closer look - specifically, I am worried
that the paths->containment is not set, and sometimes the name reflects the index
of the overall (global) graph while other times it reflects the index of the resource
type. I think we likely want the latter for the inner name, but I am not sure
fluxion is using it (internally) in practice. I am pushing these changes to assess
testing, etc., and will update the PR as needed. There could also have been changes to
upstream since the PR was opened that warrant additional fixes.

Signed-off-by: vsoch <[email protected]>
vsoch committed May 14, 2024
1 parent 2aee785 commit 14d5652
Showing 18 changed files with 263 additions and 334 deletions.
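Point 3 above mentions shared, named identifiers for resource and relation types in the jgf module. As a rough sketch only (the constant names and values below are assumptions, not the identifiers from this commit), such shared definitions might look like:

```go
// Hypothetical sketch: shared vertex (resource) types and edge relations for the
// jgf module, so callers stop repeating string literals. Names are assumptions.
package jgf

const (
	// Vertex (resource) types in the graph
	ClusterType = "cluster"
	NodeType    = "node"
	CoreType    = "core"
	MemoryType  = "memory"

	// Edge relations between vertices
	ContainsRelation = "contains"
	InRelation       = "in"
)
```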
4 changes: 2 additions & 2 deletions README.md
@@ -43,7 +43,7 @@ The way it works:
1. We have a mutating admission webhook that looks for jobs and pods, and ensures there are fluence labels (likely we will add more abstractions).
2. A PodGroup reconciler is watching for these same objects. When they are created:
a. We find the labels and create the pod group object.
b. The pod group object has a timestamp for creation in milliseconds.
b. The pod group object has a timestamp for creation in microseconds.
3. When the pod is then given to fluence for scheduling, it already has the PodGroup created with name/size and can properly sort.

Here is an example of a Job intended for Fluence:
@@ -452,7 +452,7 @@ If you are looking to develop:

- [src](src): includes source code for fluence. You'll find logs for this code in the `sidecar` container of the fluence pod.
- [sig-scheduler-plugins](sig-scheduler-plugins): includes assets (manifests and Go files) that are intended to be added to the kubernetes-sigs/scheduler-plugins upstream repository before build. You'll find logs for this container in the `scheduler-plugins-scheduler` container of the pod.
- [apis](sig-scheduler-plugins/apis): customized PodGroup to define the status scheduled time in micro seconds
- [apis](sig-scheduler-plugins/apis): customized PodGroup to define the status scheduled time in microseconds
- [manifests](sig-scheduler-plugins/manifests): manifests for helm and Kubernetes
- [pkg](sig-scheduler-plugins/pkg): the main fluence module to add to upstream
- [cmd](sig-scheduler-plugins/cmd): the main.go to replace in upstream
49 changes: 24 additions & 25 deletions sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go
@@ -1,5 +1,5 @@
/*
Copyright 2023 Lawrence Livermore National Security, LLC
Copyright 2024 Lawrence Livermore National Security, LLC
(c.f. AUTHORS, NOTICE.LLNS, COPYING)
SPDX-License-Identifier: MIT
@@ -50,14 +50,14 @@ type fluenceWatcher struct {
// Handle is the main handler for the webhook, which is looking for jobs and pods (in that order)
// If a job comes in (with a pod template) first, we add the labels there first (and they will
// not be added again).
func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response {
func (hook *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response {

logger.Info("Running webhook handle, determining pod wrapper abstraction...")

job := &batchv1.Job{}
err := a.decoder.Decode(req, job)
err := hook.decoder.Decode(req, job)
if err == nil {
err = a.EnsureGroupOnJob(job)
err = hook.EnsureGroupOnJob(job)
if err != nil {
logger.Error(err, "Issue adding PodGroup to Job")
return admission.Errored(http.StatusBadRequest, err)
@@ -72,9 +72,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi
}

pod := &corev1.Pod{}
err = a.decoder.Decode(req, pod)
err = hook.decoder.Decode(req, pod)
if err == nil {
err = a.EnsureGroup(pod)
err = hook.EnsureGroup(pod)
if err != nil {
logger.Error(err, "Issue adding PodGroup to Pod")
return admission.Errored(http.StatusBadRequest, err)
@@ -89,9 +89,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi
}

set := &appsv1.StatefulSet{}
err = a.decoder.Decode(req, set)
err = hook.decoder.Decode(req, set)
if err == nil {
err = a.EnsureGroupStatefulSet(set)
err = hook.EnsureGroupStatefulSet(set)
if err != nil {
logger.Error(err, "Issue adding PodGroup to StatefulSet")
return admission.Errored(http.StatusBadRequest, err)
@@ -105,15 +105,15 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledSet)
}

d := &appsv1.Deployment{}
err = a.decoder.Decode(req, d)
deployment := &appsv1.Deployment{}
err = hook.decoder.Decode(req, deployment)
if err == nil {
err = a.EnsureGroupDeployment(d)
err = hook.EnsureGroupDeployment(deployment)
if err != nil {
logger.Error(err, "Issue adding PodGroup to Deployment")
return admission.Errored(http.StatusBadRequest, err)
}
marshalledD, err := json.Marshal(d)
marshalledD, err := json.Marshal(deployment)
if err != nil {
logger.Error(err, "Marshalling Deployment error")
return admission.Errored(http.StatusInternalServerError, err)
@@ -123,9 +123,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi
}

rset := &appsv1.ReplicaSet{}
err = a.decoder.Decode(req, rset)
err = hook.decoder.Decode(req, rset)
if err == nil {
err = a.EnsureGroupReplicaSet(rset)
err = hook.EnsureGroupReplicaSet(rset)
if err != nil {
logger.Error(err, "Issue adding PodGroup to ReplicaSet")
return admission.Errored(http.StatusBadRequest, err)
@@ -145,29 +145,28 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi
}

// Default is the expected entrypoint for a webhook...
// I don't remember if this is even called...
func (a *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error {
func (hook *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error {

switch obj.(type) {
case *batchv1.Job:
job := obj.(*batchv1.Job)
return a.EnsureGroupOnJob(job)
return hook.EnsureGroupOnJob(job)

case *corev1.Pod:
pod := obj.(*corev1.Pod)
return a.EnsureGroup(pod)
return hook.EnsureGroup(pod)

case *appsv1.StatefulSet:
set := obj.(*appsv1.StatefulSet)
return a.EnsureGroupStatefulSet(set)
return hook.EnsureGroupStatefulSet(set)

case *appsv1.Deployment:
d := obj.(*appsv1.Deployment)
return a.EnsureGroupDeployment(d)
deployment := obj.(*appsv1.Deployment)
return hook.EnsureGroupDeployment(deployment)

case *appsv1.ReplicaSet:
set := obj.(*appsv1.ReplicaSet)
return a.EnsureGroupReplicaSet(set)
return hook.EnsureGroupReplicaSet(set)

default:
// no match
@@ -180,7 +179,7 @@ func (a *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error
// Note that we need to do similar for Job.
// A pod without a job wrapper, and without metadata is a group
// of size 1.
func (a *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error {
func (hook *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error {

// Add labels if we don't have anything. Everything is a group!
if pod.Labels == nil {
@@ -221,7 +220,7 @@ func getJobLabel(job *batchv1.Job, labelName, defaultLabel string) string {

// EnsureGroupOnJob looks for fluence labels (size and name) on both the job
// and the pod template. We ultimately put on the pod, the lowest level unit.
// Since we have the size of the job (paramllism) we can use that for the size
// Since we have the size of the job (parallelism) we can use that for the size
func (a *fluenceWatcher) EnsureGroupOnJob(job *batchv1.Job) error {

// Be forgiving - allow the person to specify it on the job directly or on the Podtemplate
@@ -252,7 +251,7 @@ func (a *fluenceWatcher) EnsureGroupOnJob(job *batchv1.Job) error {
}

// EnsureGroupStatefulSet creates a PodGroup for a StatefulSet
func (a *fluenceWatcher) EnsureGroupStatefulSet(set *appsv1.StatefulSet) error {
func (hook *fluenceWatcher) EnsureGroupStatefulSet(set *appsv1.StatefulSet) error {

// StatefulSet requires on top level explicitly
if set.Labels == nil {
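The EnsureGroup* functions above add fluence group labels when they are missing. A minimal sketch of the idea for a bare pod, assuming hypothetical label keys (the real keys are not shown in this diff):

```go
// Hypothetical sketch: a bare pod with no job wrapper and no group labels is
// treated as a group of size 1 named after the pod. Label keys are placeholders.
package v1alpha1

import corev1 "k8s.io/api/core/v1"

func ensureGroupSketch(pod *corev1.Pod) {
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	if _, ok := pod.Labels["fluence.group-name"]; !ok {
		pod.Labels["fluence.group-name"] = pod.Name
	}
	if _, ok := pod.Labels["fluence.group-size"]; !ok {
		pod.Labels["fluence.group-size"] = "1"
	}
}
```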
12 changes: 9 additions & 3 deletions sig-scheduler-plugins/apis/scheduling/v1alpha1/types.go
@@ -136,12 +136,12 @@ type PodGroup struct {
type PodGroupSpec struct {
// MinMember defines the minimal number of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start anyone.
// will not start anyway.
MinMember int32 `json:"minMember,omitempty"`

// MinResources defines the minimal resource of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start anyone.
// will not start anyway.
MinResources v1.ResourceList `json:"minResources,omitempty"`

// ScheduleTimeoutSeconds defines the maximal time of members/tasks to wait before run the pod group;
@@ -169,7 +169,13 @@ type PodGroupStatus struct {
// +optional
Failed int32 `json:"failed,omitempty"`

// ScheduleStartTime of the group (note that we changed this to a micro time)
// CreationTime is intended to mirror the object CreationTimestamp,
// but set by us as a MicroTime instead of a Time.
// +optional
CreationTime metav1.MicroTime `json:"creationTime,omitempty"`

// ScheduleStartTime of the group is when we want to start counting:
// "at time N plus 48 hours is when we deem the time waited to be too long"
// +optional
ScheduleStartTime metav1.MicroTime `json:"scheduleStartTime,omitempty"`
}
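The new CreationTime field above uses metav1.MicroTime rather than reusing the object's CreationTimestamp because metav1.Time serializes at whole-second granularity while MicroTime keeps microseconds. A small illustrative snippet (not part of this commit):

```go
// Illustration only: compare how metav1.Time and metav1.MicroTime serialize.
package main

import (
	"encoding/json"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := time.Now()
	seconds := metav1.NewTime(now)
	micros := metav1.NewMicroTime(now)

	s, _ := json.Marshal(&seconds) // e.g. "2024-05-14T20:01:02Z"
	m, _ := json.Marshal(&micros)  // e.g. "2024-05-14T20:01:02.123456Z"
	fmt.Println(string(s), string(m))
}
```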
4 changes: 0 additions & 4 deletions sig-scheduler-plugins/cmd/controller/app/server.go
@@ -65,10 +65,6 @@ func Run(s *ServerRunOptions) error {
return err
}

// Create a channel for the mutating webhook to communicate back to the reconciler
// This way we create the PodGroup before scheduling
//c := make(chan event.GenericEvent)

if err = (&controllers.PodGroupReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
@@ -12,4 +12,4 @@ spec:
selector:
app: scheduler-plugins-controller
ports:
{{- .Values.webhookService.ports | toYaml | nindent 2 -}}
{{- .Values.webhookService.ports | toYaml | nindent 2 -}}
10 changes: 7 additions & 3 deletions sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
@@ -83,6 +83,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences)

// Grab all statuses (and groups of them) we are interested in
// Note that 48 hours seems arbitrary, and if it is, we might make it a variable
schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending)
twoDaysOld := pg.Status.ScheduleStartTime.Sub(pg.CreationTimestamp.Time) > 48*time.Hour
finishedOrFailed := pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed
@@ -111,8 +112,11 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

// If the scheduler time created is Zero (not set) we set it here
if pg.Status.ScheduleStartTime.IsZero() {
// If the pod group creation time is zero (not set) we set it here.
// This only happens on the first reconcile, which should also be when the
// pod group is created. We set it here and don't use the underlying object
// CreationTime because we need microsecond (rather than second) granularity.
if pg.Status.CreationTime.IsZero() {
return r.setTimeCreated(ctx, pg, podList.Items, timestamp)
}

@@ -159,7 +163,7 @@ func (r *PodGroupReconciler) setTimeCreated(

// Now patch to update it
patch := client.MergeFrom(pg.DeepCopy())
pg.Status.ScheduleStartTime = timestamp
pg.Status.CreationTime = timestamp

// Apply the patch to update the size
r.Status().Update(ctx, pg)
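The reconciler comment above notes that the 48-hour cutoff seems arbitrary and might become a variable. A minimal sketch of that suggestion, with an assumed name for the constant:

```go
// Hypothetical sketch: lift the two-day cutoff out of Reconcile into a named
// constant (or, later, a configurable option). The name is an assumption.
package controllers

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const podGroupStaleAfter = 48 * time.Hour

// isStale reports whether a pod group has waited longer than the cutoff, given
// its object creation timestamp and its Status.ScheduleStartTime.
func isStale(created metav1.Time, scheduleStart metav1.MicroTime) bool {
	return scheduleStart.Sub(created.Time) > podGroupStaleAfter
}
```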
30 changes: 20 additions & 10 deletions sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -78,7 +78,7 @@ type Manager interface {
PreFilter(context.Context, *corev1.Pod, *framework.CycleState) error
GetPodNode(*corev1.Pod) string
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
GetCreationTimestamp(*corev1.Pod, time.Time) metav1.MicroTime
DeletePermittedPodGroup(string)
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
CalculateAssignedPods(string, string) int
@@ -255,8 +255,8 @@ func (podGroupManager *PodGroupManager) PreFilter(
return nil
}

_, exist := podGroupManager.backedOffpodGroup.Get(groupName)
if exist {
_, exists := podGroupManager.backedOffpodGroup.Get(groupName)
if exists {
return fmt.Errorf("podGroup %v failed recently", groupName)
}

@@ -290,8 +290,8 @@ func (podGroupManager *PodGroupManager) PreFilter(
// TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group.
// It only tries to PreFilter resource constraints so even if a PodGroup passed here,
// it may not necessarily pass Filter due to other constraints such as affinity/taints.
_, ok := podGroupManager.permittedpodGroup.Get(groupName)
if ok {
_, exists = podGroupManager.permittedpodGroup.Get(groupName)
if exists {
podGroupManager.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", groupName)
return nil
}
@@ -331,17 +331,27 @@ func (podGroupManager *PodGroupManager) PreFilter(
return nil
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
// GetCreationTimestamp returns the creation time of a podGroup or a pod as a metav1.MicroTime.
// The Status.CreationTime is set by the PodGroup reconciler, which has to happen before we have
// a PodGroup. I don't see cases where this wouldn't happen, but just in case, we fall back to
// converting the object CreationTimestamp to a MicroTime.
func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) metav1.MicroTime {
groupName := util.GetPodGroupLabel(pod)
if len(groupName) == 0 {
return ts
return metav1.NewMicroTime(ts)
}
var podGroup v1alpha1.PodGroup
if err := podGroupManager.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil {
return ts
return metav1.NewMicroTime(ts)
}
return podGroup.CreationTimestamp.Time
// First preference goes to microseconds. This should be set, as it is set by the first
// reconcile, and we wouldn't have a pod group if it didn't pass through that.
if !podGroup.Status.CreationTime.IsZero() {
return podGroup.Status.CreationTime
}
// Fall back to CreationTime from Kubernetes, in seconds
// In practice this should not happen
return metav1.NewMicroTime(podGroup.CreationTimestamp.Time)
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
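Since GetCreationTimestamp now returns a metav1.MicroTime, callers that order pod groups can compare creation times at microsecond granularity. A hedged sketch of what such an ordering could look like (the function and variable names are assumptions, not code from this commit):

```go
// Hypothetical sketch: order two pods by their group's creation time, using the
// microsecond granularity so groups created close together still sort
// deterministically. Names and the fallback tie-break are assumptions.
package core

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func lessByGroupCreation(
	getTimestamp func(*corev1.Pod, time.Time) metav1.MicroTime,
	pod1, pod2 *corev1.Pod,
	fallback1, fallback2 time.Time,
) bool {
	creation1 := getTimestamp(pod1, fallback1)
	creation2 := getTimestamp(pod2, fallback2)
	if creation1.Equal(&creation2) {
		// Stable tie-break when two groups share the same microsecond.
		return pod1.Namespace+"/"+pod1.Name < pod2.Namespace+"/"+pod2.Name
	}
	return creation1.Before(&creation2)
}
```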