
Memory leak when getting/setting dataset distributed over last axis #165

Open · jrs65 opened this issue Feb 19, 2021 · 42 comments

@jrs65 (Contributor) commented Feb 19, 2021

This one is a little nebulous. I had originally thought that the issue arose when the distributed axis is shorter than the number of processes, so that some ranks have nothing to do. That may still be true, but some of the log messages seemed to indicate that it was actually splitting the IO along that axis. Regardless, reducing the number of MPI processes being used seemed to fix the issue.

Anyway, this clearly needs a little more debugging to figure out what exactly is happening, but we shouldn't crash when it does.

@sjforeman (Contributor) commented Mar 7, 2023

I'd like to revive this issue, because Haochen and I have recently run into something that seems related, and it would be nice to resolve it.

Here is a config that can be used to reproduce the issue on cedar:

cluster:
  name: df_day_{lsd}

  directory: {out_dir}

  time: 10
  system: cedar
  account: rpp-chime
  nodes: {nodes}
  ompnum: 4
  pernode: 12
  mem: 192000M

  venv: {venv}


# Pipeline task configuration
pipeline:

  logging:
    root: DEBUG
    peewee: INFO
    matplotlib: INFO
    h5py: INFO

  tasks:
    - type: draco.core.task.SetMPILogging
      params:
        level_rank0: DEBUG
        level_all: DEBUG

    - type: draco.core.io.LoadFilesFromParams
      out: ringmap
      params:
        files: ["/project/rpp-chime/chime/chime_processed/daily/rev_06/{lsd}/ringmap_intercyl_lsd_{lsd}.zarr.zip"]
        distributed: true

    - type: draco.analysis.dayenu.DayenuDelayFilterMap
      in: ringmap
      out: ringmap_filtered
      params:
        tauw: 0.5
        single_mask: true
        atten_threshold: 0.20
        save: true
        output_name: "ringmap_intercyl_postfilter.h5"

You can pick your favourite LSD, but I've mostly been working with 3245. With nodes=2, this runs without issue. (Expanding the walltime would let it finish, but 10 minutes is sufficient to see if it crashes.) With nodes=4 or higher, we get an ugly crash, usually some variation of

slurmstepd: error:  mpi/pmix_v3: _errhandler: cdr1585 [1]: pmixp_client_v2.c:211: Error handler invoked: status = -25, source = [slurm.pmix.61907741.0:19]
srun: error: cdr1585: task 19: Killed
srun: launch/slurm: _step_signal: Terminating StepId=61907741.0

The crash occurs in this line of mpiarray.MPIArray.to_hdf5():

dset[islice] = self[fslice]

I've attempted some dumb debugging by adding log statements as shown in this git diff:

         # Read using collective MPI-IO if specified
+        logger.debug("RIGHT BEFORE with dset.collective if use_collective else DummyContext():")
+        logger.debug(f"use_collective = {use_collective}, faster_without_collective = {faster_without_collective}, fh.is_mpi = {fh.is_mpi}, no_null_slices = {no_null_slices}")
+        logger.debug(f"split_axis = {split_axis}, dataset = {dataset}, partitions = {partitions}")
         with dset.collective if use_collective else DummyContext():
             # Loop over partitions of the IO and perform them
             for part in partitions:
                 islice, fslice = _partition_sel(
                     sel, split_axis, self.global_shape[split_axis], part
                 )
+                logger.debug(f"islice, fslice, self.shape, dset.shape, self[fslice].shape: {islice}, {fslice}, {self.shape}, {dset.shape}, {self[fslice].shape}")
                 dset[islice] = self[fslice]
+                logger.debug("COMPLETE: dset[islice] = self[fslice]")

         if fh.opened:
             fh.close()

These statements reveal that there is no MPI task that completes its write successfully. Here's what I get from task 0, right before the crash:

   115.2s [MPI  0/48] - DEBUG    caput.mpiarray: RIGHT BEFORE with dset.collective if use_collective else DummyContext():
   115.2s [MPI  0/48] - DEBUG    caput.mpiarray: use_collective = True, faster_without_collective = False, fh.is_mpi = True, no_null_slices = True
   115.2s [MPI  0/48] - DEBUG    caput.mpiarray: split_axis = 0, dataset = /dirty_beam, partitions = [slice(0, 1, None)]
   115.2s [MPI  0/48] - DEBUG    caput.mpiarray: islice, fslice, self.shape, dset.shape, self[fslice].shape: (slice(0, 1, 1), slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(0, 11, None)), (slice(0, 1, None), slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)), (1, 4, 1024, 4096, 11), (1, 4, 1024, 4096, 512), (1, 4, 1024, 4096, 11)

In this case, MPIArray._partition_io() determined that there's no need to further partition the I/O, beyond the way the array is already distributed along the last axis. The size of the slice being written is 1.375GB, so it's not running into any known 2GB-write issues in h5py. If it makes a difference, the "0" axis (the beam axis of the ringmap container) has 1 element.
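
For reference, a quick check of those numbers (a sketch assuming float64 data, which is what reproduces the 1.375 GB figure quoted above):

import numpy as np

# Local block written by each rank in the 48-process case (from the log above)
local_shape = (1, 4, 1024, 4096, 11)
print(np.prod(local_shape) * np.dtype(np.float64).itemsize / 2**30)  # 1.375 GiB

# Full on-disk dataset (dset.shape in the log above)
full_shape = (1, 4, 1024, 4096, 512)
print(np.prod(full_shape) * 8 / 2**30)  # 64.0 GiB for /dirty_beam alone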

If we use 2 nodes (24 tasks), the analogous output instead looks like:

   154.3s [MPI  1/24] - DEBUG    caput.mpiarray: RIGHT BEFORE with dset.collective if use_collective else DummyContext():
   154.3s [MPI  1/24] - DEBUG    caput.mpiarray: use_collective = True, faster_without_collective = False, fh.is_mpi = True, no_null_slices = True
   154.3s [MPI  1/24] - DEBUG    caput.mpiarray: split_axis = 1, dataset = /dirty_beam, partitions = [slice(0, 2, None), slice(2, 4, None)]
   154.3s [MPI  1/24] - DEBUG    caput.mpiarray: islice, fslice, self.shape, dset.shape, self[fslice].shape: (slice(None, None, None), slice(0, 2, 1), slice(None, None, None), slice(None, None, None), slice(22, 44, None)), (slice(None, None, None), slice(0, 2, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)), (1, 4, 1024, 4096, 22), (1, 4, 1024, 4096, 512), (1, 2, 1024, 4096, 22)

Here, the output is partitioned into 2 slices along the "1" axis (i.e. the pol axis, which has 4 elements). The slice being written is 1.375GB here as well, but now the write operation completes successfully.

I've found that restricting the slice size to 1 GB, using threshold=0.99 in MPIArray._partition_io(), seems to solve the problem, but I'm not sure why, unless the known h5py issues with 2 GB writes actually also apply to writes between 1 and 2 GB. Any expert opinions would be appreciated!
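
For intuition, here is a hypothetical helper (not caput's actual MPIArray._partition_io, whose interface differs) showing how capping the per-write size splits the local block into multiple collective writes:

import math

def partition_slices(axis_len, row_elems, itemsize=8, threshold_bytes=2**30):
    """Split axis_len rows into contiguous slices so that no single write
    exceeds threshold_bytes. Illustration only."""
    row_bytes = row_elems * itemsize
    max_rows = max(1, threshold_bytes // row_bytes)
    nparts = math.ceil(axis_len / max_rows)
    step = math.ceil(axis_len / nparts)
    return [slice(i, min(i + step, axis_len)) for i in range(0, axis_len, step)]

# Splitting the pol axis (length 4) of a local (1, 4, 1024, 4096, 11) block:
# each "row" is 1024 * 4096 * 11 float64 values (~0.34 GiB), so a 1 GiB cap
# gives two writes of ~0.69 GiB instead of a single 1.375 GiB write.
print(partition_slices(4, 1024 * 4096 * 11))  # [slice(0, 2, None), slice(2, 4, None)]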

@ljgray (Contributor) commented Mar 10, 2023

I think that the slice into dset in the mpiarray line is causing the issue. Just slicing into it without trying to set anything (logging dset[islice], for example) hangs but doesn't actually crash. In this test case, the distributed axis is significantly longer than the number of processes, so each process has something to do.

One thing I noticed is that the fixes mentioned by both @jrs65 and @sjforeman involve reducing the number of processes, which would increase the size of the local array on each process. In this case, by reducing the number of processes, _partition_io will return multiple parts rather than just one, and those parts will not be across the 0th axis (which here only has length 1). I'm wondering if the issue is somehow with slicing along the zeroth axis.

@ljgray (Contributor) commented Mar 10, 2023

This seems relevant, although I can't access the linked JIRA page to see if the issue was ever resolved in HDF5:
h5py/h5py#1176

@sjforeman (Contributor):

Thanks @ljgray! I'm experimenting with possible fixes related to the slicing issues you've described

@ljgray (Contributor) commented Mar 10, 2023

I should note that our situation isn't exactly the same as the one in the h5py issue, as they're trying to take non-consecutive elements, but it's always possible that we're seeing the effects of that issue in another way

@jrs65 (Contributor Author) commented Mar 10, 2023

I think that issue is a red herring as it only applies to chunked data (which this is not).

That said, I can't really see any obvious issues with what's going on.

Can you try forcing collective IO to be off? That's an obvious contender, as I think the h5py implementation has some issues (e.g. the eliding of zero-length writes, which we already need to work around).
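
For context on the zero-length-write point: in collective mode every rank has to issue the write call, even when its local selection is empty, which is exactly the case h5py has been known to skip. A minimal sketch of the pattern (hypothetical shapes and filename, assuming a parallel h5py build):

import numpy as np
import h5py
from mpi4py import MPI

comm = MPI.COMM_WORLD
n = 3  # deliberately shorter than comm.size, so some ranks own no elements

with h5py.File("collective_demo.h5", "w", driver="mpio", comm=comm) as fh:
    dset = fh.create_dataset("d", shape=(n,), dtype=np.float64)
    start = min(comm.rank, n)
    stop = min(comm.rank + 1, n)
    with dset.collective:
        # Every rank must execute this assignment, even when start == stop and
        # nothing is written; if the library elides the underlying collective
        # call for the empty selection, the other ranks can hang.
        dset[start:stop] = np.full(stop - start, comm.rank, dtype=np.float64)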

@ljgray (Contributor) commented Mar 10, 2023

I tried forcing collective IO off and the job just timed out after 90 minutes. Interestingly, a few ranks did complete their write quite quickly but most never did

@jrs65 (Contributor Author) commented Mar 10, 2023

Can you go one step further and try forcing use_mpi = False so it doesn't open the file with MPI-IO?

@jrs65 (Contributor Author) commented Mar 10, 2023

GitHub really needs to add the 🤔 emoji. That's pretty much the reaction I want to add to every issue comment.

@sjforeman (Contributor):

In the case where we're just slicing a length-1 axis with slice(0,1,1), I'm trying to test whether simply using slice(None) would solve things

@jrs65 (Contributor Author) commented Mar 10, 2023

> Can you go one step further and try forcing use_mpi = False so it doesn't open the file with MPI-IO?

Actually that doesn't make any sense. If it can't use MPI-IO it needs to fall back to _to_hdf5_serial, as it can't coordinate the writes, so opening with MPI-IO is essential here.

Sorry, bad suggestion!

@jrs65 (Contributor Author) commented Mar 10, 2023

> In the case where we're just slicing a length-1 axis with slice(0,1,1), I'm trying to test whether simply using slice(None) would solve things

Good test.

I think broadly it would be good to try and reduce this to a simpler test case. Can you get it down to something that only needs 4 MPI processes and where the dataset is smaller and with fewer dimensions?

@ljgray (Contributor) commented Mar 10, 2023

Yeah it would be beneficial to be able to iterate more quickly, but it might be tricky to simplify when we don't really know exactly what the issue is.

@sjforeman just note that using a slice(None) will fail in _partition_sel, I believe in _reslice, so there will have to be some modifications for that. I tried doing that test yesterday but didn't get around to actually making that change.

@ljgray (Contributor) commented Mar 10, 2023

Also, one other test I ran last night applied the slice to a different axis than zero, with the same result, so it doesn't seem to be anything unique to that axis

@sjforeman (Contributor):

> Yeah it would be beneficial to be able to iterate more quickly, but it might be tricky to simplify when we don't really know exactly what the issue is.

You bet, I'll see if I can come up with a smaller test case that fails in the same way

> @sjforeman just note that using a slice(None) will fail in _partition_sel, I believe in _reslice, so there will have to be some modifications for that. I tried doing that test yesterday but didn't get around to actually making that change.

Yeah, I also tried something this morning that failed in _reslice as you describe. I'm trying a more direct hack now

@ljgray (Contributor) commented Mar 10, 2023

> Yeah, I also tried something this morning that failed in _reslice as you describe. I'm trying a more direct hack now

It may take a while to complete, but I'm running a test that brute-forces the slice into being a slice(None) and then just tries to slice into the dataset without setting anything. This has been hanging for ~15 minutes but not crashing, which is the same behaviour I saw when running without collective IO. Let me know if you come up with anything different.

@jrs65 (Contributor Author) commented Mar 10, 2023

I have not been able to make it fail at a smaller size (as of yet). I'm giving up for now, but in case anyone else finds it useful, this is what I was doing:

import logging
import numpy as np

from mpi4py import MPI

from caput import mpiarray
logging.basicConfig(level=logging.DEBUG)

comm = MPI.COMM_WORLD

# mul=1 should correspond to the crashing case, mul=2 the one that works
mul = 1

shape = (1, 4, 1024, 4096, 11 * mul * comm.size + 1)

print(f"rank={comm.rank=}/{comm.size}: {shape=}")
d = mpiarray.MPIArray(shape, dtype=np.float64, axis=4)

print(f"rank={comm.rank=}/{comm.size}: {d.local_shape=}")
print(f"rank={comm.rank=}/{comm.size}: filling")
d[:] = 1.0

print(f"rank={comm.rank=}/{comm.size}: writing")
d.to_hdf5("testfile.h5", "testdset", create=True)
comm.Barrier()
print(f"rank={comm.rank=}/{comm.size}: done")

This didn't crash for me with either 2 or 6 processes (all on the same node). I think going to 24 should correspond pretty much exactly to what Simon was doing.

@ljgray (Contributor) commented Mar 10, 2023

A quick update - forcing slice(None) on the split axis did eventually let me index into the dset, but it took ~25 minutes to do so on all ranks. It then crashed when trying to actually set the new dset data. seff shows 102% memory usage, so I believe that the crash is happening due to a memory issue.

We have access to more memory when running on 4 nodes vs 2, so it makes me think that there's some sort of memory leak going on
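
One way to watch this per rank, rather than with seff after the fact, would be to log each rank's peak RSS around the write (a sketch using only the standard library; ru_maxrss is reported in KiB on Linux):

import resource
from mpi4py import MPI

def log_peak_rss(comm, tag=""):
    # Peak resident set size of this process so far (KiB on Linux -> GiB)
    peak_gib = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 2**20
    print(f"rank {comm.rank}/{comm.size} {tag}: peak RSS = {peak_gib:.2f} GiB")

comm = MPI.COMM_WORLD
log_peak_rss(comm, "before write")
# ... d.to_hdf5("testfile.h5", "testdset", create=True) ...
log_peak_rss(comm, "after write")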

@jrs65 (Contributor Author) commented Mar 10, 2023

@sjforeman is it crashing on the first dataset in the container that it's trying to write out? I've had issues before where the problem happens at an earlier dataset and only surfaces later on, e.g. the writes get out of sync somehow.

You might consider chucking in a Barrier at the start and end of to_hdf5 to force some synchronisation and see if that does anything.

@jrs65 (Contributor Author) commented Mar 10, 2023

> We have access to more memory when running on 4 nodes vs 2, so it makes me think that there's some sort of memory leak going on

Interesting!

@ljgray (Contributor) commented Mar 10, 2023

> You might consider chucking in a Barrier at the start and end of to_hdf5 to force some synchronisation and see if that does anything.

I didn't actually try putting a barrier at the start, but putting one at the end of to_hdf5 doesn't seem to help. As far as I can tell, this config crashes on the first dataset.

@sjforeman (Contributor):

I also found that manually using slice(None) along the (1-element) split axis didn't fix the problem, although I didn't split the test into separate indexing and setting steps like @ljgray did. I additionally found that when subselecting the distributed axis (ringmap el) in my tests by a factor of 4, and reducing the number of processes by a factor of 4, the crash doesn't occur. This is all in line with what you guys are finding.

@sjforeman (Contributor):

@jrs65 It does appear to be crashing on the first dataset it attempts to write out

@jrs65 (Contributor Author) commented Mar 10, 2023

It might be helpful to ssh into the node while it's running and use htop or similar to watch the memory usage through the run. I'm struggling to understand how exactly a memory issue would surface without a low-level bug. The memory used per node to store the data will have gone down by half, and the amount of data being handled in each write iteration is the same. I think it would need to be a bug in HDF5 or h5py, or something weird about how the IO is being distributed, i.e. MPI-IO might be collecting everything onto one node and writing from there, which would increase the amount of memory being used on that node by a lot.

@jrs65 (Contributor Author) commented Mar 10, 2023

Reminding myself how it all works: MPI-IO will indeed aggregate the data to be written onto a smaller set of nodes, but the collective buffers are at most 32 MB each, so I don't quite see how it would eat that much memory.
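
If the collective-buffering behaviour is the suspect, the ROMIO hints that control it can be overridden through an MPI Info object; a sketch via h5py's low-level API (the hint names are standard ROMIO ones, but whether they are honoured depends on the MPI library, so treat this as an experiment rather than a fix):

import h5py
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Standard ROMIO hints for collective buffering; the values are examples only.
info = MPI.Info.Create()
info.Set("cb_buffer_size", str(128 * 1024 * 1024))
info.Set("romio_cb_write", "enable")

# Pass the hints through HDF5's file access property list.
fapl = h5py.h5p.create(h5py.h5p.FILE_ACCESS)
fapl.set_fapl_mpio(comm, info)
fid = h5py.h5f.create(b"hints_test.h5", h5py.h5f.ACC_TRUNC, fapl=fapl)
fh = h5py.File(fid)  # wrap the low-level id in the usual File interface
fh.close()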

@ljgray (Contributor) commented Mar 10, 2023

I'm watching htop while running and the memory usage is indeed creeping up slowly over time while trying to slice into the dataset. I'm not overly familiar with the internals of HDF5 though, so maybe this is expected to some extent depending on how it handles buffers and such? Curious to see what happens when it actually crashes though

@sjforeman (Contributor):

I've been experimenting with chunking for the dataset where the crash occurs, by changing "chunks" in the draco container definition. Increasing or decreasing the chunk size along the distributed axis, or turning off chunking altogether, doesn't make any difference.

@jrs65 (Contributor Author) commented Mar 10, 2023

Indeed. Chunking is disabled for HDF5 output as parallel chunked IO is fundamentally broken in HDF5.

@jrs65 (Contributor Author) commented Mar 10, 2023

> Indeed. Chunking is disabled for HDF5 output as parallel chunked IO is fundamentally broken in HDF5.

That should be true, though I'm struggling to see where we actually turn that on and off at the moment! I'm pretty sure it is there though.

@ljgray (Contributor) commented Mar 10, 2023

> That should be true, though I'm struggling to see where we actually turn that on and off at the moment! I'm pretty sure it is there though.

We might just be relying on the fact that chunks is set to None by default in all MemDatasets and hopefully never changed. I can't really find anywhere where chunks is explicitly set to None when using parallel IO
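
For reference, a minimal sketch of the kind of guard being discussed (the names and placement are hypothetical, not the actual memh5 code): drop any chunk specification whenever the file has been opened with the MPI-IO driver:

import numpy as np
import h5py
from mpi4py import MPI

comm = MPI.COMM_WORLD
requested_chunks = (1, 2, 64, 64, 8)  # whatever the container definition asks for

with h5py.File("chunk_test.h5", "w", driver="mpio", comm=comm) as fh:
    # Force contiguous layout for parallel writes, since chunked collective
    # IO is the suspected problem case.
    chunks = None if fh.driver == "mpio" else requested_chunks
    fh.create_dataset(
        "testdset", shape=(1, 4, 256, 256, 32), dtype=np.float64, chunks=chunks
    )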

@ljgray (Contributor) commented Mar 10, 2023

Ok, I managed to make it work. Sort of. I restricted the min_axis_size in _partition_io to be at least 2 (so 2 partitions) and it got through without crashing.

  • Each node used ~175 GB out of 188 GB available to save each part, which is about 25 GB more than was used just while holding the dataset in memory. That amount creeps up over time while performing the write. So I'd imagine that doubling the size of the partition would definitely crash the job.
  • Writing to the dataset took about 30 minutes, and that's just for the first dataset.
  • Memory does seem to be released properly once the dataset is completely written out

I don't really understand why so much memory is being used in the first place (just holding the data), because the ringmaps are quite small. I also don't understand why it would work fine with fewer nodes/processes.

Reducing the tolerance as @sjforeman did earlier would result in the same split, which would explain why it worked.

@ljgray (Contributor) commented Mar 10, 2023

@sjforeman How long did it take to run this when you used 2 nodes?

@sjforeman (Contributor):

It took 51 minutes of walltime for the 2-node job to write the full ringmap to disk

@ljgray (Contributor) commented Mar 10, 2023

> It took 51 minutes of walltime for the 2-node job to write the full ringmap to disk

Interesting. The job timed out after 90 minutes on 4 nodes

@ljgray (Contributor) commented Mar 10, 2023

I had a look at the HDF5 issue that I linked before: it's still open, was reassigned to someone as of Jan. 2023, and affects versions starting at 1.10.4 (at least that's what's listed).

I'm not entirely sure that's the culprit here, as this could be something to do with MPI-IO as well.

@sjforeman (Contributor) commented Mar 10, 2023

> It took 51 minutes of walltime for the 2-node job to write the full ringmap to disk
>
> Interesting. The job timed out after 90 minutes on 4 nodes

Hmm. Maybe it's just a temporary I/O slowdown? (I've noticed that scratch is a bit laggy for me over the past few hours).

In the meantime, here are some other data points to consider: if I only read the first 64 elements of the el axis of the ringmap (i.e. using el_range: [0, 64] in LoadFilesFromParams), and use 48 or 24 processes on a single cedar node, I get a crash. For 12 processes, there is no crash. I wasn't able to check the memory usage while these were running, but I wouldn't be surprised if the node's memory filled up.

@ljgray (Contributor) commented Mar 10, 2023

> In the meantime, here are some other data points to consider: if I only read the first 64 elements of the el axis of the ringmap (i.e. using el_range: [0, 64] in LoadFilesFromParams), and use 48 or 24 processes on a single cedar node, I get a crash. For 12 processes, there is no crash. I wasn't able to check the memory usage while these were running, but I wouldn't be surprised if the node's memory filled up.

If you still have the job id available for those, can you check them using seff <jobid>? That will at least show the total usage for the job, which will probably be around 100% if they did in fact fail due to memory

@sjforeman (Contributor):

Thanks for the tip!

48 tasks:

Memory Utilized: 229.96 GB (estimated maximum)
Memory Efficiency: 122.64% of 187.50 GB (187.50 GB/node)

24 tasks:

Memory Utilized: 231.54 GB (estimated maximum)
Memory Efficiency: 123.49% of 187.50 GB (187.50 GB/node)

12 tasks (timed out without crashing):

Memory Utilized: 93.91 GB (estimated maximum)
Memory Efficiency: 50.08% of 187.50 GB (187.50 GB/node)

48 tasks, forcing min_axis_size in _partition_io to be at least 2:

Memory Utilized: 229.67 GB (estimated maximum)
Memory Efficiency: 122.49% of 187.50 GB (187.50 GB/node)

My guess is that the min_axis_size fix will let us avoid catastrophic memory leaks in some cases, but we're just getting lucky in those cases...

@ljgray (Contributor) commented Mar 10, 2023

One more test: in the memh5 data.to_file path, I got memh5 to redistribute each dataset across its largest axis before writing it out. The job completed on 4 nodes in under 7 minutes with no issues. I'm not exactly sure what that means yet, but maybe MPI-IO didn't like being distributed across the last axis?

@sjforeman (Contributor):

My tests are also successful if I manually redistribute along another axis before writing to disk. One could imagine the last axis causing issues because it is the fastest-varying axis, and therefore the least efficient axis to be using for random access...
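
A minimal sketch of that workaround at the MPIArray level (the axis choice is illustrative; memh5/draco would do the equivalent redistribution at the container level):

import numpy as np
from mpi4py import MPI
from caput import mpiarray

comm = MPI.COMM_WORLD

# Array distributed over the last (fastest-varying) axis, as in the ringmap case
d = mpiarray.MPIArray((1, 4, 256, 256, 8 * comm.size), dtype=np.float64, axis=4)
d[:] = 1.0

# Workaround: move the distribution onto an earlier, slower-varying axis
# before writing out as usual
d = d.redistribute(axis=2)
d.to_hdf5("testfile_redist.h5", "testdset", create=True)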

@ljgray (Contributor) commented Mar 11, 2023

I've tested distributing across all the axes, and it's only the last axis that causes an issue (I did not try distributing across the 0th axis because we don't use collective IO in that case anyway).

@ljgray (Contributor) commented Mar 11, 2023

> My tests are also successful if I manually redistribute along another axis before writing to disk. One could imagine the last axis causing issues because it is the fastest-varying axis, and therefore the least efficient axis to be using for random access...

It would be the least efficient, but I would think whatever is going on here probably goes beyond that.

ljgray changed the title from "Distributed IO breaks when len(axis) < comm.size" to "Memory leak when getting/setting dataset distributed over last axis" on Jun 14, 2024