Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reply handling and add a timeout in the http client #9

Merged
merged 6 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
name: Test Coverage
name: "Code Coverage"

on:
push:
branches: [ master ]
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
branches: [ main ]

jobs:
build:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # pin@v3
with:
fetch-depth: 2
- name: Set up Go
uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # pin@v4
with:
- uses: actions/checkout@master
- uses: actions/setup-go@v4
with:
go-version: '1.20'
check-latest: true
- name: Run coverage
run: make coverage
- name: Upload coverage to Codecov
uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # pin@v3
- name: Run test coverage
run: go test -race -coverprofile=coverage.out -covermode=atomic ./...
- uses: codecov/codecov-action@v3
with:
files: coverage.out
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"net/http"
"sync"
"time"

"fortio.org/log"
"fortio.org/scli"
Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
metrics := NewMetrics(r)

// Initialize the app, metrics are passed along so they are accessible
app := NewApp(maxQueueSize, &http.Client{}, metrics)
app := NewApp(maxQueueSize, &http.Client{
Timeout: 10 * time.Second,
}, metrics)
// The only required flag is the token at the moment.
if tokenFlag == "" {
log.Fatalf("Missing token flag")
Expand Down
75 changes: 27 additions & 48 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"fortio.org/fortio/fhttp"
"fortio.org/fortio/jrpc"
"fortio.org/log"
)

Expand Down Expand Up @@ -45,14 +46,9 @@ func (app *App) StartServer(ctx context.Context, applicationPort string) error {
}

func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// Regardless of the outcome, we always respond as json
w.Header().Set("Content-Type", "application/json")

// "Mock" the response from Slack.
// OK is true by default, so we only need to set it to false if we want to trow an error which then could use a custom error message.
// From testing, any application only checks if OK is true. So we can ignore all other fields
fakeSlackResponse := SlackResponse{
Ok: true,
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

maxQueueSize := int(float64(cap(app.slackQueue)) * 0.9)
Expand All @@ -61,60 +57,42 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// Ideally we don't reject at 90%, but initially after some tests I got blocked. So I decided to be a bit more conservative.
// ToDo: Fix this behavior so we can reach 100% channel size without problems.
if len(app.slackQueue) >= maxQueueSize {
w.WriteHeader(http.StatusServiceUnavailable)
log.S(log.Info, "Queue is almost full, returning StatusServiceUnavailable", log.Int("queueSize", len(app.slackQueue)))

fakeSlackResponse.Ok = false
fakeSlackResponse.Error = "Queue is almost full"
responseData, err := json.Marshal(fakeSlackResponse)
if err != nil {
http.Error(w, "Failed to serialize Slack response", http.StatusInternalServerError)
return
}

_, err = w.Write(responseData)
err := jrpc.Reply[SlackResponse](w, http.StatusServiceUnavailable, &SlackResponse{
Ok: false,
Error: "Queue is almost full",
})
if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}

return
}

if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var request SlackPostMessageRequest
err := json.NewDecoder(r.Body).Decode(&request)
if err != nil {
http.Error(w, "Failed to read the request body", http.StatusInternalServerError)
return
}
requestErr := json.NewDecoder(r.Body).Decode(&request)

// Validate the request
err = validate(request)
if err != nil {
log.S(log.Error, "Invalid request", log.Any("err", err))
// TODO: jrpc.(Client)ErrorReply ?
w.WriteHeader(http.StatusBadRequest)
fakeSlackResponse.Ok = false
fakeSlackResponse.Error = err.Error()
responseData, err2 := json.Marshal(fakeSlackResponse)
_, err3 := w.Write(responseData)
if err2 != nil || err3 != nil {
log.S(log.Error, "Failed to write response", log.Any("err2", err2), log.Any("err3", err3))
}
return
// If we can't decode, we don't bother validating. In the end it's the same outcome if either one is invalid.
if requestErr == nil {
requestErr = validate(request)
}

app.metrics.RequestsReceivedTotal.WithLabelValues(request.Channel).Inc()
if requestErr != nil {
log.S(log.Error, "Invalid request", log.Any("err", requestErr))

responseData, err := json.Marshal(fakeSlackResponse)
if err != nil {
http.Error(w, "Failed to serialize Slack response", http.StatusInternalServerError)
err := jrpc.Reply[SlackResponse](w, http.StatusBadRequest, &SlackResponse{
Ok: false,
Error: requestErr.Error(),
})
if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}
return
}

// Start the logic (as we passed all our checks) to process the request.
app.metrics.RequestsReceivedTotal.WithLabelValues(request.Channel).Inc()
// Add a counter to the wait group, this is important to wait for all the messages to be processed before shutting down the server.
app.wg.Add(1)
// Send the message to the slackQueue to be processed
Expand All @@ -126,8 +104,9 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
// This is the downside of having a queue which could potentially delay responses by a lot.
// We do our due diligences on the received message and can make a fair assumption we will be able to process it.
// Application should utlise this applications metrics and logs to find out if there are any issues.
w.WriteHeader(http.StatusOK)
_, err = w.Write(responseData)
err := jrpc.Reply[SlackResponse](w, http.StatusOK, &SlackResponse{
Ok: true,
})
if err != nil {
log.S(log.Error, "Failed to write response", log.Any("err", err))
}
Expand Down
Loading