go-faster/flightorder

flightorder

This package lets you run an [ordered input] -> [parallel processing] -> [ordered output] pipeline in a streaming manner.

The name was inspired by the golang.org/x/sync/singleflight package.

Installation

go get github.com/go-faster/flightorder@latest

Example

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/go-faster/flightorder"
)

func main() {
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
	processingOrder, output := processInput(context.TODO(), input)
	fmt.Printf("input:     %v\n", input)
	fmt.Printf("processed: %v\n", processingOrder)
	fmt.Printf("output:    %v\n", output)
}

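// processInput handles every input value in its own goroutine and records
// both the order in which values finished processing and the order in which
// they were emitted.
func processInput(ctx context.Context, input []int) (processing, output []int) {
	route := flightorder.NewRoute(flightorder.RouteParams{})

	var (
		mux sync.Mutex // guards processing and output
		wg  sync.WaitGroup
	)

	wg.Add(len(input))
	for _, v := range input {
		// Tickets are taken here, in the loop, so they follow the input order.
		ticket := route.Ticket()
		go func(t *flightorder.Ticket, v int) {
			defer wg.Done()
			// Simulate work that takes a random amount of time.
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

			mux.Lock()
			processing = append(processing, v)
			mux.Unlock()

			// The callback appends to output; as the sample output shows,
			// values arrive there in ticket order, not completion order.
			// The returned error is ignored here for brevity.
			_ = route.CompleteTicket(ctx, t, func(context.Context) error {
				mux.Lock()
				output = append(output, v)
				mux.Unlock()
				return nil
			})
		}(ticket, v)
	}

	wg.Wait()
	return
}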

Output (the "processed" line varies between runs):

input:     [1 2 3 4 5 6 7 8 9]
processed: [3 1 9 7 6 2 5 4 8]
output:    [1 2 3 4 5 6 7 8 9]

Motivation

Sending logs from a single file to multiple Kafka brokers concurrently while preserving at-least-once delivery guarantees:

  • Logs are sent to multiple Kafka brokers in parallel to increase throughput.
  • File offsets are committed in the exact order in which they were read, which preserves the at-least-once guarantee and prevents data loss if the shipper or a broker fails.

License

Source code is available under the Apache License 2.0.
