Skip to content

Commit

Permalink
KV / GoTMPL / emojis
Browse files Browse the repository at this point in the history
  • Loading branch information
qjoly committed Sep 8, 2024
1 parent a790a80 commit d0c6fee
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 58 deletions.
57 changes: 43 additions & 14 deletions coffee-maker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
consumerName = "coffeeMakers"
subjects = "coffee.orders.*"
streamName = "coffee-orders"
kvBucket = "orders-values"
kvVar = "orders.pending"
)

type CoffeeOrder struct {
Expand All @@ -31,7 +33,7 @@ func main() {

natsUrl := os.Getenv("NATS_URL")
if natsUrl == "" {
fmt.Println("Please, provide the NATS URL in NATS_URL")
log.Println("Please, provide the NATS URL in NATS_URL")
os.Exit(1)
}

Expand All @@ -40,10 +42,10 @@ func main() {
defer nc.Close()

js, err := jetstream.New(nc)

if err != nil {
log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand Down Expand Up @@ -72,47 +74,74 @@ func main() {
log.Fatal(err)
}

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: kvBucket,
})

if err != nil {
log.Fatal(err)
}

cc, err := cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("New message from %s : %s ", msg.Subject(), string(msg.Data()))

var coffee CoffeeOrder
err := json.Unmarshal(msg.Data(), &coffee)
consInfo, err := cons.Info(context.TODO())
if err != nil {
fmt.Println("Error unmarshalling order: ", err)
log.Println("Error getting consInfo: ", err)
// If the order is invalid, delete it
msg.Term()
return
}

numPendingOrders := consInfo.NumAckPending - 1

_, err = kv.Put(context.TODO(), kvVar, []byte(fmt.Sprint(numPendingOrders)))
if err != nil {
log.Println("can't save orders pending to kv : " + err.Error())
}

log.Printf("☕ New order : %s", string(msg.Data()))

msg.InProgress()

time.Sleep(1 * time.Second)
msg.Ack()

var coffee CoffeeOrder
err = json.Unmarshal(msg.Data(), &coffee)
if err != nil {
log.Println("Error unmarshalling order: ", err)
// If the order is invalid, delete it
msg.Term()
return
}

number := rand.Intn(100)
// 5% of chance to fail the coffee
if number <= 5 {
fmt.Print("--- failed !")
log.Println("💢 I failed the coffee order, skipping...")
msg.Nak()
return
} else {

subjectStock := fmt.Sprintf("coffee.stock.%s.dec.%s", coffee.BeanType, coffee.Size)
msg.Ack()
_, err := nc.Request(subjectStock, []byte(""), 2*time.Second)
log.Println("📦 Updating the stock...")
_, err = nc.Request(subjectStock, []byte(""), 2*time.Second)
if err != nil {
fmt.Println(err.Error())
log.Printf("can't update stock: %s \n", err.Error())
return
}
}

fmt.Printf("\n")
log.Println("✅ Coffee served")

})

if err != nil {
log.Fatal(err)
}
defer cc.Drain()

fmt.Println("wait forever")
for {
}
log.Println("⌛ Waiting for orders...")
select {}

}
16 changes: 7 additions & 9 deletions controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -48,7 +49,7 @@ func main() {

var order CoffeeOrder

fmt.Println("Received a new order: " + string(msg.Data))
log.Println("Received a new order: " + string(msg.Data))

err := json.Unmarshal(msg.Data, &order)
if err != nil {
Expand All @@ -70,19 +71,16 @@ func main() {
return
}

fmt.Println("Received stock quantity: ", stockQuantity)

fmt.Println(order)
fmt.Println("Needed :", coffeeQuantityMap[order.Size])

// If the quantity of coffee is greater than the amount of coffee needed for the order
// then schedule the order
// else return an error response
fmt.Printf("Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity)
log.Printf("🤔 Order ask for %d (size %s), we have currently %d", coffeeQuantityMap[order.Size], order.Size, stockQuantity)
if coffeeQuantityMap[order.Size] <= stockQuantity {
log.Println("✅ Order successfully submitted")
response.Status = "success"
response.Message = "The coffee has been successfully scheduled"
} else {
log.Println("❌ Not enough stock")
response.Status = "error"
response.Message = fmt.Sprintf("Insufficient coffee for type %s", order.BeanType)
}
Expand All @@ -98,13 +96,13 @@ func main() {
}

jsonData, _ := json.Marshal(response)
fmt.Printf("Status: %s, Message: %s\n", response.Status, response.Message)
log.Printf("🗣️ Status: %s, Message: %s\n", response.Status, response.Message)
msg.Respond(jsonData)
})

defer sub.Unsubscribe()

fmt.Println("Waiting for orders")
log.Println("Waiting for orders...")
// wait forever
select {}

Expand Down
1 change: 1 addition & 0 deletions routes/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
2 changes: 2 additions & 0 deletions routes/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
95 changes: 78 additions & 17 deletions routes/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"html/template"
"log"
"net/http"
"os"
"strconv"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

const (
subject = "coffee.web.requests"
subject = "coffee.web.requests"
concurrency = 5
)

type CoffeeOrder struct {
Expand All @@ -29,15 +33,14 @@ type controllerResponse struct {
Message string `json:"message"`
}

var (
nc *nats.Conn
)

func sendOrderToController(order CoffeeOrder) (controllerResponse, error) {
var response controllerResponse
url := os.Getenv("NATS_URL")
if url == "" {
return response, errors.New("Please provide nats url in NATS_URL env")
}

nc, _ := nats.Connect(url)
defer nc.Drain()
log.Printf("☕ New order received for %s", order.Name)

jsonData, err := json.Marshal(order)
if err != nil {
Expand All @@ -47,7 +50,7 @@ func sendOrderToController(order CoffeeOrder) (controllerResponse, error) {

rep, err := nc.Request(subject, jsonData, 2*time.Second)
if err != nil {
fmt.Println(err.Error())
log.Printf("Error sending order: %v", err)
return response, err
}

Expand All @@ -69,7 +72,7 @@ func handleCoffeeOrder(w http.ResponseWriter, r *http.Request) {
var order CoffeeOrder
err := json.NewDecoder(r.Body).Decode(&order)
if err != nil {
fmt.Println(err.Error())
log.Printf("Error unmarshalling order: %v", err)
http.Error(w, "Invalid request payload", http.StatusBadRequest)
return
}
Expand All @@ -86,33 +89,91 @@ func handleCoffeeOrder(w http.ResponseWriter, r *http.Request) {

controllerResponse, err = sendOrderToController(order)
if err != nil {
fmt.Println(err.Error())

log.Println("💢 Coffee cannot be scheduled !", err)
controllerResponse.Status = "error"
responseTitle = "Oh..."
controllerResponse.Message = "Sadly, we were not able to discuss with our backend system..."
}

fmt.Println(controllerResponse)
log.Println("🗣️ Response : ", controllerResponse.Message)

json.NewEncoder(w).Encode(map[string]string{"title": responseTitle, "message": controllerResponse.Message, "status": controllerResponse.Status})

return
}

func handleHome(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "./src/index.html")
func handleHome(w http.ResponseWriter, numberOfPendingOrders int) {

tmpl, err := template.ParseFiles("./src/index.html")
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

var waitingTime string
if numberOfPendingOrders != -1 {
// Each coffee take about 1 minute

waitingTime = strconv.Itoa(numberOfPendingOrders/concurrency) + "min"
} else {
waitingTime = "Unknown"
}

data := struct {
WaitingTime string
}{
WaitingTime: waitingTime,
}

err = tmpl.Execute(w, data)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}

func main() {

url := os.Getenv("NATS_URL")
nc, _ = nats.Connect(url)
defer nc.Drain()

js, err := jetstream.New(nc)
if err != nil {
log.Fatal(err)
}

kv, err := js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
Bucket: "orders-values",
})
if err != nil {
log.Fatal(err)
}

http.HandleFunc("/order-coffee", handleCoffeeOrder)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/index", http.StatusMovedPermanently)
})

http.HandleFunc("/index", handleHome)
http.HandleFunc("/index", func(w http.ResponseWriter, r *http.Request) {
var numberOfPendingOrders int

kvValuePendingOrders, err := kv.Get(context.Background(), "orders.pending")
if err != nil {
numberOfPendingOrders = -1
} else {
numberOfPendingOrders, err = strconv.Atoi(string(kvValuePendingOrders.Value()))
if err != nil {

numberOfPendingOrders = -1
}
}

handleHome(w, numberOfPendingOrders)
})

fmt.Println("Starting server on port 8080...")
log.Println("Starting server on port 8080...")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Could not start server: %s\n", err)
}
Expand Down
6 changes: 6 additions & 0 deletions routes/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ <h1>Order Your Coffee</h1>

<button type="submit">Order Coffee</button>
</form>

<footer style="text-align: center; margin-top: 20px;">
Waiting time : {{.WaitingTime}}
</footer>


</div>

<script>
Expand Down
Loading

0 comments on commit d0c6fee

Please sign in to comment.