diff --git a/coffee-maker/main.go b/coffee-maker/main.go index 7f71559..84810a4 100644 --- a/coffee-maker/main.go +++ b/coffee-maker/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "log" "os" @@ -18,6 +19,14 @@ const ( streamName = "coffee-orders" ) +type CoffeeOrder struct { + Size string `json:"size"` + BeanType string `json:"bean_type"` + Milk string `json:"milk"` + Name string `json:"name"` + SugarCount string `json:"sugar_count"` +} + func main() { natsUrl := os.Getenv("NATS_URL") @@ -65,16 +74,33 @@ func main() { cc, err := cons.Consume(func(msg jetstream.Msg) { fmt.Printf("New message from %s : %s ", msg.Subject(), string(msg.Data())) + + var coffee CoffeeOrder + err := json.Unmarshal(msg.Data(), &coffee) + if err != nil { + fmt.Println("Error unmarshalling order: ", err) + // If the order is invalid, delete it + msg.Term() + return + } + msg.InProgress() - number := rand.Intn(10) - if number == 0 { + number := rand.Intn(100) + // 5% of chance to fail the coffee + if number <= 5 { fmt.Print("--- failed !") msg.Nak() + return } else { - // fmt.Print("- succeed") - time.Sleep(200 * time.Millisecond) + + subjectStock := fmt.Sprintf("coffee.stock.%s.dec.%s", coffee.BeanType, coffee.Size) msg.Ack() + _, err := nc.Request(subjectStock, []byte(""), 2*time.Second) + if err != nil { + fmt.Println(err.Error()) + return + } } fmt.Printf("\n") diff --git a/controller/main.go b/controller/main.go index 3b8ef8b..1660849 100644 --- a/controller/main.go +++ b/controller/main.go @@ -4,12 +4,16 @@ import ( "encoding/json" "fmt" "os" + "strconv" + "time" "github.com/nats-io/nats.go" ) const ( - subject = "coffee.web.requests" + subjectSub = "coffee.web.requests" + subjectStockManagerPrefix = "coffee.stock" + coffeeMakersSubjectPrefix = "coffee.orders" ) type controllerResponse struct { @@ -17,6 +21,14 @@ type controllerResponse struct { Message string `json:"message"` } +type CoffeeOrder struct { + Size string `json:"size"` + BeanType string `json:"bean_type"` + Milk string `json:"milk"` + Name string `json:"name"` + SugarCount string `json:"sugar_count"` +} + func main() { url := os.Getenv("NATS_URL") @@ -24,28 +36,76 @@ func main() { url = "192.168.128.51:4222" } + coffeeQuantityMap := make(map[string]int) + coffeeQuantityMap["small"] = 9 + coffeeQuantityMap["medium"] = 17 + coffeeQuantityMap["large"] = 30 + nc, _ := nats.Connect(url) defer nc.Drain() - sub, _ := nc.Subscribe(subject, func(msg *nats.Msg) { + sub, _ := nc.Subscribe(subjectSub, func(msg *nats.Msg) { + + var order CoffeeOrder fmt.Println("Received a new order: " + string(msg.Data)) + err := json.Unmarshal(msg.Data, &order) + if err != nil { + fmt.Println("Error unmarshalling order: ", err) + return + } + var response controllerResponse - response.Status = "success" - response.Message = "The coffee has been successfully scheduled" - // Handle message + resp, err := nc.Request(fmt.Sprintf("%s.%s.%s", subjectStockManagerPrefix, order.BeanType, "get"), []byte(""), 2*time.Second) + if err != nil { + fmt.Println(err.Error()) + return + } + + stockQuantity, err := strconv.Atoi(string(resp.Data)) + if err != nil { + fmt.Println(err.Error()) + return + } + + fmt.Println("Received stock quantity: ", stockQuantity) + + fmt.Println(order) + fmt.Println("Needed :", coffeeQuantityMap[order.Size]) + + // If the quantity of coffee is greater than the amount of coffee needed for the order + // then schedule the order + // else return an error response + fmt.Printf("Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity) + if coffeeQuantityMap[order.Size] <= stockQuantity { + response.Status = "success" + response.Message = "The coffee has been successfully scheduled" + } else { + response.Status = "error" + response.Message = fmt.Sprintf("Insufficient coffee for type %s", order.BeanType) + } + + // Notify coffee-makers that the order is scheduled + + if response.Status == "success" { + err = nc.Publish(fmt.Sprintf("%s.%s", coffeeMakersSubjectPrefix, order.BeanType), msg.Data) + if err != nil { + fmt.Println(err.Error()) + return + } + } jsonData, _ := json.Marshal(response) fmt.Printf("Status: %s, Message: %s\n", response.Status, response.Message) msg.Respond(jsonData) }) + defer sub.Unsubscribe() + fmt.Println("Waiting for orders") // wait forever - for { - } - defer sub.Unsubscribe() + select {} } diff --git a/routes/src/index.html b/routes/src/index.html index db91589..e8d9042 100644 --- a/routes/src/index.html +++ b/routes/src/index.html @@ -93,7 +93,7 @@

Order Your Coffee

diff --git a/stock-manager/main.go b/stock-manager/main.go index 374dc91..cb92b2f 100644 --- a/stock-manager/main.go +++ b/stock-manager/main.go @@ -145,6 +145,7 @@ func main() { return } log.Printf("Decremented %s coffee by %s", typeBean, quantity) + m.Respond([]byte("OK")) default: fmt.Println("This action is not supported.")