From c72e28bccf09faa595b6ff6c1102de6e345a4357 Mon Sep 17 00:00:00 2001 From: zhangzihengya Date: Fri, 15 Mar 2024 21:23:49 +0800 Subject: [PATCH] add proc visualization --- .../collector/collect_output.go | 242 +++++++++++++----- .../eBPF_prometheus/dao/data_to_sqlite.go | 53 ---- .../eBPF_prometheus/prom_core/processer.go | 37 --- 3 files changed, 175 insertions(+), 157 deletions(-) diff --git a/eBPF_Visualization/eBPF_prometheus/collector/collect_output.go b/eBPF_Visualization/eBPF_prometheus/collector/collect_output.go index 16863596e..947972988 100644 --- a/eBPF_Visualization/eBPF_prometheus/collector/collect_output.go +++ b/eBPF_Visualization/eBPF_prometheus/collector/collect_output.go @@ -261,11 +261,8 @@ func (b *BPF_name) Run(full string) error { // 创建一个用于传递 map 数据的通道 mapchan := make(chan []map[string]interface{}, 2) - // 创建一个用于传递进程画像 map 数据的通道 - procmap_chan := make(chan map[int][]map[string]float64, 2) - // 启动一个 goroutine 用于重定向命令的标准输出 - go redirectStdout(fn, stdout, mapchan, procmap_chan) + go redirectStdout(fn, stdout, mapchan) // 创建一个指向 prom_core.MyMetrics 类型的指针 metricsobj, // 并使用结构体字段初始化 BPFName 和 Sqlinited。 @@ -298,11 +295,6 @@ func (b *BPF_name) Run(full string) error { } // 从通道中接收第二次数据 <-mapchan - case <-procmap_chan: - //receivedElement := <-procmap_chan - //fmt.Println("Received Element:", receivedElement) - metricsobj.procMaplist = <-procmap_chan - log.Println(metricsobj.procMaplist) default: } } @@ -348,7 +340,7 @@ func listenSystemSignals(cmd *exec.Cmd) { } // 定义了一个名为 redirectStdout 的函数,用于读取 stdout,并将其解析成 map 数据发送到指定通道 -func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]interface{}, procmap_chan chan map[int][]map[string]float64) { +func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]interface{}) { // 用于存储解析后的 map 数据的切片 var maps []map[string]interface{} // 互斥锁,用于保护 maps 切片的并发写入 @@ -362,6 +354,7 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i var syscall_titles []string var ulock_titles []string var kt_titles []string + // 行号计数器 var line_number = 1 // 命令索引,用于区分不同的命令 @@ -370,9 +363,19 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i // RESOURCE(1)、SCHEDULE(2)、SYSCALL(3)、USERLOCK(4)、KEYTIME(5) var data_type = 0 var set_map = 0 + var enable_tgid = 0 // 1 代表已创建表头 data_map := [6]int{0, 0, 0, 0, 0, 0} + rsc_pidheader := make(map[int][]string) + sched_pidheader := make(map[int][]string) + syscall_pidheader := make(map[int][]string) + ulock_pidheader := make(map[int][]string) + kt_pidheader := make(map[int][]string) + + var pid int + var err error + // proc_image // 判断程序名是否为 proc_image if fn == "proc_image" { @@ -395,84 +398,189 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i data_type = 5 } set_map = 1 + enable_tgid = 0 } else if set_map == 1 { - // 对于表头行判断是否已经记录过表头,若没记录过则记录到相应的map中,设置 set_map 为 0 + // 对于表头行判断是否已经记录过表头,若没记录过则记录到相应的map中 if data_map[data_type] == 0 { - for _, value := range fields[2:] { - switch data_type { - case 1: - rsc_titles = append(rsc_titles, value) - case 2: - sched_titles = append(sched_titles, value) - case 3: - syscall_titles = append(syscall_titles, value) - case 4: - ulock_titles = append(ulock_titles, value) - case 5: - kt_titles = append(kt_titles, value) + if fields[1] == "PID" { + for _, value := range fields[2:] { + switch data_type { + case 1: + rsc_titles = append(rsc_titles, value) + case 2: + sched_titles = append(sched_titles, value) + case 3: + syscall_titles = append(syscall_titles, value) + case 4: + ulock_titles = append(ulock_titles, value) + case 5: + kt_titles = append(kt_titles, value) + } } + } else { + for _, value := range fields[3:] { + switch data_type { + case 1: + rsc_titles = append(rsc_titles, value) + case 2: + sched_titles = append(sched_titles, value) + case 3: + syscall_titles = append(syscall_titles, value) + case 4: + ulock_titles = append(ulock_titles, value) + case 5: + kt_titles = append(kt_titles, value) + } + } + enable_tgid = 1 } data_map[data_type] = 1 } set_map = 0 } else { - // 则根据 data_type 进行数据的记录(利用case语句) + if enable_tgid == 0 { + // pid 为 fields[1] + pid, err = strconv.Atoi(fields[1]) + if err != nil { + // 处理转换错误 + fmt.Println("Error:", err) + return + } + } else if enable_tgid == 1 { + // pid 为 fields[2] + pid, err = strconv.Atoi(fields[2]) + if err != nil { + // 处理转换错误 + fmt.Println("Error:", err) + return + } + } + switch data_type { case 1: - rsc_Map := make(map[int][]map[string]float64) - secondElement, _ := strconv.Atoi(fields[1]) - - rsc_Map[secondElement] = make([]map[string]float64, 0) - for i, value := range fields[2:] { - floatValue, _ := strconv.ParseFloat(value, 64) - rsc_Map[secondElement] = append(rsc_Map[secondElement], map[string]float64{rsc_titles[i]: floatValue}) + if _, ok := rsc_pidheader[pid]; !ok { + if enable_tgid == 0 { + for _, title := range rsc_titles { + if enable_tgid == 0 { + rsc_pidheader[pid] = append(rsc_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title)) + } else if enable_tgid == 1 { + rsc_pidheader[pid] = append(rsc_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title)) + } + } + } } - - procmap_chan <- rsc_Map case 2: - sched_Map := make(map[int][]map[string]float64) - secondElement, _ := strconv.Atoi(fields[1]) - - sched_Map[secondElement] = make([]map[string]float64, 0) - for i, value := range fields[2:] { - floatValue, _ := strconv.ParseFloat(value, 64) - sched_Map[secondElement] = append(sched_Map[secondElement], map[string]float64{sched_titles[i]: floatValue}) + if _, ok := sched_pidheader[pid]; !ok { + for _, title := range sched_titles { + if enable_tgid == 0 { + sched_pidheader[pid] = append(sched_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title)) + } else if enable_tgid == 1 { + sched_pidheader[pid] = append(sched_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title)) + } + } } - - procmap_chan <- sched_Map case 3: - syscall_Map := make(map[int][]map[string]float64) - secondElement, _ := strconv.Atoi(fields[1]) - - syscall_Map[secondElement] = make([]map[string]float64, 0) - for i, value := range fields[2:] { - floatValue, _ := strconv.ParseFloat(value, 64) - syscall_Map[secondElement] = append(syscall_Map[secondElement], map[string]float64{syscall_titles[i]: floatValue}) + if _, ok := syscall_pidheader[pid]; !ok { + for _, title := range syscall_titles { + if enable_tgid == 0 { + syscall_pidheader[pid] = append(syscall_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title)) + } else if enable_tgid == 1 { + syscall_pidheader[pid] = append(syscall_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title)) + } + } } - - procmap_chan <- syscall_Map case 4: - ulock_Map := make(map[int][]map[string]float64) - secondElement, _ := strconv.Atoi(fields[1]) - - ulock_Map[secondElement] = make([]map[string]float64, 0) - for i, value := range fields[2:] { - floatValue, _ := strconv.ParseFloat(value, 64) - ulock_Map[secondElement] = append(ulock_Map[secondElement], map[string]float64{ulock_titles[i]: floatValue}) + if _, ok := ulock_pidheader[pid]; !ok { + for _, title := range ulock_titles { + if enable_tgid == 0 { + ulock_pidheader[pid] = append(ulock_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title)) + } else if enable_tgid == 1 { + ulock_pidheader[pid] = append(ulock_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title)) + } + } } - - procmap_chan <- ulock_Map case 5: - kt_Map := make(map[int][]map[string]float64) - secondElement, _ := strconv.Atoi(fields[1]) + if _, ok := kt_pidheader[pid]; !ok { + for _, title := range kt_titles { + if enable_tgid == 0 { + kt_pidheader[pid] = append(kt_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title)) + } else if enable_tgid == 1 { + kt_pidheader[pid] = append(kt_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title)) + } + } + } + } - kt_Map[secondElement] = make([]map[string]float64, 0) + switch data_type { + case 1: + rsc_Map := make(map[string]interface{}) + mu.Lock() + rsc_header := rsc_pidheader[pid] + if enable_tgid == 0 { + for i, value := range fields[2:] { + rsc_Map[rsc_header[i]] = value + } + } else if enable_tgid == 1 { + for i, value := range fields[3:] { + rsc_Map[rsc_header[i]] = value + } + } + mu.Unlock() + mapchan <- []map[string]interface{}{rsc_Map} + case 2: + sched_Map := make(map[string]interface{}) + mu.Lock() + sched_header := sched_pidheader[pid] for i, value := range fields[2:] { - floatValue, _ := strconv.ParseFloat(value, 64) - kt_Map[secondElement] = append(kt_Map[secondElement], map[string]float64{kt_titles[i]: floatValue}) + sched_Map[sched_header[i]] = value } - - procmap_chan <- kt_Map + mu.Unlock() + mapchan <- []map[string]interface{}{sched_Map} + case 3: + syscall_Map := make(map[string]interface{}) + mu.Lock() + syscall_header := syscall_pidheader[pid] + if enable_tgid == 0 { + for i, value := range fields[2:] { + syscall_Map[syscall_header[i-2]] = value + } + } else if enable_tgid == 1 { + for i, value := range fields[3:] { + syscall_Map[syscall_header[i-3]] = value + } + } + mu.Unlock() + mapchan <- []map[string]interface{}{syscall_Map} + case 4: + ulock_Map := make(map[string]interface{}) + mu.Lock() + ulock_header := ulock_pidheader[pid] + if enable_tgid == 0 { + for i, value := range fields[2:] { + ulock_Map[ulock_header[i-2]] = value + } + } else if enable_tgid == 1 { + for i, value := range fields[3:] { + ulock_Map[ulock_header[i-3]] = value + } + } + mu.Unlock() + mapchan <- []map[string]interface{}{ulock_Map} + case 5: + kt_Map := make(map[string]interface{}) + mu.Lock() + kt_header := kt_pidheader[pid] + if enable_tgid == 0 { + for i, value := range fields[2:] { + kt_Map[kt_header[i-2]] = value + } + } else if enable_tgid == 1 { + for i, value := range fields[3:] { + kt_Map[kt_header[i-3]] = value + } + } + mu.Unlock() + mapchan <- []map[string]interface{}{kt_Map} } } // 行号递增 diff --git a/eBPF_Visualization/eBPF_prometheus/dao/data_to_sqlite.go b/eBPF_Visualization/eBPF_prometheus/dao/data_to_sqlite.go index 10a82b3b9..74c07b156 100644 --- a/eBPF_Visualization/eBPF_prometheus/dao/data_to_sqlite.go +++ b/eBPF_Visualization/eBPF_prometheus/dao/data_to_sqlite.go @@ -38,12 +38,6 @@ type Sqlobj struct { Data map[string]interface{} } -type procSqlobj struct { - Tablename string - db *gorm.DB - Data map[int]map[string]float64 -} - type Basicdata struct { ID int `gorm:"primaryKey;unique;column:ID"` } @@ -57,22 +51,10 @@ func (s *Sqlobj) Connectsql() { s.db = db } -func (s *procSqlobj) Connectsql() { - currentdir, _ := os.Getwd() - path := currentdir + "/dao/data.db" - db, _ := gorm.Open(sqlite.Open(path), &gorm.Config{}) - log.Println("connected.") - s.db = db -} - func (s *Sqlobj) Tableexist(name string) bool { return s.db.Migrator().HasTable(name) } -func (s *procSqlobj) Tableexist(name string) bool { - return s.db.Migrator().HasTable(name) -} - // CreateTable 建表 func (s *Sqlobj) OperateTable(name string) { if !s.Tableexist(name) { @@ -90,22 +72,6 @@ func (s *Sqlobj) OperateTable(name string) { } } -func (s *procSqlobj) OperateTable(name string) { - if !s.Tableexist(name) { - deletetable := fmt.Sprintf("drop table if exists %s;", s.Tablename) - if err := s.db.Exec(deletetable).Error; err != nil { - log.Fatalf("drop exist table failed.") - } - if err := s.db.Table(s.Tablename).AutoMigrate(&Basicdata{}); err != nil { - log.Fatalf("create table failed.") - } - s.AppendTable() - s.CreateRow() - } else { - s.CreateRow() - } -} - // AppendTable 扩展表 func (s *Sqlobj) AppendTable() { for key, value := range s.Data { @@ -121,26 +87,7 @@ func (s *Sqlobj) AppendTable() { } } -func (s *procSqlobj) AppendTable() { - for key, value := range s.Data { - datatype := "text" - for innerkey, _ := range value { - // if strvalue, is_string := innervalue.(string); is_string { - // if _, err := strconv.ParseFloat(strvalue, 64); err == nil { - // datatype = "real" - // } - // } - addcolumn := fmt.Sprintf("alter table %s add column \"%s\"%s\" %s", s.Tablename, key, innerkey, datatype) - s.db.Exec(addcolumn) - } - } -} - // CreateRow 写入数据 func (s *Sqlobj) CreateRow() { s.db.Table(s.Tablename).Create(s.Data) } - -func (s *procSqlobj) CreateRow() { - s.db.Table(s.Tablename).Create(s.Data) -} diff --git a/eBPF_Visualization/eBPF_prometheus/prom_core/processer.go b/eBPF_Visualization/eBPF_prometheus/prom_core/processer.go index 66d12895a..0d4ae64a1 100644 --- a/eBPF_Visualization/eBPF_prometheus/prom_core/processer.go +++ b/eBPF_Visualization/eBPF_prometheus/prom_core/processer.go @@ -47,19 +47,8 @@ type MyMetrics struct { Sqlinited bool } -type procMetrics struct { - BPFName string - mu sync.Mutex - procMaps map[int]map[string]float64 - procMaplist map[int][]map[string]float64 - procSqlobj *dao.procSqlobj - Sqlinited bool -} - func (m *MyMetrics) Describe(ch chan<- *prometheus.Desc) {} -func (m *procMetrics) Describe(ch chan<- *prometheus.Desc) {} - // Convert_Maps_To_Dict shift dict list to dict func (m *MyMetrics) UpdateData() { new_Dict := make(map[string]interface{}) @@ -71,30 +60,11 @@ func (m *MyMetrics) UpdateData() { m.Maps = new_Dict } -func (m *procMetrics) UpdateData() { - new_Dict := make(map[int]map[string]float64) - for key, value := range m.procMaplist { - innerMap := make(map[string]float64) - for _, item := range value { - for innerKey, innerValue := range item { - innerMap[innerKey] = innerValue - } - } - new_Dict[key] = innerMap - } - m.procMaps = new_Dict -} - func (m *MyMetrics) UpdataSql() { m.Sqlobj.Data = m.Maps m.Sqlobj.CreateRow() } -func (m *procMetrics) UpdataSql() { - m.procSqlobj.Data = m.procMaps - m.procSqlobj.CreateRow() -} - func (m *MyMetrics) Initsql() { m.Sqlobj.Data = m.Maps m.Sqlobj.Connectsql() @@ -102,13 +72,6 @@ func (m *MyMetrics) Initsql() { m.Sqlinited = true } -func (m *procMetrics) Initsql() { - m.procSqlobj.Data = m.procMaps - m.procSqlobj.Connectsql() - m.procSqlobj.OperateTable(m.BPFName) - m.Sqlinited = true -} - // Format_Dict format dict. func Format_Dict(dict map[string]interface{}) (map[string]float64, map[string]string) { measurable_dict := map[string]float64{}