Skip to content

Commit

Permalink
Split vector reads into smaller chunks when single requests are larger…
Browse files Browse the repository at this point in the history
… than `max_element_size` (#243)

* add test

* fix test

* add chunk splitting

* add merging of subdivided chunks

* add another test to check for consistency between xrootd sources

* fix

* add distinction between (thread) local and global futures dict

* flake8

* add num_workers argument to XRootDSource

* add num_workers argument to tests for XRootDSource

* improve docstring

* ensure futures are only submitted **after** adding notification

Co-authored-by: Nikolai Hartmann <[email protected]>
  • Loading branch information
nikoladze and Nikolai Hartmann authored Jan 18, 2021
1 parent 3f7c09a commit 476d80a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 14 deletions.
50 changes: 49 additions & 1 deletion tests/test_0001-source-class.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def test_xrootd_vectorread():
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
max_num_elements=None,
num_workers=1
) as source:
notifications = queue.Queue()
chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications)
Expand All @@ -311,13 +312,59 @@ def test_xrootd_vectorread():
assert one[:4] == b"root"


@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread_max_element_split():
    """A single request one byte larger than the server's maximum element
    size must be transparently split and re-merged into one chunk."""
    pytest.importorskip("XRootD")
    url = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"
    with uproot.source.xrootd.XRootDSource(
        url,
        timeout=10,
        max_num_elements=None,
        num_workers=1,
    ) as source:
        pending = queue.Queue()
        # 2097136 is the server's max element size; ask for one byte more.
        request_size = 2097136 + 1
        (chunk,) = source.chunks([(0, request_size)], pending)
        payload = tobytes(chunk.raw_data)
        assert len(payload) == request_size


@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread_max_element_split_consistency():
    """The vector-read source (which must split oversized requests) and the
    multithreaded source must return byte-identical data for the same range."""
    pytest.importorskip("XRootD")
    filename = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"

    def read_one(source_cls, **options):
        # Request one byte beyond the server's max element size so the
        # vector-read source is forced to split the request.
        with source_cls(filename, **options) as source:
            pending = queue.Queue()
            request_size = 2097136 + 1
            (chunk,) = source.chunks([(0, request_size)], pending)
            return tobytes(chunk.raw_data)

    vector_read = read_one(
        uproot.source.xrootd.XRootDSource,
        timeout=10,
        max_num_elements=None,
        num_workers=1,
    )
    threaded_read = read_one(
        uproot.source.xrootd.MultithreadedXRootDSource,
        timeout=10,
        num_workers=1,
    )
    assert vector_read == threaded_read



@pytest.mark.network
@pytest.mark.xrootd
def test_xrootd_vectorread_fail():
    """Opening a nonexistent XRootD URL must raise promptly (timeout=1)
    instead of hanging."""
    pytest.importorskip("XRootD")
    with pytest.raises(Exception) as err:
        # NOTE(review): the rendered diff showed both the pre- and
        # post-commit argument lines; only the post-commit call (with
        # ``num_workers=1``) is kept here — the duplicate line was invalid.
        source = uproot.source.xrootd.XRootDSource(
            "root://wonky.cern/does-not-exist", timeout=1, max_num_elements=None, num_workers=1
        )


Expand All @@ -329,6 +376,7 @@ def test_xrootd_size():
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
max_num_elements=None,
num_workers=1
) as source:
size1 = source.num_bytes

Expand Down
1 change: 1 addition & 0 deletions tests/test_0006-notify-when-downloaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def test_xrootd_vectorread():
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
max_num_elements=None,
num_workers=1,
) as source:
chunks = source.chunks(
[(0, 100), (50, 55), (200, 400)], notifications=notifications
Expand Down
1 change: 1 addition & 0 deletions tests/test_0007-single-chunk-interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def test_xrootd_vectorread():
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=10,
max_num_elements=None,
num_workers=1,
) as source:
one = tobytes(source.chunk(0, 100).raw_data)
assert len(one) == 100
Expand Down
91 changes: 78 additions & 13 deletions uproot/source/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,25 @@ def task(resource):

return uproot.source.futures.ResourceFuture(task)

@staticmethod
def mergefuture(partfutures):
"""
Returns a :doc:`uproot.source.futures.ResourceFuture` that merges the
chunks previously submitted via
:ref:`uproot.source.xrootd.XRootDResource.partfuture` which had to be split
"""

def task(resource):
if len(partfutures) == 1:
return partfutures[0].result()

chunk_buffers = []
for future in partfutures:
chunk_buffers.append(future.result())
return b''.join(chunk_buffers)

return uproot.source.futures.ResourceFuture(task)

@staticmethod
def callbacker(futures, results):
"""
Expand All @@ -210,7 +229,7 @@ class XRootDSource(uproot.source.chunk.Source):
"""
Args:
file_path (str): A URL of the file to open.
options: Must include ``"timeout"`` and ``"max_num_elements"``.
options: Must include ``"timeout"``, ``"max_num_elements"`` and ``"num_workers"``
A :doc:`uproot.source.chunk.Source` that uses XRootD's vector-read
to get many chunks in one request.
Expand All @@ -221,6 +240,7 @@ class XRootDSource(uproot.source.chunk.Source):
def __init__(self, file_path, **options):
timeout = options["timeout"]
max_num_elements = options["max_num_elements"]
num_workers = options["num_workers"]
self._num_requests = 0
self._num_requested_chunks = 0
self._num_requested_bytes = 0
Expand All @@ -231,6 +251,12 @@ def __init__(self, file_path, **options):

self._resource = XRootDResource(file_path, timeout)

# this ThreadPool does not need a resource, it's only used to submit
# futures that wait for chunks that have been split to merge them.
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[None for i in range(num_workers)]
)

self._max_num_elements, self._max_element_size = get_server_config(
self._resource.file
)
Expand All @@ -257,29 +283,54 @@ def chunks(self, ranges, notifications):
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

# ranges for xrootd vector reads
all_request_ranges = [[]]
for start, stop in ranges:
if stop - start > self._max_element_size:
raise NotImplementedError(
"TODO: Probably need to fall back to a non-vector read"
)

# dictionary telling us which xrootd request ranges correspond to the
# actually requested ranges (given as (start, stop))
# this is to track which requests were split into smaller ranges and have to be merged
sub_ranges = {}

def add_request_range(start, length, sub_ranges_list):
if len(all_request_ranges[-1]) > self._max_num_elements:
all_request_ranges.append([])
all_request_ranges[-1].append((start, stop - start))
all_request_ranges[-1].append((start, length))
sub_ranges_list.append((start, start + length))

chunks = []
# figure out the vector read ranges
for start, stop in ranges:
length = stop - start
sub_ranges[start, stop] = []

# if range larger than maximum, split into smaller ranges
if length > self._max_element_size:
nsplit = length // self._max_element_size
rem = length % self._max_element_size
for i in range(nsplit):
add_request_range(
start + i * self._max_element_size,
self._max_element_size,
sub_ranges[start, stop]
)
if rem > 0:
add_request_range(
start + nsplit * self._max_element_size,
rem,
sub_ranges[start, stop]
)
else:
add_request_range(start, length, sub_ranges[start, stop])

# submit the xrootd vector reads
global_futures = {}
for i, request_ranges in enumerate(all_request_ranges):
futures = {}
results = {}
for start, size in request_ranges:
stop = start + size
partfuture = self.ResourceClass.partfuture(results, start, stop)
futures[start, stop] = partfuture
chunk = uproot.source.chunk.Chunk(self, start, stop, partfuture)
partfuture._set_notify(
uproot.source.chunk.notifier(chunk, notifications)
)
chunks.append(chunk)
global_futures[start, stop] = partfuture

callback = self.ResourceClass.callbacker(futures, results)

Expand All @@ -289,6 +340,20 @@ def chunks(self, ranges, notifications):
if status.error:
self._resource._xrd_error(status)

# create chunks (possibly merging xrootd chunks)
chunks = []
for start, stop in ranges:
partfutures = []
for sub_start, sub_stop in sub_ranges[start, stop]:
partfutures.append(global_futures[sub_start, sub_stop])
future = self.ResourceClass.mergefuture(partfutures)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future._set_notify(
uproot.source.chunk.notifier(chunk, notifications)
)
self._executor.submit(future)
chunks.append(chunk)

return chunks

@property
Expand Down

0 comments on commit 476d80a

Please sign in to comment.