diff --git a/.cirrus.yml b/.cirrus.yml index b1490b8..6e4a81b 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -25,4 +25,4 @@ task: << : *REGULAR_TASK_TEMPLATE name: FreeBSD 14 freebsd_instance: - image_family: freebsd-14-0 + image_family: freebsd-14-1 diff --git a/daemon/albatross_console.ml b/daemon/albatross_console.ml index ccd625d..58eeb58 100644 --- a/daemon/albatross_console.ml +++ b/daemon/albatross_console.ml @@ -7,8 +7,8 @@ - Add name (by vmmd) --> creates a new console slurper for name, and starts a read_console task - Attach name --> attaches console of name: send existing stuff to client, - and record the requesting socket to receive further messages. A potential - earlier subscriber to the same console is closed. *) + and record the requesting socket to receive further messages. Multiple + clients can be attached simultaneously. *) open Lwt.Infix @@ -25,9 +25,7 @@ let read_console id name ring fd = Logs.debug (fun m -> m "read %s" line) ; let t = Ptime_clock.now () in Vmm_ring.write ring (t, line) ; - (match Vmm_core.String_map.find_opt name !active with - | None -> Lwt.return_unit - | Some (version, utc, fd) -> + let f (version, utc, fd) = let header = Vmm_commands.header ~version id in let data = if utc then @@ -38,9 +36,20 @@ let read_console id name ring fd = Vmm_lwt.write_wire fd (header, data) >>= function | Error _ -> Vmm_lwt.safe_close fd >|= fun () -> - active := Vmm_core.String_map.remove name !active - | Ok () -> Lwt.return_unit) >>= - loop + let update s = + let s = Option.value ~default:[] s in + let s' = List.filter (fun (_v, _u, fd') -> fd <> fd') s in + if s' = [] then + None + else + Some s' + in + active := Vmm_core.String_map.update name update !active + | Ok () -> Lwt.return_unit + in + Lwt_list.iter_p f + (Vmm_core.String_map.find_opt name !active |> Option.value ~default:[]) + >>= loop in loop ()) (fun e -> @@ -92,16 +101,17 @@ let add_fifo id = let subscribe s version ~utc id = let name = Vmm_core.Name.to_string id in Logs.debug (fun m -> m "attempting to subscribe %a" Vmm_core.Name.pp id) ; + let update e es = + let es = Option.value ~default:[] es in + Some (e :: es) + in match Vmm_core.String_map.find_opt name !t with | None -> - active := Vmm_core.String_map.add name (version, utc, s) !active ; + active := Vmm_core.String_map.update name (update (version, utc, s)) !active ; Lwt.return (None, "waiting for VM") | Some r -> - (match Vmm_core.String_map.find_opt name !active with - | None -> Lwt.return_unit - | Some (_, _, s) -> Vmm_lwt.safe_close s) >|= fun () -> - active := Vmm_core.String_map.add name (version, utc, s) !active ; - (Some r, "subscribed") + active := Vmm_core.String_map.update name (update (version, utc, s)) !active ; + Lwt.return (Some r, "subscribed") let send_history s version ~utc r id since = let entries = @@ -209,7 +219,7 @@ let cmd = `S "DESCRIPTION"; `P "$(tname) reads the console output of a unikernel and preserves the latest 1000 lines in a ring buffer in memory for clients. Each unikernel - may only have a single subscribed client, which is sent to until the + may have multiple subscribed clients, which is sent to until the client closes the connection (each new line is sent as new message on the stream). The albatross-daemon informs albatross-console when a new unikernel is created, and albatross-console starts reading from the