Skip to content

Commit

Permalink
xray: add traffic stats in debug logger
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Oct 17, 2023
1 parent cc0ccdf commit 40b5705
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
3 changes: 2 additions & 1 deletion internal/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ func registerNodeExporterMetrics(cfg *config.Config) error {
promlogConfig := &promlog.Config{Level: level}
logger := promlog.New(promlogConfig)
// see this https://github.com/prometheus/node_exporter/pull/2463
if _, err := kingpin.CommandLine.Parse([]string{}); err != nil {
if _, err := kingpin.CommandLine.Parse([]string{"--collector.boottime"}); err != nil {
return err
}
collector.DisableDefaultCollectors()
nc, err := collector.NewNodeCollector(logger)
if err != nil {
return fmt.Errorf("couldn't create collector: %s", err)
Expand Down
10 changes: 6 additions & 4 deletions pkg/limiter/limiter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package limiter

import (
"fmt"
"sync"
"time"

"go.uber.org/zap"
"golang.org/x/time/rate"
)

Expand All @@ -23,16 +23,19 @@ type IPRateLimiter struct {
burst int // 表示桶容量大小,即同一时刻能取到的最大token数量

lastGcTime time.Time // 上次gc的时间

logger *zap.Logger
}

// NewIPRateLimiter .
func NewIPRateLimiter(limit rate.Limit, burst int) *IPRateLimiter {
func NewIPRateLimiter(limit rate.Limit, burst int, logger *zap.Logger) *IPRateLimiter {
i := &IPRateLimiter{
previousRateM: make(map[string]*rate.Limiter),
currentRateM: make(map[string]*rate.Limiter),
limit: limit,
burst: burst,
lastGcTime: time.Now(),
logger: logger,
}
return i
}
Expand Down Expand Up @@ -70,8 +73,7 @@ func (i *IPRateLimiter) gc() {
i.Lock()
defer i.Unlock()
now := time.Now()
// todo refine this logger
fmt.Printf("[IPRateLimiter] gc alive count=%d\n", len(i.currentRateM))
i.logger.Info("[IPRateLimiter] gc start", zap.Int("alive count", len(i.currentRateM)))
i.lastGcTime = now
i.previousRateM = i.currentRateM
i.currentRateM = make(map[string]*rate.Limiter)
Expand Down
3 changes: 2 additions & 1 deletion pkg/limiter/limtier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"testing"
"time"

"go.uber.org/zap"
"golang.org/x/time/rate"
)

func TestIPRateLimiter_CanServe(t *testing.T) {
ipr := NewIPRateLimiter(rate.Limit(1), 1) // 1/s 处理一个请求
ipr := NewIPRateLimiter(rate.Limit(1), 1, zap.NewExample()) // 1/s 处理一个请求

ip1 := "1.1.1.1"
ip2 := "1.2.2.2"
Expand Down
14 changes: 8 additions & 6 deletions pkg/xray/bandwidth_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func NewBandwidthRecorder(metricsURL string) *bandwidthRecorder {
}
}

func (b *bandwidthRecorder) RecordOnce(ctx context.Context) error {
func (b *bandwidthRecorder) RecordOnce(ctx context.Context) (uploadIncr float64, downloadIncr float64, err error) {
response, err := b.httpClient.Get(b.metricsURL)
if err != nil {
return err
return
}
defer response.Body.Close()

body, err := io.ReadAll(response.Body)
if err != nil {
return err
return
}
lines := strings.Split(string(body), "\n")

Expand Down Expand Up @@ -73,13 +73,15 @@ func (b *bandwidthRecorder) RecordOnce(ctx context.Context) error {
if !b.lastRecordTime.IsZero() {
// calculate bandwidth
elapsed := now.Sub(b.lastRecordTime).Seconds()
b.uploadBandwidthBytes = (send - b.currentSendBytes) / elapsed
b.downloadBandwidthBytes = (recv - b.currentRecvBytes) / elapsed
uploadIncr = (send - b.currentSendBytes)
downloadIncr = (recv - b.currentRecvBytes)
b.uploadBandwidthBytes = uploadIncr / elapsed
b.downloadBandwidthBytes = downloadIncr / elapsed
}
b.lastRecordTime = now
b.currentRecvBytes = recv
b.currentSendBytes = send
return nil
return
}

func parseFloat(s string) float64 {
Expand Down
20 changes: 17 additions & 3 deletions pkg/xray/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type SyncTrafficReq struct {
DownloadBandwidth int64 `json:"download_bandwidth"`
}

func (s *SyncTrafficReq) GetTotalTraffic() int64 {
var total int64
for _, u := range s.Data {
total += u.UploadTraffic + u.DownloadTraffic
}
return total
}

type SyncUserConfigsResp struct {
Users []*User `json:"users"`
}
Expand Down Expand Up @@ -217,16 +225,22 @@ func (up *UserPool) syncTrafficToServer(ctx context.Context, endpoint, tag strin
req := &SyncTrafficReq{Data: tfs}
if up.br != nil {
// record bandwidth
if err := up.br.RecordOnce(ctx); err != nil {
uploadIncr, downloadIncr, err := up.br.RecordOnce(ctx)
if err != nil {
return err
}

ub := up.br.GetUploadBandwidth()
req.UploadBandwidth = int64(ub)
db := up.br.GetDownloadBandwidth()
req.DownloadBandwidth = int64(db)
l.Debug("Upload Bandwidth :", PrettyByteSize(ub),
"Download Bandwidth :", PrettyByteSize(db))
l.Debug(
"Upload Bandwidth :", PrettyByteSize(ub),
"Download Bandwidth :", PrettyByteSize(db),
"Total Bandwidth :", PrettyByteSize(ub+db),
"Total Increment By BR", PrettyByteSize(uploadIncr+downloadIncr),
"Total Increment By Xray :", PrettyByteSize(float64(req.GetTotalTraffic())),
)
}
if err := postJson(up.httpClient, endpoint, req); err != nil {
return err
Expand Down

0 comments on commit 40b5705

Please sign in to comment.