Skip to content

Commit

Permalink
Merge branch 'main' into add-connect-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
csenet committed Nov 21, 2023
2 parents 18de977 + 7f80f7e commit eb3817c
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 358 deletions.
46 changes: 0 additions & 46 deletions .github/workflows/esp32-cam-udp-build-check.yaml

This file was deleted.

14 changes: 10 additions & 4 deletions backend/onetime/seed-data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package main

import (
"context"
"os"

"github.com/joho/godotenv"
statev1 "github.com/ueckoken/plarail2023/backend/spec/state/v1"
"github.com/ueckoken/plarail2023/backend/state-manager/pkg/db"
dbhandler "github.com/ueckoken/plarail2023/backend/state-manager/pkg/db"
"go.mongodb.org/mongo-driver/mongo/options"
"gopkg.in/yaml.v3"
"os"
)

type Station string
Expand All @@ -26,8 +29,11 @@ func main() {
if err := godotenv.Load(".env"); err != nil {
panic(err)
}
db.Open()
defer db.C()
db, err := dbhandler.Open(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
if err != nil {
return
}
defer db.Close()
data := &Seed{}
b, _ := os.ReadFile("./data/nt-tokyo.yaml")
if err := yaml.Unmarshal(b, data); err != nil {
Expand Down
Binary file removed backend/state-manager/cmd/main
Binary file not shown.
75 changes: 68 additions & 7 deletions backend/state-manager/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ package main

import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/httplog/v2"
"github.com/joho/godotenv"
"github.com/ueckoken/plarail2023/backend/spec/state/v1/statev1connect"
connectHandler "github.com/ueckoken/plarail2023/backend/state-manager/pkg/connect"
"github.com/ueckoken/plarail2023/backend/state-manager/pkg/db"
"github.com/ueckoken/plarail2023/backend/state-manager/pkg/mqtt_handler"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
Expand All @@ -29,6 +33,19 @@ const (
Prod
)

func (a AppEnv) String() string {
switch a {
case Dev:
return "dev"
case Test:
return "test"
case Prod:
return "prod"
default:
return "unknown"
}
}

var appEnv AppEnv = Dev
var (
version = "develop"
Expand Down Expand Up @@ -65,23 +82,49 @@ func init() {
func main() {
err := godotenv.Load(".env")
if err != nil {
slog.Default().Error("Error loading .env file")
slog.Default().Error("Error loading .env file", slog.Any("error", err))
os.Exit(1)
}
baseCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, stop := signal.NotifyContext(baseCtx, os.Interrupt)
defer stop()

go func() {
<-ctx.Done()
slog.Default().Info("signal received")
slog.Default().Info("signal received or canceled")
}()

eg, ctx := errgroup.WithContext(ctx)

DBOpts := options.Client().ApplyURI(os.Getenv("MONGODB_URI"))
DBHandler, err := db.Open(ctx, DBOpts)
if err != nil {
slog.Default().Error("database connection failed", slog.Any("err", err))
cancel()
return
}
mqttClientOpts := mqtt.NewClientOptions()
mqttClientOpts.AddBroker(os.Getenv("MQTT_BROKER_ADDR"))
mqttClientOpts.Username = os.Getenv("MQTT_USERNAME")
mqttClientOpts.Password = os.Getenv("MQTT_PASSWORD")
mqttClientOpts.ClientID = os.Getenv("MQTT_CLIENT_ID")

mqttHandler, err := mqtt_handler.NewHandler(mqttClientOpts, DBHandler)
if err != nil {
slog.Default().Error("mqtt create client or handler failed,", slog.Any("err", err))
cancel()
return
}
eg.Go(func() error {
slog.Default().Info("start mqtt handler")
return mqttHandler.Start(ctx)
})

r := chi.NewRouter()
// r.Use(middleware.Recoverer)
r.Use(middleware.Heartbeat("/debug/ping"))
r.Mount("/debug", middleware.Profiler())
r.Handle(statev1connect.NewStateManagerServiceHandler(&connectHandler.StateManagerServer{DBHandler: DBHandler, MqttHandler: mqttHandler}))
r.Use(httplog.RequestLogger(
httplog.NewLogger(
"http_server",
Expand All @@ -108,18 +151,36 @@ func main() {
ReadHeaderTimeout: 60 * time.Second,
BaseContext: func(net.Listener) context.Context { return ctx },
}
eg.Go(srv.ListenAndServe)
eg.Go(func() error {
errC := make(chan error)
go func() {
slog.Default().Info("start http server")
if err := srv.ListenAndServe(); err != nil {
slog.Default().Error("http server error", slog.Any("error", err))
errC <- err
}
}()
select {
case err := <-errC:
return fmt.Errorf("http server error: %w", err)
case <-ctx.Done():
slog.Default().Info("Interrupted at http server")
return ctx.Err()
}
})
//go operation.Handler()
eg.Go(func() error {
slog.Default().Info("start mqtt handler")
return mqtt_handler.StartHandler(ctx)
err := mqttHandler.Start(ctx)
return fmt.Errorf("mqtt handler error: %w", err)
})

// errGroup.Waitはeg.Goが全てerrorを返すまでwaitする
if err := eg.Wait(); err != nil {
slog.Default().Error("error in sub goroutine at main", err)
slog.Default().Error("error in sub goroutine at main", slog.Any("error", err))
}
slog.Default().Info("shutting down server")
newCtx, srvTimeOutCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer srvTimeOutCancel()
srv.Shutdown(newCtx)
<-newCtx.Done()
}
28 changes: 12 additions & 16 deletions backend/state-manager/pkg/connect/connect_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
"connectrpc.com/connect"

statev1 "github.com/ueckoken/plarail2023/backend/spec/state/v1"
db "github.com/ueckoken/plarail2023/backend/state-manager/pkg/db"
"github.com/ueckoken/plarail2023/backend/state-manager/pkg/db"
"github.com/ueckoken/plarail2023/backend/state-manager/pkg/mqtt_handler"
)

type StateManagerServer struct{}

type StateManagerServer struct {
DBHandler *db.DBHandler
MqttHandler *mqtt_handler.Handler
}

/*
Block
Expand All @@ -23,9 +27,7 @@ func (s *StateManagerServer) GetBlockStates(
ctx context.Context,
req *connect.Request[statev1.GetBlockStatesRequest],
) (*connect.Response[statev1.GetBlockStatesResponse], error) {
defer db.C()
db.Open()
blockStates, err := db.GetBlocks()
blockStates, err := s.DBHandler.GetBlocks()
if err != nil {
err = connect.NewError(
connect.CodeUnknown,
Expand Down Expand Up @@ -55,9 +57,7 @@ func (s *StateManagerServer) UpdateBlockState(
ctx context.Context,
req *connect.Request[statev1.UpdateBlockStateRequest],
) (*connect.Response[statev1.UpdateBlockStateResponse], error) {
defer db.C()
db.Open()
err := db.UpdateBlock(req.Msg.State)
err := s.DBHandler.UpdateBlock(req.Msg.State)
if err != nil {
err = connect.NewError(
connect.CodeUnknown,
Expand All @@ -77,9 +77,7 @@ func (s *StateManagerServer) UpdatePointState(
ctx context.Context,
req *connect.Request[statev1.UpdatePointStateRequest],
) (*connect.Response[statev1.UpdatePointStateResponse], error) {
defer db.C()
db.Open()
err := db.UpdatePoint(req.Msg.State)
err := s.DBHandler.UpdatePoint(req.Msg.State)
if err != nil {
err = connect.NewError(
connect.CodeUnknown,
Expand All @@ -88,7 +86,7 @@ func (s *StateManagerServer) UpdatePointState(
slog.Default().Error("db error", err)
return nil, err
}
mqtt_handler.NotifyStateUpdate("point", req.Msg.State.Id, req.Msg.State.State.String())
s.MqttHandler.NotifyStateUpdate("point", req.Msg.State.Id, req.Msg.State.State.String())

return connect.NewResponse(&statev1.UpdatePointStateResponse{}), nil
}
Expand All @@ -112,9 +110,7 @@ func (s *StateManagerServer) UpdateStopState(
ctx context.Context,
req *connect.Request[statev1.UpdateStopStateRequest],
) (*connect.Response[statev1.UpdateStopStateResponse], error) {
db.Open()
defer db.C()
err := db.UpdateStop(req.Msg.State)
err := s.DBHandler.UpdateStop(req.Msg.State)
if err != nil {
err = connect.NewError(
connect.CodeUnknown,
Expand All @@ -123,7 +119,7 @@ func (s *StateManagerServer) UpdateStopState(
slog.Default().Error("db connection error", err)
return nil, err
}
mqtt_handler.NotifyStateUpdate("stop", req.Msg.State.Id, req.Msg.State.State.String())
s.MqttHandler.NotifyStateUpdate("stop", req.Msg.State.Id, req.Msg.State.State.String())
return connect.NewResponse(&statev1.UpdateStopStateResponse{}), nil
}

Expand Down
Loading

0 comments on commit eb3817c

Please sign in to comment.