From dc18b61eb7c43bc9a0a25f8e010c8a32264b3c54 Mon Sep 17 00:00:00 2001 From: Ehco1996 Date: Sat, 27 Jan 2024 16:26:30 +0800 Subject: [PATCH] Refactor web server and clash sub handling This commit refactors the web server implementation and improves the handling of clash subscriptions. It introduces a new `Server` struct in the `web` package, which encapsulates the Echo web framework and provides a cleaner interface for registering routes and middleware. The `NewServer` function now takes an additional `relayReloader` parameter, which allows the server to reload relay configurations when necessary. The `Config` struct in the `config` package has been enhanced to include a `cachedClashSubMap` field, which stores Clash subscription objects for efficient retrieval. The `GetClashSubList` method has been added to retrieve a list of Clash subscriptions from the configuration. Additionally, the `getOrCreateClashSub` method has been introduced to handle the creation and caching of Clash subscription objects. These changes improve the overall organization and maintainability of the codebase. --- internal/cli/config.go | 28 +++++---- internal/config/config.go | 42 ++++++++++++- internal/constant/constant.go | 9 ++- internal/relay/server.go | 113 +++++++++++++++++++--------------- internal/reloader/reloader.go | 9 +++ internal/web/const.go | 20 ------ internal/web/handlers.go | 51 ++++++--------- internal/web/metrics.go | 19 ++++++ internal/web/server.go | 20 ++++-- pkg/sub/clash.go | 41 +++++++++--- pkg/sub/clash_test.go | 2 +- 11 files changed, 223 insertions(+), 131 deletions(-) create mode 100644 internal/reloader/reloader.go delete mode 100644 internal/web/const.go diff --git a/internal/cli/config.go b/internal/cli/config.go index 46dee8312..02abcdb94 100644 --- a/internal/cli/config.go +++ b/internal/cli/config.go @@ -92,10 +92,23 @@ func InitConfigAndComponents() (*config.Config, error) { } func MustStartComponents(mainCtx context.Context, cfg *config.Config) { - cliLogger.Infof("Start ehco with version=%s", constant.Version) + cliLogger.Infof("Start ehco with version:%s", constant.Version) + + var rs *relay.Server + if cfg.NeedStartRelayServer() { + web.EhcoAlive.Set(web.EhcoAliveStateRunning) + s, err := relay.NewServer(cfg) + if err != nil { + cliLogger.Fatalf("NewRelayServer meet err=%s", err.Error()) + } + rs = s + go func() { + cliLogger.Fatalf("StartRelayServer meet err=%s", rs.Start(mainCtx)) + }() + } if cfg.NeedStartWebServer() { - webS, err := web.NewServer(cfg) + webS, err := web.NewServer(cfg, rs) if err != nil { cliLogger.Fatalf("NewWebServer meet err=%s", err.Error()) } @@ -103,6 +116,7 @@ func MustStartComponents(mainCtx context.Context, cfg *config.Config) { cliLogger.Fatalf("StartWebServer meet err=%s", webS.Start()) }() } + if cfg.NeedStartXrayServer() { xrayS := xray.NewXrayServer(cfg) if err := xrayS.Setup(); err != nil { @@ -112,14 +126,4 @@ func MustStartComponents(mainCtx context.Context, cfg *config.Config) { cliLogger.Fatalf("Start XrayServer meet err=%v", err) } } - if cfg.NeedStartRelayServer() { - web.EhcoAlive.Set(web.EhcoAliveStateRunning) - rs, err := relay.NewServer(cfg) - if err != nil { - cliLogger.Fatalf("NewRelayServer meet err=%s", err.Error()) - } - go func() { - cliLogger.Fatalf("StartRelayServer meet err=%s", rs.Start(mainCtx)) - }() - } } diff --git a/internal/config/config.go b/internal/config/config.go index 17494ff3d..a43a838ac 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Ehco1996/ehco/internal/relay/conf" + "github.com/Ehco1996/ehco/pkg/sub" xConf "github.com/xtls/xray-core/infra/conf" "go.uber.org/zap" ) @@ -31,10 +32,12 @@ type Config struct { lastLoadTime time.Time l *zap.SugaredLogger + + cachedClashSubMap map[string]*sub.ClashSub // key: clash sub name } func NewConfig(path string) *Config { - return &Config{PATH: path, l: zap.S().Named("cfg")} + return &Config{PATH: path, l: zap.S().Named("cfg"), cachedClashSubMap: make(map[string]*sub.ClashSub)} } func (c *Config) NeedSyncUserFromServer() bool { @@ -89,6 +92,19 @@ func (c *Config) Adjust() error { if c.WebHost == "" { c.WebHost = "0.0.0.0" } + + clashSubList, err := c.GetClashSubList() + if err != nil { + return err + } + for _, clashSub := range clashSubList { + relayConfigs, err := clashSub.ToRelayConfigsWithCache(c.WebHost) + if err != nil { + return err + } + c.RelayConfigs = append(c.RelayConfigs, relayConfigs...) + } + for _, r := range c.RelayConfigs { if err := r.Validate(); err != nil { return err @@ -120,6 +136,30 @@ func (c *Config) GetMetricURL() string { return url } +func (c *Config) GetClashSubList() ([]*sub.ClashSub, error) { + clashSubList := make([]*sub.ClashSub, 0, len(c.SubConfigs)) + for _, subCfg := range c.SubConfigs { + clashSub, err := c.getOrCreateClashSub(subCfg) + if err != nil { + return nil, err + } + clashSubList = append(clashSubList, clashSub) + } + return clashSubList, nil +} + +func (c *Config) getOrCreateClashSub(subCfg *SubConfig) (*sub.ClashSub, error) { + if clashSub, ok := c.cachedClashSubMap[subCfg.Name]; ok { + return clashSub, nil + } + clashSub, err := sub.NewClashSubByURL(subCfg.URL, subCfg.Name) + if err != nil { + return nil, err + } + c.cachedClashSubMap[subCfg.Name] = clashSub + return clashSub, nil +} + type SubConfig struct { Name string `json:"name"` URL string `json:"url"` diff --git a/internal/constant/constant.go b/internal/constant/constant.go index 459f8ba88..34fe0f9ca 100644 --- a/internal/constant/constant.go +++ b/internal/constant/constant.go @@ -11,7 +11,7 @@ var ( GitRevision string BuildTime string - IndexHTMLTMPL = ` + WelcomeHTML = ` @@ -24,10 +24,15 @@ var (

GitRevision: ` + GitRevision + `

BuildTime: ` + BuildTime + `


-

More information here

Metrics

Debug

Clash Proxy Provider

+ +
+ +
+
+

More information here

` diff --git a/internal/relay/server.go b/internal/relay/server.go index 78ea51610..3b801af56 100644 --- a/internal/relay/server.go +++ b/internal/relay/server.go @@ -12,9 +12,12 @@ import ( "time" "github.com/Ehco1996/ehco/internal/config" + "github.com/Ehco1996/ehco/internal/reloader" "go.uber.org/zap" ) +var _ reloader.Reloader = (*Server)(nil) + func inArray(ele string, array []string) bool { for _, v := range array { if v == ele { @@ -29,16 +32,18 @@ type Server struct { cfg *config.Config l *zap.SugaredLogger - errCH chan error // once error happen, server will exit + errCH chan error // once error happen, server will exit + reloadCH chan struct{} // reload config } func NewServer(cfg *config.Config) (*Server, error) { l := zap.S().Named("relay-server") s := &Server{ - cfg: cfg, - l: l, - relayM: &sync.Map{}, - errCH: make(chan error, 1), + cfg: cfg, + l: l, + relayM: &sync.Map{}, + errCH: make(chan error, 1), + reloadCH: make(chan struct{}, 1), } return s, nil } @@ -68,7 +73,7 @@ func (s *Server) Start(ctx context.Context) error { go s.startOneRelay(r) } - if s.cfg.PATH != "" && s.cfg.ReloadInterval > 0 { + if s.cfg.PATH != "" && (s.cfg.ReloadInterval > 0 || len(s.cfg.SubConfigs) > 0) { s.l.Infof("Start to watch relay config %s ", s.cfg.PATH) go s.WatchAndReload(ctx) } @@ -89,38 +94,32 @@ func (s *Server) Start(ctx context.Context) error { } func (s *Server) Reload() error { - // load new config + // load config on raw + // NOTE: this is for reuse cached clash sub, because clash sub to relay config will change port every time when call if err := s.cfg.LoadConfig(); err != nil { - s.l.Errorf("Reload conf meet error: %s ", err) + s.l.Error("load new cfg meet error", zap.Error(err)) return err } - var newRelayAddrList []string + var allRelayAddrList []string for idx := range s.cfg.RelayConfigs { r, err := NewRelay(s.cfg.RelayConfigs[idx]) if err != nil { s.l.Errorf("reload new relay failed err=%s", err.Error()) return err } - newRelayAddrList = append(newRelayAddrList, r.Name) - // reload relay when name change - if oldR, ok := s.relayM.Load(r.Name); ok { - oldR := oldR.(*Relay) - if oldR.Name != r.Name { - s.l.Warnf("close old relay name=%s", oldR.Name) - s.stopOneRelay(oldR) - go s.startOneRelay(r) - } - continue // no need to reload - } + allRelayAddrList = append(allRelayAddrList, r.Name) // start bread new relay that not in old relayM - s.l.Infof("start new relay name=%s", r.Name) - go s.startOneRelay(r) + if _, ok := s.relayM.Load(r.Name); !ok { + s.l.Infof("start new relay name=%s", r.Name) + go s.startOneRelay(r) + continue + } } - // closed relay not in new config + // closed relay not in all relay list s.relayM.Range(func(key, value interface{}) bool { oldAddr := key.(string) - if !inArray(oldAddr, newRelayAddrList) { + if !inArray(oldAddr, allRelayAddrList) { v, _ := s.relayM.Load(oldAddr) oldR := v.(*Relay) s.stopOneRelay(oldR) @@ -130,46 +129,64 @@ func (s *Server) Reload() error { return nil } +func (s *Server) Stop() error { + s.l.Info("relay server stop now") + s.relayM.Range(func(key, value interface{}) bool { + r := value.(*Relay) + r.Close() + return true + }) + return nil +} + +func (s *Server) TriggerReload() { + s.reloadCH <- struct{}{} +} + func (s *Server) WatchAndReload(ctx context.Context) { - reloadCH := make(chan struct{}, 1) - // listen syscall.SIGHUP to trigger reload - sigHubCH := make(chan os.Signal, 1) - signal.Notify(sigHubCH, syscall.SIGHUP) - go func() { - for { - select { - case <-ctx.Done(): - return - case <-sigHubCH: - s.l.Info("Now Reloading Relay Conf By HUP Signal! ") - reloadCH <- struct{}{} + go s.TriggerReloadBySignal(ctx) + go s.triggerReloadByTicker(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-s.reloadCH: + if err := s.Reload(); err != nil { + s.l.Errorf("Reloading Relay Conf meet error: %s ", err) + s.errCH <- err } } - }() + } +} - // ticker to reload config - ticker := time.NewTicker(time.Second * time.Duration(s.cfg.ReloadInterval)) - defer ticker.Stop() - go func() { +func (s *Server) triggerReloadByTicker(ctx context.Context) { + if s.cfg.ReloadInterval > 0 { + ticker := time.NewTicker(time.Second * time.Duration(s.cfg.ReloadInterval)) + defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: - reloadCH <- struct{}{} + s.l.Info("Now Reloading Relay Conf By ticker! ") + s.TriggerReload() } } - }() + } +} +func (s *Server) TriggerReloadBySignal(ctx context.Context) { + // listen syscall.SIGHUP to trigger reload + sigHubCH := make(chan os.Signal, 1) + signal.Notify(sigHubCH, syscall.SIGHUP) for { select { case <-ctx.Done(): return - case <-reloadCH: - if err := s.Reload(); err != nil { - s.l.Errorf("Reloading Relay Conf meet error: %s ", err) - s.errCH <- err - } + case <-sigHubCH: + s.l.Info("Now Reloading Relay Conf By HUP Signal! ") + s.TriggerReload() } } } diff --git a/internal/reloader/reloader.go b/internal/reloader/reloader.go new file mode 100644 index 000000000..b9a8b8ab8 --- /dev/null +++ b/internal/reloader/reloader.go @@ -0,0 +1,9 @@ +package reloader + +import "context" + +type Reloader interface { + Reload() error + WatchAndReload(ctx context.Context) + TriggerReload() +} diff --git a/internal/web/const.go b/internal/web/const.go deleted file mode 100644 index 43c9ec728..000000000 --- a/internal/web/const.go +++ /dev/null @@ -1,20 +0,0 @@ -package web - -const ( - METRIC_NS = "ehco" - METRIC_SUBSYSTEM_TRAFFIC = "traffic" - METRIC_SUBSYSTEM_PING = "ping" - - METRIC_LABEL_REMOTE = "remote" - - METRIC_LABEL_CONN_FLOW = "flow" - METRIC_CONN_FLOW_WRITE = "write" - METRIC_CONN_FLOW_READ = "read" - - METRIC_LABEL_CONN_TYPE = "type" - METRIC_CONN_TYPE_TCP = "tcp" - METRIC_CONN_TYPE_UDP = "udp" - - EhcoAliveStateInit = 0 - EhcoAliveStateRunning = 1 -) diff --git a/internal/web/handlers.go b/internal/web/handlers.go index 3857d4f1a..8c45ddab9 100644 --- a/internal/web/handlers.go +++ b/internal/web/handlers.go @@ -2,12 +2,9 @@ package web import ( "fmt" - "io" "net/http" - "github.com/Ehco1996/ehco/internal/config" "github.com/Ehco1996/ehco/internal/constant" - "github.com/Ehco1996/ehco/pkg/sub" "go.uber.org/zap" ) @@ -19,7 +16,7 @@ func MakeIndexF() http.HandlerFunc { } func welcome(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, constant.IndexHTMLTMPL) + fmt.Fprintf(w, constant.WelcomeHTML) } func writerBadRequestMsg(w http.ResponseWriter, msg string) { @@ -34,8 +31,13 @@ func (s *Server) HandleClashProxyProvider(w http.ResponseWriter, r *http.Request writerBadRequestMsg(w, msg) return } + if s.relayServerReloader != nil { + s.relayServerReloader.TriggerReload() + } else { + s.l.Debugf("relayServerReloader is nil this should not happen") + } - clashSubList, err := refreshClashProxyProvider(s.cfg) + clashSubList, err := s.cfg.GetClashSubList() if err != nil { writerBadRequestMsg(w, err.Error()) return @@ -47,7 +49,7 @@ func (s *Server) HandleClashProxyProvider(w http.ResponseWriter, r *http.Request writerBadRequestMsg(w, err.Error()) return } - // todo refresh relay config and restart relay + _, err = w.Write(clashCfgBuf) if err != nil { s.l.Errorf("write response meet err=%v", err) @@ -60,32 +62,17 @@ func (s *Server) HandleClashProxyProvider(w http.ResponseWriter, r *http.Request writerBadRequestMsg(w, msg) } -func refreshClashProxyProvider(cfg *config.Config) ([]*sub.ClashSub, error) { - clashSubList := make([]*sub.ClashSub, 0, len(cfg.SubConfigs)) - for _, subCfg := range cfg.SubConfigs { - resp, err := http.Get(subCfg.URL) - if err != nil { - msg := fmt.Sprintf("http get sub config url=%s meet err=%v", subCfg.URL, err) - return nil, fmt.Errorf(msg) - - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - msg := fmt.Sprintf("http get sub config url=%s meet status code=%d", subCfg.URL, resp.StatusCode) - return nil, fmt.Errorf(msg) +func (s *Server) HandleReload(w http.ResponseWriter, r *http.Request) { + if s.relayServerReloader == nil { + writerBadRequestMsg(w, "reload not support") + return + } - } - body, err := io.ReadAll(resp.Body) - if err != nil { - msg := fmt.Sprintf("read body meet err=%v", err) - return nil, fmt.Errorf(msg) - } - clashSub, err := sub.NewClashSub(body, subCfg.Name) - if err != nil { - msg := fmt.Sprintf("NewClashSub meet err=%v", err) - return nil, fmt.Errorf(msg) - } - clashSubList = append(clashSubList, clashSub) + s.relayServerReloader.TriggerReload() + _, err := w.Write([]byte("reload success")) + if err != nil { + s.l.Errorf("write response meet err=%v", err) + writerBadRequestMsg(w, err.Error()) + return } - return clashSubList, nil } diff --git a/internal/web/metrics.go b/internal/web/metrics.go index d84c8e61d..1c8e51f7d 100644 --- a/internal/web/metrics.go +++ b/internal/web/metrics.go @@ -18,6 +18,25 @@ import ( "go.uber.org/zap" ) +const ( + METRIC_NS = "ehco" + METRIC_SUBSYSTEM_TRAFFIC = "traffic" + METRIC_SUBSYSTEM_PING = "ping" + + METRIC_LABEL_REMOTE = "remote" + + METRIC_LABEL_CONN_FLOW = "flow" + METRIC_CONN_FLOW_WRITE = "write" + METRIC_CONN_FLOW_READ = "read" + + METRIC_LABEL_CONN_TYPE = "type" + METRIC_CONN_TYPE_TCP = "tcp" + METRIC_CONN_TYPE_UDP = "udp" + + EhcoAliveStateInit = 0 + EhcoAliveStateRunning = 1 +) + // ping metrics var ( pingLabelNames = []string{"ip", "host", "label"} diff --git a/internal/web/server.go b/internal/web/server.go index dd7cbb4e7..fa0ab7d2d 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -6,21 +6,24 @@ import ( "net/http" _ "net/http/pprof" - "github.com/Ehco1996/ehco/internal/config" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" + + "github.com/Ehco1996/ehco/internal/config" + "github.com/Ehco1996/ehco/internal/reloader" ) type Server struct { e *echo.Echo addr string l *zap.SugaredLogger + cfg *config.Config - cfg *config.Config + relayServerReloader reloader.Reloader } -func NewServer(cfg *config.Config) (*Server, error) { +func NewServer(cfg *config.Config, relayReloader reloader.Reloader) (*Server, error) { l := zap.S().Named("web") addr := net.JoinHostPort(cfg.WebHost, fmt.Sprintf("%d", cfg.WebPort)) @@ -32,20 +35,27 @@ func NewServer(cfg *config.Config) (*Server, error) { if cfg.WebToken != "" { e.Use(SimpleTokenAuthMiddleware(cfg.WebToken, l)) } - if err := registerEhcoMetrics(cfg); err != nil { return nil, err } if err := registerNodeExporterMetrics(cfg); err != nil { return nil, err } - s := &Server{e: e, addr: addr, l: l, cfg: cfg} + s := &Server{ + e: e, + addr: addr, + l: l, + cfg: cfg, + relayServerReloader: relayReloader, + } // register handler e.GET("/", echo.WrapHandler(http.HandlerFunc(welcome))) e.GET("/metrics/", echo.WrapHandler(promhttp.Handler())) e.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) e.GET("/clash_proxy_provider/", echo.WrapHandler(http.HandlerFunc(s.HandleClashProxyProvider))) + + e.POST("/reload/", echo.WrapHandler(http.HandlerFunc(s.HandleReload))) return s, nil } diff --git a/pkg/sub/clash.go b/pkg/sub/clash.go index 09a22e4a2..995f9a80b 100644 --- a/pkg/sub/clash.go +++ b/pkg/sub/clash.go @@ -2,7 +2,9 @@ package sub import ( "fmt" + "io" "net" + "net/http" "strconv" "github.com/Ehco1996/ehco/internal/constant" @@ -13,8 +15,9 @@ import ( type ClashSub struct { Name string - raw *ClashConfig - after *ClashConfig + raw *ClashConfig + after *ClashConfig + relayConfigs []*relay_cfg.Config } @@ -24,21 +27,40 @@ func NewClashSub(rawClashCfgBuf []byte, name string) (*ClashSub, error) { if err != nil { return nil, err } - // do a copy for raw - after := raw - return &ClashSub{raw: &raw, after: &after, Name: name}, nil + return &ClashSub{raw: &raw, Name: name}, nil +} + +func NewClashSubByURL(url string, name string) (*ClashSub, error) { + resp, err := http.Get(url) + if err != nil { + msg := fmt.Sprintf("http get sub config url=%s meet err=%v", url, err) + return nil, fmt.Errorf(msg) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + msg := fmt.Sprintf("http get sub config url=%s meet status code=%d", url, resp.StatusCode) + return nil, fmt.Errorf(msg) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + msg := fmt.Sprintf("read body meet err=%v", err) + return nil, fmt.Errorf(msg) + } + return NewClashSub(body, name) } func (c *ClashSub) ToClashConfigYaml() ([]byte, error) { return yaml.Marshal(c.after) } -func (c *ClashSub) ToRelayConfigs(listenHost string) ([]*relay_cfg.Config, error) { +func (c *ClashSub) ToRelayConfigsWithCache(listenHost string) ([]*relay_cfg.Config, error) { if len(c.relayConfigs) > 0 { return c.relayConfigs, nil } + // do a copy for raw + after := c.raw + c.after = after - relayCfg := make([]*relay_cfg.Config, 0, len(c.raw.Proxies)) freePorts, err := getFreePortInBatch(listenHost, len(c.raw.Proxies)) if err != nil { return nil, err @@ -61,11 +83,10 @@ func (c *ClashSub) ToRelayConfigs(listenHost string) ([]*relay_cfg.Config, error if err := r.Validate(); err != nil { return nil, err } - relayCfg = append(relayCfg, r) + c.relayConfigs = append(c.relayConfigs, r) c.after.Proxies[idx].Server = listenHost c.after.Proxies[idx].Port = strconv.Itoa(listenPort) c.after.Proxies[idx].Name = fmt.Sprintf("%s-%s", c.Name, proxy.Name) } - c.relayConfigs = relayCfg - return relayCfg, nil + return c.relayConfigs, nil } diff --git a/pkg/sub/clash_test.go b/pkg/sub/clash_test.go index f3291ff2f..71229f7c8 100644 --- a/pkg/sub/clash_test.go +++ b/pkg/sub/clash_test.go @@ -54,7 +54,7 @@ func TestToRelayConfigs(t *testing.T) { assert.NoError(t, err, "NewConfig should not retur an error") assert.NotNil(t, cs, "Config should not be nil") - relayConfigs, err := cs.ToRelayConfigs("localhost") + relayConfigs, err := cs.ToRelayConfigsWithCache("localhost") assert.NoError(t, err, "ToRelayConfigs should not return an error") assert.NotNil(t, relayConfigs, "relayConfigs should not be nil") expectedRelayCount := 2