Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

console: Allow for multiple subscribers #194

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ task:
<< : *REGULAR_TASK_TEMPLATE
name: FreeBSD 14
freebsd_instance:
image_family: freebsd-14-0
image_family: freebsd-14-1
40 changes: 25 additions & 15 deletions daemon/albatross_console.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
- Add name (by vmmd) --> creates a new console slurper for name,
and starts a read_console task
- Attach name --> attaches console of name: send existing stuff to client,
and record the requesting socket to receive further messages. A potential
earlier subscriber to the same console is closed. *)
and record the requesting socket to receive further messages. Multiple
clients can be attached simultaneously. *)

open Lwt.Infix

Expand All @@ -25,9 +25,7 @@ let read_console id name ring fd =
Logs.debug (fun m -> m "read %s" line) ;
let t = Ptime_clock.now () in
Vmm_ring.write ring (t, line) ;
(match Vmm_core.String_map.find_opt name !active with
| None -> Lwt.return_unit
| Some (version, utc, fd) ->
let f (version, utc, fd) =
let header = Vmm_commands.header ~version id in
let data =
if utc then
Expand All @@ -38,9 +36,20 @@ let read_console id name ring fd =
Vmm_lwt.write_wire fd (header, data) >>= function
| Error _ ->
Vmm_lwt.safe_close fd >|= fun () ->
active := Vmm_core.String_map.remove name !active
| Ok () -> Lwt.return_unit) >>=
loop
let update s =
let s = Option.value ~default:[] s in
let s' = List.filter (fun (_v, _u, fd') -> fd <> fd') s in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be sure, we can compare file descriptors? I think it is fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with Unix.stdin = Unix.stdout and Unix.stdin = Unix.stdin. It worked fine in the toplevel. It would be nice to have somewhere to point to that says "it's safe to compare Unix.file_descras I rememberUnix.file_descr` being a bit special.

We can also have a counter that we use to generate IDs we compare with.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine to do. There's been here and there code to take the identifier (integer) of a fd -- but I guess we don't need that nor want to go down that route.

It'd indeed be nice if there was a Unix.compare_fd or Unix.equal_fd function provided by OCaml.

if s' = [] then
None
else
Some s'
in
active := Vmm_core.String_map.update name update !active
| Ok () -> Lwt.return_unit
in
Lwt_list.iter_p f
(Vmm_core.String_map.find_opt name !active |> Option.value ~default:[])
>>= loop
in
loop ())
(fun e ->
Expand Down Expand Up @@ -92,16 +101,17 @@ let add_fifo id =
let subscribe s version ~utc id =
let name = Vmm_core.Name.to_string id in
Logs.debug (fun m -> m "attempting to subscribe %a" Vmm_core.Name.pp id) ;
let update e es =
let es = Option.value ~default:[] es in
Some (e :: es)
in
match Vmm_core.String_map.find_opt name !t with
| None ->
active := Vmm_core.String_map.add name (version, utc, s) !active ;
active := Vmm_core.String_map.update name (update (version, utc, s)) !active ;
Lwt.return (None, "waiting for VM")
| Some r ->
(match Vmm_core.String_map.find_opt name !active with
| None -> Lwt.return_unit
| Some (_, _, s) -> Vmm_lwt.safe_close s) >|= fun () ->
active := Vmm_core.String_map.add name (version, utc, s) !active ;
(Some r, "subscribed")
active := Vmm_core.String_map.update name (update (version, utc, s)) !active ;
Lwt.return (Some r, "subscribed")

let send_history s version ~utc r id since =
let entries =
Expand Down Expand Up @@ -209,7 +219,7 @@ let cmd =
`S "DESCRIPTION";
`P "$(tname) reads the console output of a unikernel and preserves the
latest 1000 lines in a ring buffer in memory for clients. Each unikernel
may only have a single subscribed client, which is sent to until the
may have multiple subscribed clients, which is sent to until the
client closes the connection (each new line is sent as new message on
the stream). The albatross-daemon informs albatross-console when a new
unikernel is created, and albatross-console starts reading from the
Expand Down
Loading