Skip to content

Commit

Permalink
Support collecting logs from failed agents and the controller for supportbundle
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Jan 9, 2025
1 parent 357d38f commit 1782904
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 31 deletions.
196 changes: 180 additions & 16 deletions pkg/antctl/raw/supportbundle/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilerror "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -47,6 +51,8 @@ import (
systemv1beta1 "antrea.io/antrea/pkg/apis/system/v1beta1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1"
"antrea.io/antrea/pkg/util/compress"
"antrea.io/antrea/pkg/util/k8s"
)

const (
Expand Down Expand Up @@ -581,6 +587,20 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create clientset: %w", err)
}

if err := os.MkdirAll(option.dir, 0700); err != nil {
return fmt.Errorf("error when creating output dir: %w", err)
}

f, err := os.Create(filepath.Join(option.dir, "clusterinfo"))
if err != nil {
return err
}
defer f.Close()
err = getClusterInfo(f, k8sClientset)
if err != nil {
return err
}

var controllerClient systemclientset.SupportBundleInterface
var agentClients map[string]systemclientset.SupportBundleInterface

Expand Down Expand Up @@ -625,29 +645,17 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no matched Nodes found to collect agent bundles")
}

if err := os.MkdirAll(option.dir, 0700|os.ModeDir); err != nil {
return fmt.Errorf("error when creating output dir: %w", err)
}
amount := len(agentClients) * 2
if controllerClient != nil {
amount += 2
}
bar := barTmpl.Start(amount)
defer bar.Finish()
defer bar.Set("prefix", "Finish ")
f, err := os.Create(filepath.Join(option.dir, "clusterinfo"))
if err != nil {
return err
}
defer f.Close()
err = getClusterInfo(f, k8sClientset)
if err != nil {
return err
}

results := requestAll(ctx, agentClients, controllerClient, bar)
results = downloadAll(ctx, agentClients, controllerClient, dir, bar, results)
return processResults(results, dir)
return processResults(ctx, antreaClientset, k8sClientset, results, dir)
}

func genErrorMsg(resultMap map[string]error) string {
Expand All @@ -659,8 +667,9 @@ func genErrorMsg(resultMap map[string]error) string {
}

// processResults will output the failed nodes and their reasons if any. If no data was collected,
// error is returned, otherwise will return nil.
func processResults(resultMap map[string]error, dir string) error {
// error is returned, otherwise will return nil. For failed nodes and the controller, it will also try to get logs from
// the Kubernetes API.
func processResults(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, resultMap map[string]error, dir string) error {
resultStr := ""
var failedNodes []string
allFailed := true
Expand All @@ -676,7 +685,8 @@ func processResults(resultMap map[string]error, dir string) error {
}
}

if resultMap[""] != nil {
controllerFailed := resultMap[""] != nil
if controllerFailed {
fmt.Println("Controller Info Failed Reason: " + resultMap[""].Error())
}

Expand All @@ -689,9 +699,163 @@ func processResults(resultMap map[string]error, dir string) error {
err = writeFailedNodes(dir, failedNodes)
}

// download logs from kubernetes api
if failedNodes != nil {
if err = downloadFallbackAgentBundleFromKubernetes(ctx, antreaClientset, k8sClient, failedNodes, dir); err != nil {
fmt.Println("Failed to download agent bundle from kubernetes api: " + err.Error())
} else {
allFailed = false
}
}
if controllerFailed {
if err = downloadFallbackControllerBundleFromKubernetes(ctx, antreaClientset, k8sClient, dir); err != nil {
fmt.Println("Failed to download controller bundle from kubernetes api: " + err.Error())
} else {
allFailed = false
}
}

if allFailed {
return fmt.Errorf("no data was collected: %s", genErrorMsg(resultMap))
} else {
return err
}
}

// downloadFallbackControllerBundleFromKubernetes collects a best-effort bundle for the Antrea
// Controller through the Kubernetes API. It fetches the AntreaControllerInfo resource, writes it
// as YAML to "controllerinfo" under dir, downloads the logs of every container of the Pod
// referenced by that resource into a temporary directory, and packs the temporary directory
// into an archive under dir. The temporary directory is removed before returning.
//
// An error is returned as soon as any step fails; the "controllerinfo" file may already have
// been written at that point.
func downloadFallbackControllerBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, dir string) error {
	tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_")
	if err != nil {
		return err
	}
	defer defaultFS.RemoveAll(tmpDir)

	controllerInfo, err := antreaClientset.CrdV1beta1().AntreaControllerInfos().Get(ctx, v1beta1.AntreaControllerInfoResourceName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	data, err := yaml.Marshal(controllerInfo)
	if err != nil {
		return err
	}
	// NOTE(review): this file goes to dir, not tmpDir, so it is not part of the packed archive,
	// unlike "agentinfo" in the agent fallback path — confirm this is intentional.
	if err := afero.WriteFile(defaultFS, filepath.Join(dir, "controllerinfo"), data, 0644); err != nil {
		return err
	}

	// PodRef is a struct field, so a pointer to it can never be nil; an unset reference shows
	// up as an empty name instead.
	podRef := controllerInfo.PodRef
	if podRef.Name == "" {
		return fmt.Errorf("no podRef found in AntreaControllerInfo")
	}

	pod, err := k8sClient.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if err := downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(pod), tmpDir); err != nil {
		return err
	}
	return packPodBundle(pod, dir, tmpDir)
}

// downloadFallbackAgentBundleFromKubernetes collects a best-effort bundle through the Kubernetes
// API for every Node listed in failedNodes. For each antrea-agent Pod scheduled on one of those
// Nodes it writes the matching AntreaAgentInfo (when one exists) as YAML to "agentinfo",
// downloads the logs of all the Pod's containers, and packs the result into an archive under dir.
//
// Processing continues after a per-Node failure. The returned error aggregates every failure,
// each prefixed with the Node name, and includes an entry for each failed Node on which no
// antrea-agent Pod was found. It is nil only when a bundle was produced for every Node in
// failedNodes.
func downloadFallbackAgentBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, failedNodes []string, dir string) error {
	agentInfoList, err := antreaClientset.CrdV1beta1().AntreaAgentInfos().List(ctx, metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		return err
	}

	// Indexed by resource name and looked up by Node name below.
	// NOTE(review): presumably AntreaAgentInfo resources are named after their Node — confirm.
	agentInfoMap := make(map[string]v1beta1.AntreaAgentInfo, len(agentInfoList.Items))
	for _, agentInfo := range agentInfoList.Items {
		agentInfoMap[agentInfo.Name] = agentInfo
	}

	pods, err := k8sClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
		ResourceVersion: "0",
		LabelSelector:   "app=antrea,component=antrea-agent",
	})
	if err != nil {
		return err
	}

	failedNodeSet := sets.NewString(failedNodes...)
	handledNodes := sets.NewString()
	var errs []error
	// Index into the slice rather than ranging by value to avoid copying each Pod.
	for i := range pods.Items {
		pod := &pods.Items[i]
		nodeName := pod.Spec.NodeName
		if !failedNodeSet.Has(nodeName) {
			continue
		}
		handledNodes.Insert(nodeName)
		// The closure scopes the deferred cleanup of the temporary directory to one Pod.
		if err := func() error {
			tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_")
			if err != nil {
				return err
			}
			defer defaultFS.RemoveAll(tmpDir)

			if agentInfo, ok := agentInfoMap[nodeName]; ok {
				data, err := yaml.Marshal(agentInfo)
				if err != nil {
					return err
				}
				if err := afero.WriteFile(defaultFS, filepath.Join(tmpDir, "agentinfo"), data, 0644); err != nil {
					return err
				}
			}
			if err := downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(pod), tmpDir); err != nil {
				return err
			}
			return packPodBundle(pod, dir, tmpDir)
		}(); err != nil {
			errs = append(errs, fmt.Errorf("node %q: %w", nodeName, err))
		}
	}

	// A failed Node without an agent Pod yields no data at all; report it rather than letting
	// the caller believe the fallback succeeded.
	for _, nodeName := range failedNodes {
		if handledNodes.Has(nodeName) {
			continue
		}
		handledNodes.Insert(nodeName) // report a repeated Node name only once
		errs = append(errs, fmt.Errorf("node %q: no antrea-agent Pod found", nodeName))
	}
	return utilerror.NewAggregate(errs)
}

// packPodBundle packs the contents of bundleDir into a gzipped tarball under dir. The archive
// is named "<prefix><node name>.tar.gz", where the prefix is "controller_" when the Pod name
// contains "controller" and "agent_" otherwise.
//
// The error from closing the archive is returned when packing itself succeeded, so that a
// failure to flush the written data is not silently dropped.
func packPodBundle(pod *corev1.Pod, dir string, bundleDir string) (err error) {
	prefix := "agent_"
	if strings.Contains(pod.Name, "controller") {
		prefix = "controller_"
	}
	gzFileName := filepath.Join(dir, prefix+pod.Spec.NodeName+".tar.gz")
	f, err := defaultFS.Create(gzFileName)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the packing error if there is one; otherwise surface the close error.
		if closeErr := f.Close(); err == nil {
			err = closeErr
		}
	}()
	_, err = compress.PackDir(defaultFS, bundleDir, f)
	return err
}

// downloadPodLogs streams the logs of each listed container of the given Pod from the
// Kubernetes API into dir. The log of container C is written to
// "<dir>/logs/<C without the "antrea-" prefix>/<C>.log".
//
// All containers are attempted even if some fail. The returned error aggregates the
// per-container failures, each prefixed with the container name, and is nil when every
// container's log was downloaded.
func downloadPodLogs(ctx context.Context, k8sClient kubernetes.Interface, namespace string, podName string, containers []string, dir string) error {
	downloadContainerLogs := func(containerName string) error {
		containerDirName, _ := strings.CutPrefix(containerName, "antrea-")
		containerLogDir := filepath.Join(dir, "logs", containerDirName)
		// Go through defaultFS, like the file creation below, so that both operations hit the
		// same filesystem.
		if err := defaultFS.MkdirAll(containerLogDir, 0755); err != nil {
			return err
		}
		fileName := filepath.Join(containerLogDir, containerName+".log")
		f, err := defaultFS.Create(fileName)
		if err != nil {
			return err
		}
		defer f.Close()

		logOption := &corev1.PodLogOptions{
			Container: containerName,
		}
		logs := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, logOption)
		logStream, err := logs.Stream(ctx)
		if err != nil {
			return err
		}
		// Always close the stream, including when the copy fails, so it is not leaked.
		_, copyErr := io.Copy(f, logStream)
		closeErr := logStream.Close()
		if copyErr != nil {
			return copyErr
		}
		return closeErr
	}

	var errs []error
	for _, containerName := range containers {
		if err := downloadContainerLogs(containerName); err != nil {
			errs = append(errs, fmt.Errorf("container %q: %w", containerName, err))
		}
	}
	return utilerror.NewAggregate(errs)
}
Loading

0 comments on commit 1782904

Please sign in to comment.