Skip to content

Commit

Permalink
Close #619: Use a thread to prefetch folders
Browse files Browse the repository at this point in the history
Prefetch folder data with a background thread
  • Loading branch information
astrada committed Apr 18, 2020
1 parent dfed8cf commit 0543cb6
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 7 deletions.
28 changes: 21 additions & 7 deletions bin/gdfuse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ let setup_application params =
root_folder_id = None;
flush_db_thread = None;
async_upload_thread = None;
folder_fetching_thread = None;
} in
Context.set_ctx context;
if not (DbCache.check_clean_shutdown cache) then begin
Expand Down Expand Up @@ -836,7 +837,7 @@ let () =
begin match context.Context.buffer_eviction_thread with
| None -> ()
| Some buffer_eviction_thread -> begin
Utils.log_message
Utils.log_with_header
"Stopping buffer eviction thread (TID=%d)...%!"
(Thread.id buffer_eviction_thread);
Buffering.MemoryBuffers.stop_eviction_thread
Expand All @@ -848,7 +849,7 @@ let () =
begin match context.Context.flush_db_thread with
| None -> ()
| Some flush_db_thread -> begin
Utils.log_message
Utils.log_with_header
"Stopping flush DB thread (TID=%d)...%!"
(Thread.id flush_db_thread);
MemoryCache.stop_flush_db_thread ();
Expand All @@ -859,20 +860,33 @@ let () =
begin match context.Context.async_upload_thread with
| None -> ()
| Some async_upload_thread -> begin
Utils.log_message
Utils.log_with_header
"Stopping async upload thread (TID=%d)\n%!"
(Thread.id async_upload_thread);
UploadQueue.stop_async_upload_thread ();
Thread.join async_upload_thread;
end
end;
Utils.log_message "Flushing cache...\n%!";
begin match context.Context.folder_fetching_thread with
| None -> ()
| Some folder_fetching_thread -> begin
Utils.log_with_header
"Stopping background folder fetching thread (TID=%d)...%!"
(Thread.id folder_fetching_thread);
BackgroundFolderFetching.stop_folder_fetching_thread ();
Thread.join folder_fetching_thread;
Utils.log_message "done\n%!";
end
end;
Utils.log_with_header "Flushing cache...\n%!";
Cache.flush context.Context.cache;
Utils.log_message "Storing clean shutdown flag...%!";
Utils.log_with_header "Storing clean shutdown flag...%!";
DbCache.set_clean_shutdown context.Context.cache;
Utils.log_message "done\nCURL cleanup...%!";
Utils.log_message "done\n%!";
Utils.log_with_header "CURL cleanup...%!";
ignore (GapiCurl.global_cleanup context.Context.curl_state);
Utils.log_message "done\nClearing context...%!";
Utils.log_message "done\n%!";
Utils.log_with_header "Clearing context...%!";
Context.clear_ctx ();
Utils.log_message "done\n%!");
start_filesystem !mountpoint !fuse_args
Expand Down
71 changes: 71 additions & 0 deletions src/backgroundFolderFetching.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
open GapiLens.Infix

type t = {
stop_folder_fetching : bool;
read_dir : string -> unit;
}

let stop_folder_fetching = {
GapiLens.get = (fun x -> x.stop_folder_fetching);
GapiLens.set = (fun v x -> { x with stop_folder_fetching = v })
}
let read_dir = {
GapiLens.get = (fun x -> x.read_dir);
GapiLens.set = (fun v x -> { x with read_dir = v })
}

module ConcurrentBackgroundFolderFetching =
ConcurrentGlobal.Make(struct
type u = t
let label = "background-folder-fetching"
end)

let fetch_next_folder cache =
let resource = Cache.Resource.select_next_folder_to_fetch cache in
match resource with
| None -> ()
| Some r ->
begin
let d = ConcurrentBackgroundFolderFetching.get () in
let path = r.CacheData.Resource.path in
let remote_id = Option.default "" r.CacheData.Resource.remote_id in
Utils.log_with_header
"BEGIN: Prefetching folder %s (id=%s).\n%!"
path remote_id;
d.read_dir path;
Utils.log_with_header
"END: Prefetching folder %s (id=%s).\n%!"
path remote_id;
end

let folder_fetch cache =
let check () =
let d = ConcurrentBackgroundFolderFetching.get () in
if d.stop_folder_fetching then raise Exit
in
try
while true do
check ();
Thread.delay 0.5;
fetch_next_folder cache;
done
with Exit -> ()

let start_folder_fetching_thread cache read_dir =
let data = {
stop_folder_fetching = false;
read_dir;
} in
ConcurrentBackgroundFolderFetching.set data;
let thread = Thread.create folder_fetch cache in
Utils.log_with_header
"Starting background folder fetching thread (TID=%d)\n%!"
(Thread.id thread);
Context.update_ctx (Context.folder_fetching_thread ^= Some thread)

let stop_folder_fetching_thread () =
ConcurrentBackgroundFolderFetching.update
(fun b ->
b |> stop_folder_fetching ^= true
)

3 changes: 3 additions & 0 deletions src/backgroundFolderFetching.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
val start_folder_fetching_thread: CacheData.t -> (string -> unit) -> unit

val stop_folder_fetching_thread: unit -> unit
6 changes: 6 additions & 0 deletions src/cache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ struct
MemoryCache.Resource.select_resource_with_id cache id
else
DbCache.Resource.select_resource_with_id cache id

let select_next_folder_to_fetch cache =
if cache.CacheData.in_memory then
MemoryCache.Resource.select_next_folder_to_fetch cache
else
DbCache.Resource.select_next_folder_to_fetch cache
(* END Queries *)

end
Expand Down
1 change: 1 addition & 0 deletions src/cache.mli
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sig
val select_resources_with_parent_path : CacheData.t -> string -> bool -> CacheData.Resource.t list
val select_resources_order_by_last_update : CacheData.t -> CacheData.Resource.t list
val select_resource_with_id : CacheData.t -> int64 -> CacheData.Resource.t option
val select_next_folder_to_fetch : CacheData.t -> CacheData.Resource.t option

end

Expand Down
6 changes: 6 additions & 0 deletions src/context.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type t = {
flush_db_thread : Thread.t option;
(* Async upload thread *)
async_upload_thread : Thread.t option;
(* Background folder fetching thread *)
folder_fetching_thread : Thread.t option;
}

let app_dir = {
Expand Down Expand Up @@ -109,6 +111,10 @@ let async_upload_thread = {
GapiLens.get = (fun x -> x.async_upload_thread);
GapiLens.set = (fun v x -> { x with async_upload_thread = v })
}
let folder_fetching_thread = {
GapiLens.get = (fun x -> x.folder_fetching_thread);
GapiLens.set = (fun v x -> { x with folder_fetching_thread = v })
}

let config_lens =
config_store |-- ConfigFileStore.data
Expand Down
21 changes: 21 additions & 0 deletions src/dbCache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,17 @@ struct
in
Sqlite3.prepare db sql

let prepare_select_next_folder_to_fetch_stmt db =
let sql =
"SELECT " ^ fields ^ " \
FROM resource \
WHERE mime_type = 'application/vnd.google-apps.folder' \
AND state = 'ToDownload' \
AND trashed = 0 \
ORDER BY last_update;"
in
Sqlite3.prepare db sql

end

module MetadataStmts =
Expand Down Expand Up @@ -941,6 +952,11 @@ struct
select_resource cache
ResourceStmts.prepare_select_with_id_stmt
(fun stmt -> bind_int stmt ":id" (Some id))

let select_next_folder_to_fetch cache =
select_resource cache
ResourceStmts.prepare_select_next_folder_to_fetch_stmt
(fun _ -> ())
(* END Queries *)

end
Expand Down Expand Up @@ -1148,6 +1164,11 @@ let setup_db cache =
CREATE INDEX IF NOT EXISTS parent_path_index ON resource (parent_path, trashed); \
CREATE INDEX IF NOT EXISTS remote_id_index ON resource (remote_id); \
CREATE INDEX IF NOT EXISTS last_update_index ON resource (last_update); \
CREATE INDEX IF NOT EXISTS next_folder_index \
ON resource (mime_type, state, trashed, last_update) \
WHERE mime_type = 'application/vnd.google-apps.folder' \
AND state = 'ToDownload' \
AND trashed = 0; \
CREATE TABLE IF NOT EXISTS metadata ( \
id INTEGER PRIMARY KEY, \
display_name TEXT NOT NULL, \
Expand Down
1 change: 1 addition & 0 deletions src/dbCache.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sig
val select_resources_order_by_last_update : CacheData.t -> CacheData.Resource.t list
val select_all_resources : CacheData.t -> CacheData.Resource.t list
val select_resource_with_id : CacheData.t -> int64 -> CacheData.Resource.t option
val select_next_folder_to_fetch : CacheData.t -> CacheData.Resource.t option

end

Expand Down
4 changes: 4 additions & 0 deletions src/drive.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,10 @@ let init_filesystem () =
if config.Config.async_upload_queue then begin
UploadQueue.start_async_upload_thread
cache config.Config.async_upload_threads upload_resource_by_id;
end;
if config.Config.background_folder_fetching then begin
BackgroundFolderFetching.start_folder_fetching_thread cache
(fun path -> read_dir path |> ignore);
end
let queue_upload resource =
Expand Down
33 changes: 33 additions & 0 deletions src/memoryCache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,39 @@ struct
Utils.safe_find d.resources id
)

let select_next_folder_to_fetch cache =
ConcurrentMemoryCache.with_lock
(fun () ->
let d = ConcurrentMemoryCache.get_no_lock () in
let resources =
Hashtbl.fold
(fun _ r rs ->
if r.CacheData.Resource.mime_type =
Some "application/vnd.google-apps.folder" &&
r.CacheData.Resource.state =
CacheData.Resource.State.ToDownload &&
r.CacheData.Resource.trashed = Some false
then
r :: rs
else
rs
)
d.resources
[] in
if List.length resources = 0 then None
else
let sorted =
List.sort
(fun x y ->
compare
x.CacheData.Resource.last_update
y.CacheData.Resource.last_update
)
resources
in
Some (List.hd sorted)
)

end

module Metadata =
Expand Down
1 change: 1 addition & 0 deletions src/memoryCache.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ sig
val select_resources_with_parent_path : CacheData.t -> string -> bool -> CacheData.Resource.t list
val select_resources_order_by_last_update : CacheData.t -> CacheData.Resource.t list
val select_resource_with_id : CacheData.t -> int64 -> CacheData.Resource.t option
val select_next_folder_to_fetch : CacheData.t -> CacheData.Resource.t option

end

Expand Down

0 comments on commit 0543cb6

Please sign in to comment.