From 27af35da26c3a325beace80cf203c9e220b2519d Mon Sep 17 00:00:00 2001 From: Ryan Grout Date: Tue, 22 Oct 2024 13:16:09 -0600 Subject: [PATCH] Consolidate block fetch requests. --- fsspec/caching.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/fsspec/caching.py b/fsspec/caching.py index a3f7a1c9f..9e0a994c9 100644 --- a/fsspec/caching.py +++ b/fsspec/caching.py @@ -7,6 +7,8 @@ import os import threading import warnings +from itertools import groupby +from operator import itemgetter from concurrent.futures import Future, ThreadPoolExecutor from typing import ( TYPE_CHECKING, @@ -161,21 +163,21 @@ def _fetch(self, start: int | None, end: int | None) -> bytes: return b"" start_block = start // self.blocksize end_block = end // self.blocksize - need = [i for i in range(start_block, end_block + 1) if i not in self.blocks] - hits = [i for i in range(start_block, end_block + 1) if i in self.blocks] + block_range = range(start_block, end_block + 1) + need = [i for i in block_range if i not in self.blocks] self.miss_count += len(need) - self.hit_count += len(hits) - while need: - # TODO: not a for loop so we can consolidate blocks later to - # make fewer fetch calls; this could be parallel - i = need.pop(0) - - sstart = i * self.blocksize - send = min(sstart + self.blocksize, self.size) + self.hit_count += len(self.blocks.intersection(block_range)) + + # Consolidate needed blocks. + # Algorithm adapted from Python 2.x itertools documentation + for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]): + _blocks = tuple(map(itemgetter(1), _blocks)) + sstart = _blocks[0] * self.blocksize + send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size) self.total_requested_bytes += send - sstart - logger.debug(f"MMap get block #{i} ({sstart}-{send})") + logger.debug(f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send}") self.cache[sstart:send] = self.fetcher(sstart, send) - self.blocks.add(i) + self.blocks.update(_blocks) return self.cache[start:end]