From d0c6feebea4ea16b9eb6e3d61f67dc561756a19c Mon Sep 17 00:00:00 2001 From: QJoly Date: Sun, 8 Sep 2024 23:11:20 +0200 Subject: [PATCH] KV / GoTMPL / emojis --- coffee-maker/main.go | 57 +++++++++++++++++++------- controller/main.go | 16 ++++---- routes/go.mod | 1 + routes/go.sum | 2 + routes/main.go | 95 +++++++++++++++++++++++++++++++++++-------- routes/src/index.html | 6 +++ stock-manager/main.go | 34 ++++++++-------- 7 files changed, 153 insertions(+), 58 deletions(-) diff --git a/coffee-maker/main.go b/coffee-maker/main.go index 84810a4..dd06e59 100644 --- a/coffee-maker/main.go +++ b/coffee-maker/main.go @@ -17,6 +17,8 @@ const ( consumerName = "coffeeMakers" subjects = "coffee.orders.*" streamName = "coffee-orders" + kvBucket = "orders-values" + kvVar = "orders.pending" ) type CoffeeOrder struct { @@ -31,7 +33,7 @@ func main() { natsUrl := os.Getenv("NATS_URL") if natsUrl == "" { - fmt.Println("Please, provide the NATS URL in NATS_URL") + log.Println("Please, provide the NATS URL in NATS_URL") os.Exit(1) } @@ -40,10 +42,10 @@ func main() { defer nc.Close() js, err := jetstream.New(nc) - if err != nil { log.Fatal(err) } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -72,38 +74,66 @@ func main() { log.Fatal(err) } + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: kvBucket, + }) + + if err != nil { + log.Fatal(err) + } + cc, err := cons.Consume(func(msg jetstream.Msg) { - fmt.Printf("New message from %s : %s ", msg.Subject(), string(msg.Data())) - var coffee CoffeeOrder - err := json.Unmarshal(msg.Data(), &coffee) + consInfo, err := cons.Info(context.TODO()) if err != nil { - fmt.Println("Error unmarshalling order: ", err) + log.Println("Error getting consInfo: ", err) // If the order is invalid, delete it - msg.Term() return } + numPendingOrders := consInfo.NumAckPending - 1 + + _, err = kv.Put(context.TODO(), kvVar, []byte(fmt.Sprint(numPendingOrders))) + if err != nil { + log.Println("can't save orders pending to kv : " + err.Error()) + } + + log.Printf("☕ New order : %s", string(msg.Data())) + msg.InProgress() + time.Sleep(1 * time.Second) + msg.Ack() + + var coffee CoffeeOrder + err = json.Unmarshal(msg.Data(), &coffee) + if err != nil { + log.Println("Error unmarshalling order: ", err) + // If the order is invalid, delete it + msg.Term() + return + } + number := rand.Intn(100) // 5% of chance to fail the coffee if number <= 5 { - fmt.Print("--- failed !") + log.Println("💢 I failed the coffee order, skipping...") msg.Nak() return } else { subjectStock := fmt.Sprintf("coffee.stock.%s.dec.%s", coffee.BeanType, coffee.Size) msg.Ack() - _, err := nc.Request(subjectStock, []byte(""), 2*time.Second) + log.Println("📦 Updating the stock...") + _, err = nc.Request(subjectStock, []byte(""), 2*time.Second) if err != nil { - fmt.Println(err.Error()) + log.Printf("can't update stock: %s \n", err.Error()) return } } - fmt.Printf("\n") + log.Println("✅ Coffee served") + }) if err != nil { @@ -111,8 +141,7 @@ func main() { } defer cc.Drain() - fmt.Println("wait forever") - for { - } + log.Println("⌛ Waiting for orders...") + select {} } diff --git a/controller/main.go b/controller/main.go index 1660849..a7c3220 100644 --- a/controller/main.go +++ b/controller/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "fmt" + "log" "os" "strconv" "time" @@ -48,7 +49,7 @@ func main() { var order CoffeeOrder - fmt.Println("Received a new order: " + string(msg.Data)) + log.Println("☕ Received a new order: " + string(msg.Data)) err := json.Unmarshal(msg.Data, &order) if err != nil { @@ -70,19 +71,16 @@ func main() { return } - fmt.Println("Received stock quantity: ", stockQuantity) - - fmt.Println(order) - fmt.Println("Needed :", coffeeQuantityMap[order.Size]) - // If the quantity of coffee is greater than the amount of coffee needed for the order // then schedule the order // else return an error response - fmt.Printf("Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity) + log.Printf("🤔 Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity) if coffeeQuantityMap[order.Size] <= stockQuantity { + log.Println("✅ Order successfully submitted") response.Status = "success" response.Message = "The coffee has been successfully scheduled" } else { + log.Println("❌ Not enough stock") response.Status = "error" response.Message = fmt.Sprintf("Insufficient coffee for type %s", order.BeanType) } @@ -98,13 +96,13 @@ func main() { } jsonData, _ := json.Marshal(response) - fmt.Printf("Status: %s, Message: %s\n", response.Status, response.Message) + log.Printf("🗣️ Status: %s, Message: %s\n", response.Status, response.Message) msg.Respond(jsonData) }) defer sub.Unsubscribe() - fmt.Println("Waiting for orders") + log.Println("⌛ Waiting for orders...") // wait forever select {} diff --git a/routes/go.mod b/routes/go.mod index 73f37bb..e93ff0d 100644 --- a/routes/go.mod +++ b/routes/go.mod @@ -10,4 +10,5 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect ) diff --git a/routes/go.sum b/routes/go.sum index ad86381..f28013d 100644 --- a/routes/go.sum +++ b/routes/go.sum @@ -10,3 +10,5 @@ golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= diff --git a/routes/main.go b/routes/main.go index 458524f..9a30dd6 100644 --- a/routes/main.go +++ b/routes/main.go @@ -1,19 +1,23 @@ package main import ( + "context" "encoding/json" "errors" - "fmt" + "html/template" "log" "net/http" "os" + "strconv" "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) const ( - subject = "coffee.web.requests" + subject = "coffee.web.requests" + concurrency = 5 ) type CoffeeOrder struct { @@ -29,15 +33,14 @@ type controllerResponse struct { Message string `json:"message"` } +var ( + nc *nats.Conn +) + func sendOrderToController(order CoffeeOrder) (controllerResponse, error) { var response controllerResponse - url := os.Getenv("NATS_URL") - if url == "" { - return response, errors.New("Please provide nats url in NATS_URL env") - } - nc, _ := nats.Connect(url) - defer nc.Drain() + log.Printf("☕ New order received for %s", order.Name) jsonData, err := json.Marshal(order) if err != nil { @@ -47,7 +50,7 @@ func sendOrderToController(order CoffeeOrder) (controllerResponse, error) { rep, err := nc.Request(subject, jsonData, 2*time.Second) if err != nil { - fmt.Println(err.Error()) + log.Printf("Error sending order: %v", err) return response, err } @@ -69,7 +72,7 @@ func handleCoffeeOrder(w http.ResponseWriter, r *http.Request) { var order CoffeeOrder err := json.NewDecoder(r.Body).Decode(&order) if err != nil { - fmt.Println(err.Error()) + log.Printf("Error unmarshalling order: %v", err) http.Error(w, "Invalid request payload", http.StatusBadRequest) return } @@ -86,33 +89,91 @@ func handleCoffeeOrder(w http.ResponseWriter, r *http.Request) { controllerResponse, err = sendOrderToController(order) if err != nil { - fmt.Println(err.Error()) + + log.Println("💢 Coffee cannot be scheduled !", err) controllerResponse.Status = "error" responseTitle = "Oh..." controllerResponse.Message = "Sadly, we were not able to discuss with our backend system..." } - fmt.Println(controllerResponse) + log.Println("🗣️ Response : ", controllerResponse.Message) json.NewEncoder(w).Encode(map[string]string{"title": responseTitle, "message": controllerResponse.Message, "status": controllerResponse.Status}) - return } -func handleHome(w http.ResponseWriter, r *http.Request) { - http.ServeFile(w, r, "./src/index.html") +func handleHome(w http.ResponseWriter, numberOfPendingOrders int) { + + tmpl, err := template.ParseFiles("./src/index.html") + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + var waitingTime string + if numberOfPendingOrders != -1 { + // Each coffee take about 1 minute + + waitingTime = strconv.Itoa(numberOfPendingOrders/concurrency) + "min" + } else { + waitingTime = "Unknown" + } + + data := struct { + WaitingTime string + }{ + WaitingTime: waitingTime, + } + + err = tmpl.Execute(w, data) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } } func main() { + + url := os.Getenv("NATS_URL") + nc, _ = nats.Connect(url) + defer nc.Drain() + + js, err := jetstream.New(nc) + if err != nil { + log.Fatal(err) + } + + kv, err := js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ + Bucket: "orders-values", + }) + if err != nil { + log.Fatal(err) + } + http.HandleFunc("/order-coffee", handleCoffeeOrder) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/index", http.StatusMovedPermanently) }) - http.HandleFunc("/index", handleHome) + http.HandleFunc("/index", func(w http.ResponseWriter, r *http.Request) { + var numberOfPendingOrders int + + kvValuePendingOrders, err := kv.Get(context.Background(), "orders.pending") + if err != nil { + numberOfPendingOrders = -1 + } else { + numberOfPendingOrders, err = strconv.Atoi(string(kvValuePendingOrders.Value())) + if err != nil { + + numberOfPendingOrders = -1 + } + } + + handleHome(w, numberOfPendingOrders) + }) - fmt.Println("Starting server on port 8080...") + log.Println("Starting server on port 8080...") if err := http.ListenAndServe(":8080", nil); err != nil { log.Fatalf("Could not start server: %s\n", err) } diff --git a/routes/src/index.html b/routes/src/index.html index e8d9042..249c74b 100644 --- a/routes/src/index.html +++ b/routes/src/index.html @@ -117,6 +117,12 @@

Order Your Coffee

+ + + +