From 9875d24ba0f6936700ce54fad35df6668f717c2f Mon Sep 17 00:00:00 2001 From: Qiu Jian Date: Mon, 11 Dec 2023 00:28:44 +0800 Subject: [PATCH] fix: log metric collection action --- pkg/cloudmon/misc/pinger.go | 2 +- pkg/cloudmon/resources/resources.go | 184 +++++++++++++++++++++------- pkg/util/logclient/consts.go | 2 + pkg/util/logclient/consts_i18n.go | 5 + 4 files changed, 149 insertions(+), 44 deletions(-) diff --git a/pkg/cloudmon/misc/pinger.go b/pkg/cloudmon/misc/pinger.go index 5ee30f3e1c5..353e5068aa4 100644 --- a/pkg/cloudmon/misc/pinger.go +++ b/pkg/cloudmon/misc/pinger.go @@ -74,7 +74,7 @@ func PingProbe(ctx context.Context, userCred mcclient.TokenCredential, isStart b network := sNetwork{networks[i]} m, err := pingProbeNetwork(s, network) if err != nil { - log.Errorf("pingProbeNetwork") + log.Errorf("pingProbeNetwork network %s(%s-%s) fail %s", network.Name, network.GuestIpStart, network.GuestIpEnd, err) continue } metrics = append(metrics, m...) diff --git a/pkg/cloudmon/resources/resources.go b/pkg/cloudmon/resources/resources.go index 2c8f32f666b..14149c8c9ac 100644 --- a/pkg/cloudmon/resources/resources.go +++ b/pkg/cloudmon/resources/resources.go @@ -24,6 +24,7 @@ import ( "yunion.io/x/cloudmux/pkg/cloudprovider" "yunion.io/x/jsonutils" "yunion.io/x/log" + "yunion.io/x/pkg/appctx" "yunion.io/x/pkg/errors" api "yunion.io/x/onecloud/pkg/apis/compute" @@ -36,6 +37,7 @@ import ( "yunion.io/x/onecloud/pkg/mcclient/modules/compute" "yunion.io/x/onecloud/pkg/mcclient/modules/identity" "yunion.io/x/onecloud/pkg/util/influxdb" + "yunion.io/x/onecloud/pkg/util/logclient" ) type sBaseInfo struct { @@ -632,14 +634,30 @@ func (self *SResources) UpdateSync(ctx context.Context, userCred mcclient.TokenC } } -func (self *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) { +type sMetricProvider struct { + api.CloudproviderDetails +} + +func (p sMetricProvider) GetId() string { + return p.Id +} + +func (p sMetricProvider) GetName() string { + return p.Name +} + +func (p sMetricProvider) Keyword() string { + return "cloudprovider" +} + +func (res *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) { if isStart { return } ch := make(chan struct{}, options.Options.CloudAccountCollectMetricsBatchCount) defer close(ch) - s := auth.GetAdminSession(context.Background(), options.Options.Region) - resources := self.Cloudproviders.getResources(ctx, "") + s := auth.GetAdminSession(ctx, options.Options.Region) + resources := res.Cloudproviders.getResources(ctx, "") cloudproviders := map[string]api.CloudproviderDetails{} jsonutils.Update(&cloudproviders, resources) az, _ := time.LoadLocation(options.Options.TimeZone) @@ -649,194 +667,274 @@ func (self *SResources) CollectMetrics(ctx context.Context, userCred mcclient.To for i := range cloudproviders { ch <- struct{}{} wg.Add(1) - go func(manager api.CloudproviderDetails) { + goctx := context.WithValue(ctx, appctx.APP_CONTEXT_KEY_START_TIME, time.Now().UTC()) + go func(ctx context.Context, manager api.CloudproviderDetails) { + succ := true + msgs := make([]string, 0) defer func() { + logclient.AddActionLogWithContext(ctx, &sMetricProvider{manager}, logclient.ACT_COLLECT_METRICS, strings.Join(msgs, ";"), userCred, succ) wg.Done() <-ch }() if strings.Contains(strings.ToLower(options.Options.SkipMetricPullProviders), strings.ToLower(manager.Provider)) { - log.Infof("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders) + logmsg := fmt.Sprintf("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders) + log.Infoln(logmsg) + msgs = append(msgs, logmsg) return } driver, err := providerdriver.GetDriver(manager.Provider) if err != nil { - log.Errorf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false return } if !driver.IsSupportMetrics() { - log.Infof("%s not support metrics, skip", driver.GetProvider()) + logmsg := fmt.Sprintf("%s not support metrics, skip", driver.GetProvider()) + log.Infoln(logmsg) + msgs = append(msgs, logmsg) return } provider, err := compute.Cloudproviders.GetProvider(ctx, s, manager.Id) if err != nil { - log.Errorf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false return } duration := driver.GetDelayDuration() endTime := _endTime.Add(-1 * duration) startTime := _startTime.Add(-1 * duration).Add(time.Second * -59) - resources = self.DBInstances.getResources(ctx, manager.Id) + resources = res.DBInstances.getResources(ctx, manager.Id) dbinstances := map[string]api.DBInstanceDetails{} err = jsonutils.Update(&dbinstances, resources) if err != nil { - log.Errorf("unmarsha rds resources error: %v", err) + logmsg := fmt.Sprintf("unmarshal rds resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(dbinstances) > 0 { err = driver.CollectDBInstanceMetrics(ctx, manager, provider, dbinstances, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Servers.getResources(ctx, manager.Id) + resources = res.Servers.getResources(ctx, manager.Id) servers := map[string]api.ServerDetails{} err = jsonutils.Update(&servers, resources) if err != nil { - log.Errorf("unmarsha server resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha server resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(servers) > 0 { err = driver.CollectServerMetrics(ctx, manager, provider, servers, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorf(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Hosts.getResources(ctx, manager.Id) + resources = res.Hosts.getResources(ctx, manager.Id) hosts := map[string]api.HostDetails{} err = jsonutils.Update(&hosts, resources) if err != nil { - log.Errorf("unmarsha host resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha host resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(hosts) > 0 { err = driver.CollectHostMetrics(ctx, manager, provider, hosts, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Storages.getResources(ctx, manager.Id) + resources = res.Storages.getResources(ctx, manager.Id) storages := map[string]api.StorageDetails{} err = jsonutils.Update(&storages, resources) if err != nil { - log.Errorf("unmarsha storage resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha storage resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(storages) > 0 { err = driver.CollectStorageMetrics(ctx, manager, provider, storages, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Redis.getResources(ctx, manager.Id) + resources = res.Redis.getResources(ctx, manager.Id) caches := map[string]api.ElasticcacheDetails{} err = jsonutils.Update(&caches, resources) if err != nil { - log.Errorf("unmarsha redis resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha redis resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(caches) > 0 { err = driver.CollectRedisMetrics(ctx, manager, provider, caches, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorf(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Loadbalancers.getResources(ctx, manager.Id) + resources = res.Loadbalancers.getResources(ctx, manager.Id) lbs := map[string]api.LoadbalancerDetails{} err = jsonutils.Update(&lbs, resources) if err != nil { - log.Errorf("unmarsha lb resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha lb resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(lbs) > 0 { err = driver.CollectLoadbalancerMetrics(ctx, manager, provider, lbs, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorf(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Buckets.getResources(ctx, manager.Id) + resources = res.Buckets.getResources(ctx, manager.Id) buckets := map[string]api.BucketDetails{} err = jsonutils.Update(&buckets, resources) if err != nil { - log.Errorf("unmarsha bucket resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha bucket resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(buckets) > 0 { err = driver.CollectBucketMetrics(ctx, manager, provider, buckets, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.KubeClusters.getResources(ctx, manager.Id) + resources = res.KubeClusters.getResources(ctx, manager.Id) clusters := map[string]api.KubeClusterDetails{} err = jsonutils.Update(&clusters, resources) if err != nil { - log.Errorf("unmarsha k8s resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha k8s resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(clusters) > 0 { err = driver.CollectK8sMetrics(ctx, manager, provider, clusters, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.ModelartsPool.getResources(ctx, manager.Id) + resources = res.ModelartsPool.getResources(ctx, manager.Id) pools := map[string]api.ModelartsPoolDetails{} err = jsonutils.Update(&pools, resources) if err != nil { - log.Errorf("unmarsha modelarts resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha modelarts resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(pools) > 0 { err = driver.CollectModelartsPoolMetrics(ctx, manager, provider, pools, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.Wires.getResources(ctx, manager.Id) + resources = res.Wires.getResources(ctx, manager.Id) wires := map[string]api.WireDetails{} err = jsonutils.Update(&wires, resources) if err != nil { - log.Errorf("unmarsha wires resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha wires resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(wires) > 0 { err = driver.CollectWireMetrics(ctx, manager, provider, wires, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - resources = self.ElasticIps.getResources(ctx, manager.Id) + resources = res.ElasticIps.getResources(ctx, manager.Id) eips := map[string]api.ElasticipDetails{} err = jsonutils.Update(&eips, resources) if err != nil { - log.Errorf("unmarsha eips resources error: %v", err) + logmsg := fmt.Sprintf("unmarsha eips resources error: %v", err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } if len(eips) > 0 { err = driver.CollectEipMetrics(ctx, manager, provider, eips, startTime, endTime) if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported { - log.Errorf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + logmsg := fmt.Sprintf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err) + log.Errorln(logmsg) + msgs = append(msgs, logmsg) + succ = false } } - }(cloudproviders[i]) + }(goctx, cloudproviders[i]) } wg.Wait() - resources = self.Cloudaccounts.getResources(ctx, "") + resources = res.Cloudaccounts.getResources(ctx, "") accounts := map[string]api.CloudaccountDetail{} jsonutils.Update(&accounts, resources) diff --git a/pkg/util/logclient/consts.go b/pkg/util/logclient/consts.go index 41e10e0d48e..4c02617e173 100644 --- a/pkg/util/logclient/consts.go +++ b/pkg/util/logclient/consts.go @@ -276,4 +276,6 @@ const ( ACT_DELETE_SECURITY_GROUP_RULE = "delete_security_group_rule" ACT_CLEAN_PROJECT = "clean_project" + + ACT_COLLECT_METRICS = "collect_metrics" ) diff --git a/pkg/util/logclient/consts_i18n.go b/pkg/util/logclient/consts_i18n.go index 9b883137f99..b686ae22a34 100644 --- a/pkg/util/logclient/consts_i18n.go +++ b/pkg/util/logclient/consts_i18n.go @@ -1450,4 +1450,9 @@ func init() { EN("Sync Cloud Resource"). CN("同步云资源"), ) + + o.Set(ACT_COLLECT_METRICS, i18n.NewTableEntry(). + EN("Collect monitoring metrics"). + CN("采集监控指标"), + ) }