Skip to content

Commit

Permalink
Refactor web server and clash sub handling
Browse files Browse the repository at this point in the history
This commit refactors the web server implementation and improves the handling of clash subscriptions. It introduces a new `Server` struct in the `web` package, which encapsulates the Echo web framework and provides a cleaner interface for registering routes and middleware. The `NewServer` function now takes an additional `relayReloader` parameter, which allows the server to reload relay configurations when necessary.

The `Config` struct in the `config` package has been enhanced to include a `cachedClashSubMap` field, which stores Clash subscription objects for efficient retrieval. The `GetClashSubList` method has been added to retrieve a list of Clash subscriptions from the configuration. Additionally, the `getOrCreateClashSub` method has been introduced to handle the creation and caching of Clash subscription objects.

These changes improve the overall organization and maintainability of the codebase.
  • Loading branch information
Ehco1996 committed Jan 27, 2024
1 parent 026a9a4 commit dc18b61
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 131 deletions.
28 changes: 16 additions & 12 deletions internal/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,31 @@ func InitConfigAndComponents() (*config.Config, error) {
}

func MustStartComponents(mainCtx context.Context, cfg *config.Config) {
cliLogger.Infof("Start ehco with version=%s", constant.Version)
cliLogger.Infof("Start ehco with version:%s", constant.Version)

var rs *relay.Server
if cfg.NeedStartRelayServer() {
web.EhcoAlive.Set(web.EhcoAliveStateRunning)
s, err := relay.NewServer(cfg)
if err != nil {
cliLogger.Fatalf("NewRelayServer meet err=%s", err.Error())
}
rs = s
go func() {
cliLogger.Fatalf("StartRelayServer meet err=%s", rs.Start(mainCtx))
}()
}

if cfg.NeedStartWebServer() {
webS, err := web.NewServer(cfg)
webS, err := web.NewServer(cfg, rs)
if err != nil {
cliLogger.Fatalf("NewWebServer meet err=%s", err.Error())
}
go func() {
cliLogger.Fatalf("StartWebServer meet err=%s", webS.Start())
}()
}

if cfg.NeedStartXrayServer() {
xrayS := xray.NewXrayServer(cfg)
if err := xrayS.Setup(); err != nil {
Expand All @@ -112,14 +126,4 @@ func MustStartComponents(mainCtx context.Context, cfg *config.Config) {
cliLogger.Fatalf("Start XrayServer meet err=%v", err)
}
}
if cfg.NeedStartRelayServer() {
web.EhcoAlive.Set(web.EhcoAliveStateRunning)
rs, err := relay.NewServer(cfg)
if err != nil {
cliLogger.Fatalf("NewRelayServer meet err=%s", err.Error())
}
go func() {
cliLogger.Fatalf("StartRelayServer meet err=%s", rs.Start(mainCtx))
}()
}
}
42 changes: 41 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/Ehco1996/ehco/internal/relay/conf"
"github.com/Ehco1996/ehco/pkg/sub"
xConf "github.com/xtls/xray-core/infra/conf"
"go.uber.org/zap"
)
Expand All @@ -31,10 +32,12 @@ type Config struct {

lastLoadTime time.Time
l *zap.SugaredLogger

cachedClashSubMap map[string]*sub.ClashSub // key: clash sub name
}

func NewConfig(path string) *Config {
return &Config{PATH: path, l: zap.S().Named("cfg")}
return &Config{PATH: path, l: zap.S().Named("cfg"), cachedClashSubMap: make(map[string]*sub.ClashSub)}
}

func (c *Config) NeedSyncUserFromServer() bool {
Expand Down Expand Up @@ -89,6 +92,19 @@ func (c *Config) Adjust() error {
if c.WebHost == "" {
c.WebHost = "0.0.0.0"
}

clashSubList, err := c.GetClashSubList()
if err != nil {
return err
}
for _, clashSub := range clashSubList {
relayConfigs, err := clashSub.ToRelayConfigsWithCache(c.WebHost)
if err != nil {
return err
}
c.RelayConfigs = append(c.RelayConfigs, relayConfigs...)
}

for _, r := range c.RelayConfigs {
if err := r.Validate(); err != nil {
return err
Expand Down Expand Up @@ -120,6 +136,30 @@ func (c *Config) GetMetricURL() string {
return url
}

func (c *Config) GetClashSubList() ([]*sub.ClashSub, error) {
clashSubList := make([]*sub.ClashSub, 0, len(c.SubConfigs))
for _, subCfg := range c.SubConfigs {
clashSub, err := c.getOrCreateClashSub(subCfg)
if err != nil {
return nil, err
}
clashSubList = append(clashSubList, clashSub)
}
return clashSubList, nil
}

func (c *Config) getOrCreateClashSub(subCfg *SubConfig) (*sub.ClashSub, error) {
if clashSub, ok := c.cachedClashSubMap[subCfg.Name]; ok {
return clashSub, nil
}
clashSub, err := sub.NewClashSubByURL(subCfg.URL, subCfg.Name)
if err != nil {
return nil, err
}
c.cachedClashSubMap[subCfg.Name] = clashSub
return clashSub, nil
}

type SubConfig struct {
Name string `json:"name"`
URL string `json:"url"`
Expand Down
9 changes: 7 additions & 2 deletions internal/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var (
GitRevision string
BuildTime string

IndexHTMLTMPL = `<!doctype html>
WelcomeHTML = `<!doctype html>
<html>
<head>
<meta charset="UTF-8">
Expand All @@ -24,10 +24,15 @@ var (
<h3>GitRevision: ` + GitRevision + `</h3>
<h3>BuildTime: ` + BuildTime + `</h3>
<hr>
<p><a href="https://github.com/Ehco1996/ehco">More information here</a></p>
<p><a href="/metrics/">Metrics</a></p>
<p><a href="/debug/pprof/">Debug</a></p>
<p><a href="/clash_proxy_provider/?sub_name=<your_sub_name>">Clash Proxy Provider</a></p>
<form action="/reload/" method="post">
<input type="submit" value="Reload Config">
</form>
<hr>
<p><a href="https://github.com/Ehco1996/ehco">More information here</a></p>
</body>
</html>
`
Expand Down
113 changes: 65 additions & 48 deletions internal/relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"time"

"github.com/Ehco1996/ehco/internal/config"
"github.com/Ehco1996/ehco/internal/reloader"
"go.uber.org/zap"
)

var _ reloader.Reloader = (*Server)(nil)

func inArray(ele string, array []string) bool {
for _, v := range array {
if v == ele {
Expand All @@ -29,16 +32,18 @@ type Server struct {
cfg *config.Config
l *zap.SugaredLogger

errCH chan error // once error happen, server will exit
errCH chan error // once error happen, server will exit
reloadCH chan struct{} // reload config
}

func NewServer(cfg *config.Config) (*Server, error) {
l := zap.S().Named("relay-server")
s := &Server{
cfg: cfg,
l: l,
relayM: &sync.Map{},
errCH: make(chan error, 1),
cfg: cfg,
l: l,
relayM: &sync.Map{},
errCH: make(chan error, 1),
reloadCH: make(chan struct{}, 1),
}
return s, nil
}
Expand Down Expand Up @@ -68,7 +73,7 @@ func (s *Server) Start(ctx context.Context) error {
go s.startOneRelay(r)
}

if s.cfg.PATH != "" && s.cfg.ReloadInterval > 0 {
if s.cfg.PATH != "" && (s.cfg.ReloadInterval > 0 || len(s.cfg.SubConfigs) > 0) {
s.l.Infof("Start to watch relay config %s ", s.cfg.PATH)
go s.WatchAndReload(ctx)
}
Expand All @@ -89,38 +94,32 @@ func (s *Server) Start(ctx context.Context) error {
}

func (s *Server) Reload() error {
// load new config
// load config on raw
// NOTE: this is for reuse cached clash sub, because clash sub to relay config will change port every time when call
if err := s.cfg.LoadConfig(); err != nil {
s.l.Errorf("Reload conf meet error: %s ", err)
s.l.Error("load new cfg meet error", zap.Error(err))
return err
}
var newRelayAddrList []string
var allRelayAddrList []string
for idx := range s.cfg.RelayConfigs {
r, err := NewRelay(s.cfg.RelayConfigs[idx])
if err != nil {
s.l.Errorf("reload new relay failed err=%s", err.Error())
return err
}
newRelayAddrList = append(newRelayAddrList, r.Name)
// reload relay when name change
if oldR, ok := s.relayM.Load(r.Name); ok {
oldR := oldR.(*Relay)
if oldR.Name != r.Name {
s.l.Warnf("close old relay name=%s", oldR.Name)
s.stopOneRelay(oldR)
go s.startOneRelay(r)
}
continue // no need to reload
}
allRelayAddrList = append(allRelayAddrList, r.Name)
// start bread new relay that not in old relayM
s.l.Infof("start new relay name=%s", r.Name)
go s.startOneRelay(r)
if _, ok := s.relayM.Load(r.Name); !ok {
s.l.Infof("start new relay name=%s", r.Name)
go s.startOneRelay(r)
continue
}
}

// closed relay not in new config
// closed relay not in all relay list
s.relayM.Range(func(key, value interface{}) bool {
oldAddr := key.(string)
if !inArray(oldAddr, newRelayAddrList) {
if !inArray(oldAddr, allRelayAddrList) {
v, _ := s.relayM.Load(oldAddr)
oldR := v.(*Relay)
s.stopOneRelay(oldR)
Expand All @@ -130,46 +129,64 @@ func (s *Server) Reload() error {
return nil
}

func (s *Server) Stop() error {
s.l.Info("relay server stop now")
s.relayM.Range(func(key, value interface{}) bool {
r := value.(*Relay)
r.Close()
return true
})
return nil
}

func (s *Server) TriggerReload() {
s.reloadCH <- struct{}{}
}

func (s *Server) WatchAndReload(ctx context.Context) {
reloadCH := make(chan struct{}, 1)
// listen syscall.SIGHUP to trigger reload
sigHubCH := make(chan os.Signal, 1)
signal.Notify(sigHubCH, syscall.SIGHUP)
go func() {
for {
select {
case <-ctx.Done():
return
case <-sigHubCH:
s.l.Info("Now Reloading Relay Conf By HUP Signal! ")
reloadCH <- struct{}{}
go s.TriggerReloadBySignal(ctx)
go s.triggerReloadByTicker(ctx)

for {
select {
case <-ctx.Done():
return
case <-s.reloadCH:
if err := s.Reload(); err != nil {
s.l.Errorf("Reloading Relay Conf meet error: %s ", err)
s.errCH <- err
}
}
}()
}
}

// ticker to reload config
ticker := time.NewTicker(time.Second * time.Duration(s.cfg.ReloadInterval))
defer ticker.Stop()
go func() {
func (s *Server) triggerReloadByTicker(ctx context.Context) {
if s.cfg.ReloadInterval > 0 {
ticker := time.NewTicker(time.Second * time.Duration(s.cfg.ReloadInterval))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
reloadCH <- struct{}{}
s.l.Info("Now Reloading Relay Conf By ticker! ")
s.TriggerReload()
}
}
}()
}
}

func (s *Server) TriggerReloadBySignal(ctx context.Context) {
// listen syscall.SIGHUP to trigger reload
sigHubCH := make(chan os.Signal, 1)
signal.Notify(sigHubCH, syscall.SIGHUP)
for {
select {
case <-ctx.Done():
return
case <-reloadCH:
if err := s.Reload(); err != nil {
s.l.Errorf("Reloading Relay Conf meet error: %s ", err)
s.errCH <- err
}
case <-sigHubCH:
s.l.Info("Now Reloading Relay Conf By HUP Signal! ")
s.TriggerReload()
}
}
}
9 changes: 9 additions & 0 deletions internal/reloader/reloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package reloader

import "context"

type Reloader interface {
Reload() error
WatchAndReload(ctx context.Context)
TriggerReload()
}
20 changes: 0 additions & 20 deletions internal/web/const.go

This file was deleted.

Loading

0 comments on commit dc18b61

Please sign in to comment.