diff --git a/backend/state-manager/cmd/main.go b/backend/state-manager/cmd/main.go index 13b5e3c..b38ba5d 100644 --- a/backend/state-manager/cmd/main.go +++ b/backend/state-manager/cmd/main.go @@ -10,7 +10,6 @@ import ( "os/signal" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -19,10 +18,8 @@ import ( "github.com/ueckoken/plarail2023/backend/spec/state/v1/statev1connect" connectHandler "github.com/ueckoken/plarail2023/backend/state-manager/pkg/connect" "github.com/ueckoken/plarail2023/backend/state-manager/pkg/db" - "github.com/ueckoken/plarail2023/backend/state-manager/pkg/db" "github.com/ueckoken/plarail2023/backend/state-manager/pkg/mqtt_handler" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/options" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sync/errgroup" @@ -95,7 +92,6 @@ func main() { go func() { <-ctx.Done() slog.Default().Info("signal received or canceled") - slog.Default().Info("signal received or canceled") }() eg, ctx := errgroup.WithContext(ctx) @@ -124,36 +120,11 @@ func main() { return mqttHandler.Start(ctx) }) - DBOpts := options.Client().ApplyURI(os.Getenv("MONGODB_URI")) - DBHandler, err := db.Open(ctx, DBOpts) - if err != nil { - slog.Default().Error("database connection failed", slog.Any("err", err)) - cancel() - return - } - mqttClientOpts := mqtt.NewClientOptions() - mqttClientOpts.AddBroker(os.Getenv("MQTT_BROKER_ADDR")) - mqttClientOpts.Username = os.Getenv("MQTT_USERNAME") - mqttClientOpts.Password = os.Getenv("MQTT_PASSWORD") - mqttClientOpts.ClientID = os.Getenv("MQTT_CLIENT_ID") - - mqttHandler, err := mqtt_handler.NewHandler(mqttClientOpts, DBHandler) - if err != nil { - slog.Default().Error("mqtt create client or handler failed,", slog.Any("err", err)) - cancel() - return - } - eg.Go(func() error { - slog.Default().Info("start mqtt handler") - return mqttHandler.Start(ctx) - }) - r := chi.NewRouter() // r.Use(middleware.Recoverer) r.Use(middleware.Heartbeat("/debug/ping")) r.Mount("/debug", middleware.Profiler()) r.Handle(statev1connect.NewStateManagerServiceHandler(&connectHandler.StateManagerServer{DBHandler: DBHandler, MqttHandler: mqttHandler})) - r.Handle(statev1connect.NewStateManagerServiceHandler(&connectHandler.StateManagerServer{DBHandler: DBHandler, MqttHandler: mqttHandler})) r.Use(httplog.RequestLogger( httplog.NewLogger( "http_server", @@ -171,8 +142,6 @@ func main() { ), ), ) - r.Mount("/debug", middleware.Profiler()) - r.Handle(statev1connect.NewStateManagerServiceHandler(&connectHandler.StateManagerServer{})) srv := &http.Server{ Addr: net.JoinHostPort("0.0.0.0", "8080"), @@ -180,6 +149,7 @@ func main() { ReadHeaderTimeout: 60 * time.Second, BaseContext: func(net.Listener) context.Context { return ctx }, } + eg.Go(func() error { errC := make(chan error) go func() { @@ -197,6 +167,7 @@ func main() { return ctx.Err() } }) + //go operation.Handler() eg.Go(func() error { slog.Default().Info("start mqtt handler") diff --git a/backend/state-manager/pkg/mqtt_handler/mqtt_handler.go b/backend/state-manager/pkg/mqtt_handler/mqtt_handler.go index 84b6918..513dfd6 100644 --- a/backend/state-manager/pkg/mqtt_handler/mqtt_handler.go +++ b/backend/state-manager/pkg/mqtt_handler/mqtt_handler.go @@ -45,7 +45,7 @@ func (h *Handler) Start(ctx context.Context) error { slog.Default().Info("Interrupted at mqtt_handler") h.client.Disconnect(1000) slog.Default().Info("Disconnected from mqtt broker") - return nil + return fmt.Errorf("interrupted at mqtt_handler: %w", ctx.Err()) } } }