Skip to content

Commit

Permalink
Merge pull request #716 from zhangzihengya/develop
Browse files Browse the repository at this point in the history
proc_image:使eBPF_Visualization支持进程画像的可视化
  • Loading branch information
chenamy2017 authored Mar 18, 2024
2 parents e3540e4 + 79fd6b2 commit 4f51d41
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 41 deletions.
189 changes: 160 additions & 29 deletions eBPF_Visualization/eBPF_prometheus/collector/collect_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os/exec"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -260,14 +261,13 @@ func (b *BPF_name) Run(full string) error {
// 创建一个用于传递 map 数据的通道
mapchan := make(chan []map[string]interface{}, 2)

test := make(chan []map[string]interface{}, 2)

// 启动一个 goroutine 用于重定向命令的标准输出
go redirectStdout(fn, stdout, mapchan, test)
go redirectStdout(fn, stdout, mapchan)

// 创建一个指向 prom_core.MyMetrics 类型的指针 metricsobj,
// 并使用结构体字段初始化 BPFName 和 Sqlinited。
metricsobj := &prom_core.MyMetrics{BPFName: b.Name, Sqlinited: false}

// 创建一个指向 dao.Sqlobj 类型的指针 sqlobj,
// 并使用结构体字段初始化 Tablename。
sqlobj := &dao.Sqlobj{Tablename: b.Name}
Expand Down Expand Up @@ -340,7 +340,7 @@ func listenSystemSignals(cmd *exec.Cmd) {
}

// 定义了一个名为 redirectStdout 的函数,用于读取 stdout,并将其解析成 map 数据发送到指定通道
func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]interface{}, test chan []map[string]interface{}) {
func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]interface{}) {
// 用于存储解析后的 map 数据的切片
var maps []map[string]interface{}
// 互斥锁,用于保护 maps 切片的并发写入
Expand All @@ -354,6 +354,7 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i
var syscall_titles []string
var ulock_titles []string
var kt_titles []string

// 行号计数器
var line_number = 1
// 命令索引,用于区分不同的命令
Expand All @@ -362,9 +363,19 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i
// RESOURCE(1)、SCHEDULE(2)、SYSCALL(3)、USERLOCK(4)、KEYTIME(5)
var data_type = 0
var set_map = 0
var enable_tgid = 0
// 1 代表已创建表头
data_map := [6]int{0, 0, 0, 0, 0, 0}

rsc_pidheader := make(map[int][]string)
sched_pidheader := make(map[int][]string)
syscall_pidheader := make(map[int][]string)
ulock_pidheader := make(map[int][]string)
kt_pidheader := make(map[int][]string)

var pid int
var err error

// proc_image
// 判断程序名是否为 proc_image
if fn == "proc_image" {
Expand All @@ -387,66 +398,186 @@ func redirectStdout(fn string, stdout io.ReadCloser, mapchan chan []map[string]i
data_type = 5
}
set_map = 1
enable_tgid = 0
} else if set_map == 1 {
// 对于表头行判断是否已经记录过表头,若没记录过则记录到相应的map中,设置 set_map 为 0
// 对于表头行判断是否已经记录过表头,若没记录过则记录到相应的map中
if data_map[data_type] == 0 {
for _, value := range fields {
switch data_type {
case 1:
rsc_titles = append(rsc_titles, value)
case 2:
sched_titles = append(sched_titles, value)
case 3:
syscall_titles = append(syscall_titles, value)
case 4:
ulock_titles = append(ulock_titles, value)
case 5:
kt_titles = append(kt_titles, value)
if fields[1] == "PID" {
for _, value := range fields[2:] {
switch data_type {
case 1:
rsc_titles = append(rsc_titles, value)
case 2:
sched_titles = append(sched_titles, value)
case 3:
syscall_titles = append(syscall_titles, value)
case 4:
ulock_titles = append(ulock_titles, value)
case 5:
kt_titles = append(kt_titles, value)
}
}
} else {
for _, value := range fields[3:] {
switch data_type {
case 1:
rsc_titles = append(rsc_titles, value)
case 2:
sched_titles = append(sched_titles, value)
case 3:
syscall_titles = append(syscall_titles, value)
case 4:
ulock_titles = append(ulock_titles, value)
case 5:
kt_titles = append(kt_titles, value)
}
}
enable_tgid = 1
}
data_map[data_type] = 1
}
set_map = 0
} else {
// 则根据 data_type 进行数据的记录(利用case语句)
if enable_tgid == 0 {
// pid 为 fields[1]
pid, err = strconv.Atoi(fields[1])
if err != nil {
// 处理转换错误
fmt.Println("Error:", err)
return
}
} else if enable_tgid == 1 {
// pid 为 fields[2]
pid, err = strconv.Atoi(fields[2])
if err != nil {
// 处理转换错误
fmt.Println("Error:", err)
return
}
}

switch data_type {
case 1:
if _, ok := rsc_pidheader[pid]; !ok {
if enable_tgid == 0 {
for _, title := range rsc_titles {
if enable_tgid == 0 {
rsc_pidheader[pid] = append(rsc_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title))
} else if enable_tgid == 1 {
rsc_pidheader[pid] = append(rsc_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title))
}
}
}
}
case 2:
if _, ok := sched_pidheader[pid]; !ok {
for _, title := range sched_titles {
if enable_tgid == 0 {
sched_pidheader[pid] = append(sched_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title))
} else if enable_tgid == 1 {
sched_pidheader[pid] = append(sched_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title))
}
}
}
case 3:
if _, ok := syscall_pidheader[pid]; !ok {
for _, title := range syscall_titles {
if enable_tgid == 0 {
syscall_pidheader[pid] = append(syscall_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title))
} else if enable_tgid == 1 {
syscall_pidheader[pid] = append(syscall_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title))
}
}
}
case 4:
if _, ok := ulock_pidheader[pid]; !ok {
for _, title := range ulock_titles {
if enable_tgid == 0 {
ulock_pidheader[pid] = append(ulock_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title))
} else if enable_tgid == 1 {
ulock_pidheader[pid] = append(ulock_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title))
}
}
}
case 5:
if _, ok := kt_pidheader[pid]; !ok {
for _, title := range kt_titles {
if enable_tgid == 0 {
kt_pidheader[pid] = append(kt_pidheader[pid], fmt.Sprintf("%s(%s)", fields[1], title))
} else if enable_tgid == 1 {
kt_pidheader[pid] = append(kt_pidheader[pid], fmt.Sprintf("%s_%s(%s)", fields[1], fields[2], title))
}
}
}
}

switch data_type {
case 1:
rsc_Map := make(map[string]interface{})
mu.Lock()
for i, value := range fields {
rsc_Map[rsc_titles[i]] = value
rsc_header := rsc_pidheader[pid]
if enable_tgid == 0 {
for i, value := range fields[2:] {
rsc_Map[rsc_header[i]] = value
}
} else if enable_tgid == 1 {
for i, value := range fields[3:] {
rsc_Map[rsc_header[i]] = value
}
}
mu.Unlock()
test <- []map[string]interface{}{rsc_Map}
mapchan <- []map[string]interface{}{rsc_Map}
case 2:
sched_Map := make(map[string]interface{})
mu.Lock()
for i, value := range fields {
sched_Map[sched_titles[i]] = value
sched_header := sched_pidheader[pid]
for i, value := range fields[2:] {
sched_Map[sched_header[i]] = value
}
mu.Unlock()
mapchan <- []map[string]interface{}{sched_Map}
case 3:
syscall_Map := make(map[string]interface{})
mu.Lock()
for i, value := range fields {
syscall_Map[syscall_titles[i]] = value
syscall_header := syscall_pidheader[pid]
if enable_tgid == 0 {
for i, value := range fields[2:] {
syscall_Map[syscall_header[i-2]] = value
}
} else if enable_tgid == 1 {
for i, value := range fields[3:] {
syscall_Map[syscall_header[i-3]] = value
}
}
mu.Unlock()
mapchan <- []map[string]interface{}{syscall_Map}
case 4:
ulock_Map := make(map[string]interface{})
mu.Lock()
for i, value := range fields {
ulock_Map[ulock_titles[i]] = value
ulock_header := ulock_pidheader[pid]
if enable_tgid == 0 {
for i, value := range fields[2:] {
ulock_Map[ulock_header[i-2]] = value
}
} else if enable_tgid == 1 {
for i, value := range fields[3:] {
ulock_Map[ulock_header[i-3]] = value
}
}
mu.Unlock()
mapchan <- []map[string]interface{}{ulock_Map}
case 5:
kt_Map := make(map[string]interface{})
mu.Lock()
for i, value := range fields {
kt_Map[kt_titles[i]] = value
kt_header := kt_pidheader[pid]
if enable_tgid == 0 {
for i, value := range fields[2:] {
kt_Map[kt_header[i-2]] = value
}
} else if enable_tgid == 1 {
for i, value := range fields[3:] {
kt_Map[kt_header[i-3]] = value
}
}
mu.Unlock()
mapchan <- []map[string]interface{}{kt_Map}
Expand Down
9 changes: 5 additions & 4 deletions eBPF_Visualization/eBPF_prometheus/dao/data_to_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@ package dao

import (
"fmt"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"log"
"os"
"strconv"

"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

// 定义一个名为 Sqlobj 的结构体类型,用于封装数据库相关的信息
type Sqlobj struct {
// Tablename 字段存储数据库表的名称。
Tablename string
// db 字段是一个指向 gorm.DB 类型的指针,用于处理与数据库交互的对象。
db *gorm.DB
db *gorm.DB
// Data 字段是一个 map,存储与数据库相关的信息。
Data map[string]interface{}
Data map[string]interface{}
}

type Basicdata struct {
Expand Down
22 changes: 14 additions & 8 deletions eBPF_Visualization/eBPF_prometheus/prom_core/processer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@ package prom_core
import (
"ebpf_prometheus/checker"
"ebpf_prometheus/dao"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
"strconv"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// 定义一个名为 MyMetrics 的结构体类型。
type MyMetrics struct {
// BPFName 字段存储与此度量相关的 BPF 的名称。
BPFName string
BPFName string
// mu 字段是一个互斥锁,用于在多协程之间同步对结构体字段的访问。
mu sync.Mutex
mu sync.Mutex
// Maps 字段是一个 map,存储与此度量相关的信息。
Maps map[string]interface{}
Maps map[string]interface{}
// Maplist 字段是一个切片,存储与此度量相关的信息的列表。
Maplist []map[string]interface{}
Maplist []map[string]interface{}
// Sqlobj 字段是一个指向 dao.Sqlobj 类型的指针,用于处理与数据库相关的信息。
Sqlobj *dao.Sqlobj
Sqlobj *dao.Sqlobj
// Sqlinited 字段表示与此度量相关的数据库是否已初始化。
Sqlinited bool
}
Expand Down Expand Up @@ -109,11 +110,16 @@ func (m *MyMetrics) Collect(ch chan<- prometheus.Metric) {
}
}

// StartService get map list chan and run a service to show metrics
// StartService 方法是 MyMetrics 类型的一个方法,用于启动服务并将 MyMetrics 注册到 Prometheus。
func (m *MyMetrics) StartService() {
// 使用 Prometheus 的 MustRegister 函数将 MyMetrics 注册到 Prometheus 收集器中。
prometheus.MustRegister(m)

// 将 /metrics 路径映射到 Prometheus HTTP 处理器,以便可以通过该路径访问指标数据。
http.Handle("/metrics", promhttp.Handler())

// 启动 HTTP 服务器,监听端口 8090,处理器为 nil(使用默认的多路复用器)。
// 如果启动失败,使用 log.Fatalf 输出错误信息并终止程序。
if err := http.ListenAndServe(":8090", nil); err != nil {
log.Fatalf("Failed to start HTTP server:", err)
}
Expand Down
Binary file added eBPF_Visualization/eBPF_prometheus/test
Binary file not shown.
25 changes: 25 additions & 0 deletions eBPF_Visualization/eBPF_prometheus/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import "fmt"

func main() {
// 创建一个通道,元素类型为 map[int][]map[string]float64
channelOfMaps := make(chan map[int][]map[string]float64, 2)

// 示例:向通道添加元素
element1 := make(map[int][]map[string]float64)
element1[1] = []map[string]float64{{"key1": 1.1, "key2": 2.2}, {"key3": 3.3, "key4": 4.4}, {"key3": 3.3, "key4": 4.4}}
channelOfMaps <- element1

element2 := make(map[int][]map[string]float64)
element2[2] = []map[string]float64{{"key5": 5.5, "key6": 6.6}, {"key7": 7.7, "key8": 8.8}}
channelOfMaps <- element2

// 示例:从通道接收元素
receivedElement1 := <-channelOfMaps
receivedElement2 := <-channelOfMaps

// 打印接收到的元素
fmt.Println("Received Element 1:", receivedElement1)
fmt.Println("Received Element 2:", receivedElement2)
}

0 comments on commit 4f51d41

Please sign in to comment.