Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
qjoly committed Aug 30, 2024
1 parent 08f7216 commit a790a80
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 13 deletions.
34 changes: 30 additions & 4 deletions coffee-maker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
Expand All @@ -18,6 +19,14 @@ const (
streamName = "coffee-orders"
)

type CoffeeOrder struct {
Size string `json:"size"`
BeanType string `json:"bean_type"`
Milk string `json:"milk"`
Name string `json:"name"`
SugarCount string `json:"sugar_count"`
}

func main() {

natsUrl := os.Getenv("NATS_URL")
Expand Down Expand Up @@ -65,16 +74,33 @@ func main() {

cc, err := cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("New message from %s : %s ", msg.Subject(), string(msg.Data()))

var coffee CoffeeOrder
err := json.Unmarshal(msg.Data(), &coffee)
if err != nil {
fmt.Println("Error unmarshalling order: ", err)
// If the order is invalid, delete it
msg.Term()
return
}

msg.InProgress()

number := rand.Intn(10)
if number == 0 {
number := rand.Intn(100)
// 5% of chance to fail the coffee
if number <= 5 {
fmt.Print("--- failed !")
msg.Nak()
return
} else {
// fmt.Print("- succeed")
time.Sleep(200 * time.Millisecond)

subjectStock := fmt.Sprintf("coffee.stock.%s.dec.%s", coffee.BeanType, coffee.Size)
msg.Ack()
_, err := nc.Request(subjectStock, []byte(""), 2*time.Second)
if err != nil {
fmt.Println(err.Error())
return
}
}

fmt.Printf("\n")
Expand Down
76 changes: 68 additions & 8 deletions controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,108 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"time"

"github.com/nats-io/nats.go"
)

const (
subject = "coffee.web.requests"
subjectSub = "coffee.web.requests"
subjectStockManagerPrefix = "coffee.stock"
coffeeMakersSubjectPrefix = "coffee.orders"
)

type controllerResponse struct {
Status string `json:"status"` // success or error
Message string `json:"message"`
}

type CoffeeOrder struct {
Size string `json:"size"`
BeanType string `json:"bean_type"`
Milk string `json:"milk"`
Name string `json:"name"`
SugarCount string `json:"sugar_count"`
}

func main() {

url := os.Getenv("NATS_URL")
if url == "" {
url = "192.168.128.51:4222"
}

coffeeQuantityMap := make(map[string]int)
coffeeQuantityMap["small"] = 9
coffeeQuantityMap["medium"] = 17
coffeeQuantityMap["large"] = 30

nc, _ := nats.Connect(url)
defer nc.Drain()

sub, _ := nc.Subscribe(subject, func(msg *nats.Msg) {
sub, _ := nc.Subscribe(subjectSub, func(msg *nats.Msg) {

var order CoffeeOrder

fmt.Println("Received a new order: " + string(msg.Data))

err := json.Unmarshal(msg.Data, &order)
if err != nil {
fmt.Println("Error unmarshalling order: ", err)
return
}

var response controllerResponse
response.Status = "success"
response.Message = "The coffee has been successfully scheduled"

// Handle message
resp, err := nc.Request(fmt.Sprintf("%s.%s.%s", subjectStockManagerPrefix, order.BeanType, "get"), []byte(""), 2*time.Second)
if err != nil {
fmt.Println(err.Error())
return
}

stockQuantity, err := strconv.Atoi(string(resp.Data))
if err != nil {
fmt.Println(err.Error())
return
}

fmt.Println("Received stock quantity: ", stockQuantity)

fmt.Println(order)
fmt.Println("Needed :", coffeeQuantityMap[order.Size])

// If the quantity of coffee is greater than the amount of coffee needed for the order
// then schedule the order
// else return an error response
fmt.Printf("Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity)
if coffeeQuantityMap[order.Size] <= stockQuantity {
response.Status = "success"
response.Message = "The coffee has been successfully scheduled"
} else {
response.Status = "error"
response.Message = fmt.Sprintf("Insufficient coffee for type %s", order.BeanType)
}

// Notify coffee-makers that the order is scheduled

if response.Status == "success" {
err = nc.Publish(fmt.Sprintf("%s.%s", coffeeMakersSubjectPrefix, order.BeanType), msg.Data)
if err != nil {
fmt.Println(err.Error())
return
}
}

jsonData, _ := json.Marshal(response)
fmt.Printf("Status: %s, Message: %s\n", response.Status, response.Message)
msg.Respond(jsonData)
})

defer sub.Unsubscribe()

fmt.Println("Waiting for orders")
// wait forever
for {
}
defer sub.Unsubscribe()
select {}

}
2 changes: 1 addition & 1 deletion routes/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ <h1>Order Your Coffee</h1>
<select id="size" name="size" required>
<option value="small">Small</option>
<option value="medium">Medium</option>
<option value="big">Big</option>
<option value="large">Large</option>
</select>

<label for="bean_type">Type of Beans:</label>
Expand Down
1 change: 1 addition & 0 deletions stock-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func main() {
return
}
log.Printf("Decremented %s coffee by %s", typeBean, quantity)
m.Respond([]byte("OK"))

default:
fmt.Println("This action is not supported.")
Expand Down

0 comments on commit a790a80

Please sign in to comment.