-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathIngester.fs
66 lines (56 loc) · 2.87 KB
/
Ingester.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
module FeedConsumerTemplate.Ingester
open Propulsion.Internal
open System
open FeedConsumerTemplate.Domain
/// Tallies yielded by a single handler invocation; accumulated by Stats between periodic emissions
type Outcome =
    { /// Count surfaced as "Added" in the periodic stats line
      added: int
      /// Count surfaced as "Not Yet Shipped" in the periodic stats line
      notReady: int
      /// Count surfaced as "Duplicates" in the periodic stats line
      dups: int }
/// Accumulates the per-Span Outcome counters so they can be written to the log periodically.
/// `log`, `statsInterval` and `stateInterval` are handed straight to the Propulsion.Streams.Stats base type.
type Stats(log, statsInterval, stateInterval) =
    inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)

    // Running totals since the last DumpStats emission
    let mutable added = 0
    let mutable notReady = 0
    let mutable dups = 0

    /// Folds the counters of a successfully handled Span into the running totals
    override _.HandleOk outcome =
        let { added = a; notReady = n; dups = d } = outcome
        added <- added + a
        notReady <- notReady + n
        dups <- dups + d

    /// Emits the base stats, then (only when at least one counter is non-zero) the ingester totals,
    /// after which the totals are reset
    override _.DumpStats() =
        base.DumpStats()
        let nothingToReport = (added = 0 && notReady = 0 && dups = 0)
        if not nothingToReport then
            log.Information(" Added {added} Not Yet Shipped {notReady} Duplicates {dups}", added, notReady, dups)
            added <- 0
            notReady <- 0
            dups <- 0

    /// Logs an exception raised by the handler, using the logger supplied with the call
    override _.HandleExn(log, exn) =
        log.Information(exn, "Unhandled")
/// Helpers for carrying feed Items through the pipeline inside the `Context` of an event
module PipelineEvent =

    /// An item to be ingested: its ticket id plus an opaque payload
    type Item = { id: TicketId; payload: string }

    /// Wraps `item` in a TimelineEvent at the given `index`.
    /// The event type is a fixed placeholder and the data is left as the default value;
    /// the item itself rides along in the event's `context`.
    let ofIndexAndItem index (item: Item) =
        FsCodec.Core.TimelineEvent.Create(index, "eventType", Unchecked.defaultof<_>, context = item)

    /// Matches a (stream name, events) pair whose stream id splits into exactly two elements,
    /// the second of which parses as an FcId. Yields that FcId together with the Items
    /// unboxed from each event's `Context`.
    [<return: Struct>]
    let (|ItemsForFc|_|) (streamName, events: Propulsion.Sinks.Event[]) =
        match streamName with
        | FsCodec.StreamName.Split (_, FsCodec.StreamId.Parse 2 [| _; FcId.Parse fc |]) ->
            let items = events |> Seq.map (fun e -> Unchecked.unbox<Item> e.Context)
            ValueSome (fc, items)
        | _ -> ValueNone
/// Handles a batch of events for a single stream.
/// `maxDop` bounds the parallelism of each of the two simulated phases.
/// `stream` / `events` must match `PipelineEvent.ItemsForFc`; anything else raises via `failwithf`.
/// Returns `PartiallyProcessed` with the number of ticket ids taken (at most 1000), paired with an
/// `Outcome` where:
/// - `added` is the number of add attempts that yielded a result
/// - `notReady` is the number of distinct tickets that were not accepted
/// - `dups` is the number of ticket ids removed by de-duplication
let handle maxDop stream events = async {
    match stream, events with
    | PipelineEvent.ItemsForFc (_fc, items) ->
        // Take chunks of max 1000 in order to make handler latency be less 'lumpy'
        // What makes sense in terms of a good chunking size will vary depending on the workload in question
        let ticketIds = seq { for x in items -> x.id } |> Seq.truncate 1000 |> Seq.toArray

        // Placeholder for a real "is this ticket ready?" check: waits 1s, accepting every third item
        let simulateAccept i _ticketId = async {
            do! Async.Sleep(TimeSpan.FromSeconds 1.)
            return if i % 3 = 1 then Some 42 else None }
        // Placeholder for a real "add" operation: waits 1s, succeeding for every second item
        let simulateAdd i _accepted = async {
            do! Async.Sleep(TimeSpan.FromSeconds 1.)
            return if i % 2 = 1 then Some 42 else None }

        // One result per DISTINCT ticket id
        let! results = Async.Parallel(Seq.distinct ticketIds |> Seq.mapi simulateAccept, maxDegreeOfParallelism = maxDop)
        let ready = results |> Array.choose id
        let! addResults = Async.Parallel(ready |> Seq.mapi simulateAdd, maxDegreeOfParallelism = maxDop)

        let outcome =
            { // Only attempts that actually produced a value count as added (not every attempt)
              added = addResults |> Array.choose id |> Array.length
              notReady = results.Length - ready.Length
              // results.Length <= ticketIds.Length, so the duplicates are the ids dropped by Seq.distinct
              dups = ticketIds.Length - results.Length }
        return Propulsion.Sinks.PartiallyProcessed ticketIds.Length, outcome
    | x -> return failwithf "Unexpected stream %O" x
}
/// Entry point for starting the ingester's sink; the private constructor means it is never instantiated
type Factory private () =

    /// Starts a sink by forwarding to `Propulsion.Sinks.Factory.StartConcurrent`,
    /// re-ordering the arguments into the sequence that call expects
    static member StartSink(log, stats, dop, handle, maxReadAhead) =
        Propulsion.Sinks.Factory.StartConcurrent(
            log,
            maxReadAhead,
            dop,
            handle,
            stats)