Skip to content

Commit

Permalink
Merge pull request #19011 from zexi/automated-cherry-pick-of-#19007-u…
Browse files Browse the repository at this point in the history
…pstream-master

Automated cherry pick of #19007: fix(scheduler): reload cloudprovider's hosts when syncing finished
  • Loading branch information
zexi authored Dec 16, 2023
2 parents d478a8a + 793a73c commit 2e7e80b
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 17 deletions.
13 changes: 11 additions & 2 deletions pkg/cloudcommon/db/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,11 @@ func FetchStandaloneObjectsByIds(modelManager IModelManager, ids []string, targe
return FetchModelObjectsByIds(modelManager, "id", ids, targets)
}

func FetchDistinctField(modelManager IModelManager, field string) ([]string, error) {
q := modelManager.Query(field).Distinct()
func FetchField(modelMan IModelManager, field string, qCallback func(q *sqlchemy.SQuery) *sqlchemy.SQuery) ([]string, error) {
q := modelMan.Query(field)
if qCallback != nil {
q = qCallback(q)
}
rows, err := q.Rows()
if err != nil {
if errors.Cause(err) == sql.ErrNoRows {
Expand All @@ -593,3 +596,9 @@ func FetchDistinctField(modelManager IModelManager, field string) ([]string, err
}
return values, nil
}

func FetchDistinctField(modelManager IModelManager, field string) ([]string, error) {
return FetchField(modelManager, field, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Distinct()
})
}
2 changes: 2 additions & 0 deletions pkg/compute/models/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4664,6 +4664,7 @@ func (h *SHost) addNetif(ctx context.Context, userCred mcclient.TokenCredential,
}
// else not found
netif = &SNetInterface{}
netif.SetModelManager(NetInterfaceManager, netif)
netif.Mac = mac
netif.VlanId = vlanId
}
Expand Down Expand Up @@ -5003,6 +5004,7 @@ func (hh *SHost) Attach2Network(
}
bn := &SHostnetwork{}
bn.BaremetalId = hh.Id
bn.SetModelManager(HostnetworkManager, bn)
bn.NetworkId = net.Id
bn.IpAddr = freeIp
bn.MacAddr = netif.Mac
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/candidate/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (b *baseBuilder) setCloudproviderAccounts(hosts []computemodels.SHost, errC
}
providerObjs := make([]computemodels.SCloudprovider, 0)
for _, pId := range providerSets.List() {
pObj, ok := cloudprovider.Manager.GetResource(pId)
pObj, ok := cloudprovider.GetManager().GetResource(pId)
if !ok {
errCh <- errors.Errorf("Not found cloudprovider by id: %q", pId)
return
Expand Down
16 changes: 12 additions & 4 deletions pkg/scheduler/data_manager/candidate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ func (cm *CandidateManager) AddImpl(name string, impl *CandidateManagerImpl) {
cm.impls[name] = impl
}

const (
CANDIDATE_MANAGER_IMPL_HOST = "host"
CANDIDATE_MANAGER_IMPL_BAREMETAL = "baremetal"
)

func NewCandidateManager(dataManager *DataManager, stopCh <-chan struct{}) *CandidateManager {

candidateManager := &CandidateManager{
Expand All @@ -331,10 +336,10 @@ func NewCandidateManager(dataManager *DataManager, stopCh <-chan struct{}) *Cand
//dirtyPool: ttlpool.NewCountPool(),
}

candidateManager.AddImpl("host", NewCandidateManagerImpl(
candidateManager.AddImpl(CANDIDATE_MANAGER_IMPL_HOST, NewCandidateManagerImpl(
&HostCandidateManagerImplProvider{dataManager: dataManager}, stopCh))

candidateManager.AddImpl("baremetal", NewCandidateManagerImpl(
candidateManager.AddImpl(CANDIDATE_MANAGER_IMPL_BAREMETAL, NewCandidateManagerImpl(
&BaremetalCandidateManagerImplProvider{dataManager: dataManager}, stopCh))

return candidateManager
Expand All @@ -347,8 +352,11 @@ func (cm *CandidateManager) Run() {
}
}

func (cm *CandidateManager) Reload(resType string, candidateIds []string) (
[]interface{}, error) {
func (cm *CandidateManager) ReloadHosts(ids []string) ([]interface{}, error) {
return cm.Reload(CANDIDATE_MANAGER_IMPL_HOST, ids)
}

func (cm *CandidateManager) Reload(resType string, candidateIds []string) ([]interface{}, error) {

if len(candidateIds) == 0 {
return []interface{}{}, nil
Expand Down
51 changes: 47 additions & 4 deletions pkg/scheduler/data_manager/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,29 @@
package cloudprovider

import (
"fmt"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/sqlchemy"

computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/mcclient/modules/compute"
"yunion.io/x/onecloud/pkg/scheduler/data_manager/common"
)

var Manager common.IResourceManager[models.SCloudprovider]
var manager common.IResourceManager[models.SCloudprovider]

func init() {
Manager = NewResourceManager()
func GetManager() common.IResourceManager[models.SCloudprovider] {
if manager != nil {
return manager
}
manager = NewResourceManager()
return manager
}

func NewResourceManager() common.IResourceManager[models.SCloudprovider] {
Expand All @@ -41,5 +53,36 @@ func NewResourceStore() common.IResourceStore[models.SCloudprovider] {
return common.NewResourceStore[models.SCloudprovider](
models.CloudproviderManager,
compute.Cloudproviders,
)
).WithOnUpdate(onCloudproviderUpdate)
}

func onCloudproviderUpdate(oldObj *jsonutils.JSONDict, newObj db.IModel) {
syncStatusKey := "sync_status"
if !oldObj.Contains(syncStatusKey) {
return
}
// process cloudaccount syncing finished status
cp := newObj.(*models.SCloudprovider)
prevStatus, _ := oldObj.GetString(syncStatusKey)
curStatus := cp.SyncStatus
if prevStatus == computeapi.CLOUD_PROVIDER_SYNC_STATUS_SYNCING && curStatus == computeapi.CLOUD_PROVIDER_SYNC_STATUS_IDLE {
if err := onCloudproviderSyncFinished(cp); err != nil {
log.Infof("onCloudproviderSyncFinished error: %v", err)
}
}
}

func onCloudproviderSyncFinished(cp *models.SCloudprovider) error {
cpHint := fmt.Sprintf("%s/%s", cp.GetId(), cp.GetName())
hostdIds, err := db.FetchField(models.HostManager, "id", func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", cp.GetId())
})
if err != nil {
return errors.Wrapf(err, "get all hostdIds from cloudprovider %s", cpHint)
}
log.Infof("Start reload cloudprovider %s hosts: %v", cpHint, hostdIds)
if _, err := common.GetCacheManager().ReloadHosts(hostdIds); err != nil {
return errors.Wrapf(err, "Reload cache hosts of cloudprovider %s", cpHint)
}
return nil
}
15 changes: 15 additions & 0 deletions pkg/scheduler/data_manager/common/cache_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package common

var cacheManager CacheManager

type CacheManager interface {
ReloadHosts(ids []string) ([]interface{}, error)
}

func RegisterCacheManager(man CacheManager) {
cacheManager = man
}

func GetCacheManager() CacheManager {
return cacheManager
}
36 changes: 33 additions & 3 deletions pkg/scheduler/data_manager/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,15 @@ type ResourceStore[O lockman.ILockedObject] struct {
getId func(O) string
getWatchId func(*jsonutils.JSONDict) string
getDBObject FGetDBObject
onAdd func(obj db.IModel)
onUpdate func(oldObj *jsonutils.JSONDict, newObj db.IModel)
onDelete func(obj *jsonutils.JSONDict)
}

func NewResourceStore[O lockman.ILockedObject](
modelMan db.IModelManager,
res informer.IResourceManager,
) IResourceStore[O] {
) *ResourceStore[O] {
return newResourceStore[O](modelMan, res, nil, nil, nil)
}

Expand All @@ -129,7 +132,7 @@ func NewJointResourceStore[O lockman.ILockedObject](
getId func(O) string,
getWatchId func(*jsonutils.JSONDict) string,
getDBObject FGetDBObject,
) IResourceStore[O] {
) *ResourceStore[O] {
return newResourceStore(modelMan, res, getId, getWatchId, getDBObject)
}

Expand All @@ -139,7 +142,7 @@ func newResourceStore[O lockman.ILockedObject](
getId func(O) string,
getWatchId func(*jsonutils.JSONDict) string,
getDBObject FGetDBObject,
) IResourceStore[O] {
) *ResourceStore[O] {
if getId == nil {
getId = func(o O) string {
return o.GetId()
Expand All @@ -163,9 +166,27 @@ func newResourceStore[O lockman.ILockedObject](
getId: getId,
getWatchId: getWatchId,
getDBObject: getDBObject,
onAdd: nil,
onUpdate: nil,
onDelete: nil,
}
}

func (s *ResourceStore[O]) WithOnAdd(onAdd func(db.IModel)) *ResourceStore[O] {
s.onAdd = onAdd
return s
}

func (s *ResourceStore[O]) WithOnUpdate(onUpdate func(old *jsonutils.JSONDict, newObj db.IModel)) *ResourceStore[O] {
s.onUpdate = onUpdate
return s
}

func (s *ResourceStore[O]) WithOnDelete(onDelete func(*jsonutils.JSONDict)) *ResourceStore[O] {
s.onDelete = onDelete
return s
}

func (s *ResourceStore[O]) GetInformerResourceManager() informer.IResourceManager {
return s.res
}
Expand Down Expand Up @@ -220,6 +241,9 @@ func (s *ResourceStore[O]) Add(obj *jsonutils.JSONDict) {
tmpObj := v.Elem().Interface()
s.dataMap.Store(id, tmpObj)
log.Infof("Add %s %s", s.modelMan.Keyword(), obj.String())
if s.onAdd != nil {
s.onAdd(dbObj)
}
} else {
log.Errorf("Fetch %s by id %s error when created: %v", s.modelMan.Keyword(), id, err)
}
Expand Down Expand Up @@ -250,6 +274,9 @@ func (s *ResourceStore[O]) Update(oldObj, newObj *jsonutils.JSONDict) {
tmpObj := v.Elem().Interface()
s.dataMap.Store(id, tmpObj)
log.Infof("Update %s %s", s.modelMan.Keyword(), newObj.String())
if s.onUpdate != nil {
s.onUpdate(oldObj, dbObj)
}
} else {
log.Errorf("Fetch %s by id %s error when updated: %v", s.modelMan.Keyword(), id, err)
}
Expand All @@ -261,6 +288,9 @@ func (s *ResourceStore[O]) Delete(obj *jsonutils.JSONDict) {
if id != "" {
s.dataMap.Delete(id)
log.Infof("Delete %s %s", s.modelMan.Keyword(), obj.String())
if s.onDelete != nil {
s.onDelete(obj)
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
candidatecache "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
"yunion.io/x/onecloud/pkg/scheduler/core"
"yunion.io/x/onecloud/pkg/scheduler/data_manager"
"yunion.io/x/onecloud/pkg/scheduler/data_manager/common"
schedmodels "yunion.io/x/onecloud/pkg/scheduler/models"
o "yunion.io/x/onecloud/pkg/scheduler/options"
"yunion.io/x/onecloud/pkg/util/k8s"
Expand All @@ -48,7 +49,7 @@ type SchedulerManager struct {
KubeClusterManager *k8s.SKubeClusterManager
}

func NewSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
func newSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
sm := &SchedulerManager{}
sm.DataManager = data_manager.NewDataManager(stopCh)
sm.CandidateManager = data_manager.NewCandidateManager(sm.DataManager, stopCh)
Expand All @@ -58,6 +59,8 @@ func NewSchedulerManager(stopCh <-chan struct{}) *SchedulerManager {
sm.TaskManager = NewTaskManager(stopCh)
sm.KubeClusterManager = k8s.NewKubeClusterManager(o.Options.Region, 30*time.Second)

common.RegisterCacheManager(sm.CandidateManager)

return sm
}

Expand All @@ -74,7 +77,7 @@ func InitAndStart(stopCh <-chan struct{}) {
log.Warningf("Global scheduler already init.")
return
}
schedManager = NewSchedulerManager(stopCh)
schedManager = newSchedulerManager(stopCh)
go schedManager.start()
log.Infof("InitAndStart ok")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func StartService() error {
for _, f := range []func(ctx context.Context){
cloudregion.Manager.Start,
zone.Manager.Start,
cloudprovider.Manager.Start,
cloudprovider.GetManager().Start,
cloudaccount.Manager.Start,
wire.Manager.Start,
network.Manager.Start,
Expand Down

0 comments on commit 2e7e80b

Please sign in to comment.