Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scn.save_dataset() slows down gradually during a loop #2964

Open
yukaribbba opened this issue Nov 3, 2024 · 28 comments
Open

scn.save_dataset() slows down gradually during a loop #2964

yukaribbba opened this issue Nov 3, 2024 · 28 comments

Comments

@yukaribbba
Copy link
Contributor

Describe the bug
The results I need are some raw bands in float32 geotiffs, without any corrections or enhancements. During the process, cpu/memory usage and system temps remained a resonable level. I got what I want and no errors poped up. Just the processing time is longer and longer. The more datasets and products involved, the more significant this becomes. And it happens on several readers like ami, abi or fci, not just ahi (I haven't tested others yet).
image

To Reproduce

import glob
import os
import time as time_calc
import warnings

import numpy as np
import gc


os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config, utils

utils.debug_on()

config.set(config_path=["D:/satpy_config"])

def satpy_sat_to_float32(folder, sat_name, scan_area, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    for band_image in band_image_list:
        output = (f"{sat_name}_{scn[band_image].attrs["sensor"]}_{band_image}_"
                  f"{scn.start_time.strftime("%Y%m%d%H%M%S")}_{scan_area}")
        output_filename = f"{output}.tif"

        scn.save_dataset(band_image, filename=output_filename, writer="geotiff",
                         base_dir=tmp_folder, num_threads=8, compress=None, enhance=False, dtype=np.float32,
                         fill_value=np.nan)

    end_save = time_calc.perf_counter()
    print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
    del scn
    gc.collect()

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "H09", "FLDK", "ahi_hsd", ["Visible004_1000", "Visible005_1000",
                                                            "Visible006_500", "Visible008_1000",
                                                            "NearInfraRed016_2000", "NearInfraRed022_2000",
                                                            "InfraRed038_2000", "InfraRed063_2000",
                                                            "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000",
                                                            "InfraRed096_2000", "InfraRed105_2000", "InfraRed112_2000",
                                                            "InfraRed123_2000", "InfraRed133_2000"])

Expected behavior
Time consumption should be more stable.

Actual results
A part of the debug output:
debug.txt

Environment Info:

  • OS: Windows 11
  • Satpy Version: 0.52.1
  • Dask: 2024.10.0
  • rasterio: 1.4.2
@yukaribbba
Copy link
Contributor Author

P.S. I also tried compute_writer_result() to save them in one go but still got the similar result.

@pnuu
Copy link
Member

pnuu commented Nov 3, 2024

Without testing anything, one thing that should help at least a bit is to add tiled=True, blockxsize=512, blockysize=512 to the save_dataset() call. Then the data can actually be written in parallel.

Also the compute=False and compute_writer_results() that you've tried should help because then the navigation data should be accessed/computed only once.

I'm not sure (again, without testing) I'm not sure how the compress and num_threads kwargs are working the way you have them now. Setting GDAL_NUM_THREADS=6 environment variable will tell GDAL to use six cores. Playing with DASK_NUM_WORKERS and DASK_ARRAY__CHUNK_SIZE might also help. These should be set before import Scene.

I don't know what is causing the increase in processing time though. I remember we had some memory accumulation happening in Trollflow/Trollflow2 before we switched to separate processes for consecutive runs.

@yukaribbba
Copy link
Contributor Author

Thanks @pnuu! I tried these options and they do help a lot to improve the baseline, but can't stop it coming down little by little (not that much as previous one though). Here's the result of my 142 datasets. If the data list is long enough this could still be a problem. I wonder if there're other things wrong here but can't figure them out...

import glob
import logging
import os
import time as time_calc
import warnings

import numpy as np
import gc

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("output.log"),
        logging.StreamHandler()
    ]
)

os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("GDAL_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config, utils
from satpy.writers import compute_writer_results

utils.debug_on()

config.set(config_path=["D:/satpy_config"])


def satpy_sat_to_float32(folder, sat_name, scan_area, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    res_list = []
    for band_image in band_image_list:
        output = (f"{sat_name}_{scn[band_image].attrs["sensor"]}_{band_image}_"
                  f"{scn.start_time.strftime("%Y%m%d%H%M%S")}_{scan_area}")
        output_filename = f"{output}.tif"

        res = scn.save_dataset(band_image, filename=output_filename, writer="geotiff", tiled=True, blockxsize=512, blockysize=512,
                         base_dir=tmp_folder, num_threads=8, compress=None, enhance=False, dtype=np.float32, compute=False,
                         fill_value=np.nan)
        res_list.append(res)

    compute_writer_results(res_list)
    end_save = time_calc.perf_counter()
    print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
    del scn
    gc.collect()

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "H09", "FLDK", "ahi_hsd", ["Visible004_1000", "Visible005_1000",
                                                            "Visible006_500", "Visible008_1000",
                                                            "NearInfraRed016_2000", "NearInfraRed022_2000",
                                                            "InfraRed038_2000", "InfraRed063_2000",
                                                            "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000",
                                                            "InfraRed096_2000", "InfraRed105_2000", "InfraRed112_2000",
                                                            "InfraRed123_2000", "InfraRed133_2000"])

image

Debug output of the first and last dataset:
debug.txt

@mraspaud
Copy link
Member

mraspaud commented Nov 4, 2024

@yukaribbba do you see the same trend in memory usage?
A while ago, when working on a batch-processing lib called trollflow2, we realized something was leaking memory very slowly. We could never pinpoint it exactly, but we suspected it was something in dask at the time. We didn't investigate much after that, and started running every slot in individual processes instead. That seems to clean up the memory at least. Maybe something to try here too?

@yukaribbba
Copy link
Contributor Author

@mraspaud I'm also guessing this is about memory, garbage collection or things like that. My memory remained normal during the workflow. Actually this is a part of a QThread but I do them in batch mode. Looks like I need to split all those data folders one by one like you said. Start the thread, do the job, quit the thread, move to next folder and start it again. Maybe this can avoid the problem? I'll give it a try.

@mraspaud
Copy link
Member

mraspaud commented Nov 4, 2024

Threading might work, but I would recommend going all the way and using multiprocessing instead. Memory can be shared in threads, but in principle not with separate processes.

@yukaribbba
Copy link
Contributor Author

multiprocessing did it, nice and clean. @mraspaud Appreciate that 😄
But I still want to keep this open to see if we can find what's truly behind it and fix it if possible.
image

@mraspaud
Copy link
Member

mraspaud commented Nov 4, 2024

At the time, the hardest part was to find a minimal example to reproduce the error... It would be great if that could be done, so we can investigate further!

@djhoese
Copy link
Member

djhoese commented Nov 4, 2024

I don't recall the outcome of everything we tried last time this came up, but there are few things I think we could try:

  1. As Martin said, getting a minimal example would be amazing.

  2. Towards a minimal example and to simplify your code, unless I'm mistaken there is nothing in your filename that couldn't be formatted by the Scene/Writer itself. You should be able to pass filename as a format string, call Scene.save_datasets instead of .save_dataset, and have the Scene do the filename formatting for you. That way you don't have to do any of this compute_writer_results stuff. So something like this if I'm not mistaken:

    filename = "{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif"

    But if .area_id is not accurate you could hardcode that before hand since it doesn't change per-dataset (ex. + f"{scan_area}.tif" or use double {{ }} to allow you to do formatting).

  3. What if you use the PNG writer? It will take longer to process but since it uses a completely different backend library it might behave differently memory/cache-wise.

@yukaribbba
Copy link
Contributor Author

Ok I have two minimal examples here, one for png:

import glob
import os
import time as time_calc
import warnings
import numpy as np

os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("GDAL_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config

config.set(config_path=["D:/satpy_config"])


def satpy_sat_to_float32(folder, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    scn.save_datasets(writer="simple_image",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.png",
                      base_dir=tmp_folder, enhance=False, compress_level=0)
    end_save = time_calc.perf_counter()
    print(f"Save to int8 png: {(end_save - start_save): .6f}")
    del scn

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
results = []
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "ahi_hsd",
                         [
                             "Visible004_1000", "Visible005_1000", "Visible006_500", "Visible008_1000",
                             "NearInfraRed016_2000", "NearInfraRed022_2000", "InfraRed038_2000", "InfraRed063_2000",
                             "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000", "InfraRed096_2000",
                             "InfraRed105_2000", "InfraRed112_2000", "InfraRed123_2000", "InfraRed133_2000"
                         ]
                         )

another for geotiff:

......
scn.save_datasets(writer="geotiff",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif",
                      base_dir=tmp_folder, blockxsize=512, blockysize=512, num_threads=8, compress=None,
                      enhance=False, dtype=np.float32, fill_value=np.nan)
......

Both of them showed sign of downgrade:
image
image

@djhoese
Copy link
Member

djhoese commented Nov 5, 2024

Ok, new thing to try: Remove the scn.save_datasets call completely and replace with scn.compute(). This should compute the dask arrays inside the DataArrays and then immediately drop them (since the return value isn't saved to a variable name) and garbage collection should clean it up. You could even add a collect call to the end of the loop to force it to be garbage collected:

import gc
print("Unreachable objects: ", gc.collect())

@yukaribbba
Copy link
Contributor Author

Sorry I'm not available till saturday. I'll check that once free again.

@yukaribbba
Copy link
Contributor Author

This time I got nearly 300 datasets for scn.compute(). And if you ignore the very start of the test, they just look normal in general.
image
For the gc.collect(). At first it jumps between 0 and 36991 and then rises to 38642 later.
image

@yukaribbba
Copy link
Contributor Author

As for geotiff, I don't have much space on SSD to hold the output of 284 datasets so I did it on a hard drive and the whole progress became slow, but you can still see the trend clearly. @djhoese
image

@mraspaud
Copy link
Member

@yukaribbba that's really interesting! so that means that the image saving is the leaking?

@yukaribbba
Copy link
Contributor Author

@mraspaud It looks like that. I feel like something isn't cleaned up in the writer part so they got accumulated little by little.

@djhoese
Copy link
Member

djhoese commented Nov 13, 2024

Have you tried the garbage collection collect after the save_datasets? Does that change this timing result?

Edit: Oops, fat fingered the close button instead of comment.

@djhoese djhoese closed this as not planned Won't fix, can't repro, duplicate, stale Nov 13, 2024
@djhoese djhoese reopened this Nov 13, 2024
@yukaribbba
Copy link
Contributor Author

I did but it's not helping. But I found some other things. scn.save_datasets() is using compute_writer_results. In compute_writer_results I see two parts: one for dask compute, another for file writing. So I add two timers for each.

def compute_writer_results(results):
    """Compute all the given dask graphs `results` so that the files are saved.

    Args:
        results (iterable): Iterable of dask graphs resulting from calls to
                            `scn.save_datasets(..., compute=False)`
    """
    if not results:
        return

    sources, targets, delayeds = split_results(results)

    # one or more writers have targets that we need to close in the future
    if targets:
        delayeds.append(da.store(sources, targets, compute=False))

    import time as time_calc
    start_compute = time_calc.perf_counter()
    if delayeds:
        da.compute(delayeds)
    end_compute = time_calc.perf_counter()
    print("da.compute: ", end_compute - start_compute)

    start_close = time_calc.perf_counter()
    if targets:
        for target in targets:
            if hasattr(target, "close"):
                target.close()
    end_close = time_calc.perf_counter()
    print("target close: ", end_close - start_close)

image
image
scn.save_datasets should be done after target.close(). But actually it's not, an unknown part exists and is slowing down.
image
Since I put gc.collect() out side the timer, it must be other things but I don't know what that is.

start_save = time_calc.perf_counter()
scn.save_datasets(writer="geotiff",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif",
                      base_dir=tmp_folder,
                      blockxsize=512, blockysize=512, num_threads=8, compress=None,
                      enhance=False, dtype=np.float32, fill_value=np.nan)
end_save = time_calc.perf_counter()
print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
gc.collect()
del scn

@djhoese
Copy link
Member

djhoese commented Nov 14, 2024

I could imagine that being either:

  1. Dask cleaning things up.
  2. GDAL/rasterio/rioxarray cleaning things up.

After the writer's save_datasets method is called, I believe the Scene throws away (allows it to be garbage collected) the Writer object.

Overall I don't think geotiff should be used for performance testing of this sort because there are too many unknowns in rasterio/gdal's caching to really tie down what's happening. PNG, although slower, should be "dumber" as far as background threads and shared caches.

@yukaribbba
Copy link
Contributor Author

Unexpectedly, with gc.collect() png writer looks normal and stable. And that mysterious 'cleaning up' part also disapeared.
image

So this is more like a GDAL cache issue I guess? I'll go back to geotiff and make GDAL clear its cache by force in every loop, and see how it goes.

@djhoese
Copy link
Member

djhoese commented Nov 15, 2024

How do you force GDAL to clear its cache?

@simonrp84
Copy link
Member

Could it be similar to this? OSGeo/gdal#7908
Or the need to use tcmalloc? https://gdal.org/en/latest/user/multithreading.html#ram-fragmentation-and-multi-threading

@djhoese
Copy link
Member

djhoese commented Nov 15, 2024

Oh boy that tcmalloc "solution" is quite the hack. You're basically telling the dynamic library loader to use a different dependency library.

I don't see it in graph form in this thread, but has memory usage been tracked over the execution time? Preferably with some profiling tool. If you lowered the GDAL_CACHEMAX environment variable enough I'd expect you either see the slow execution time sooner/faster or you see it plateau. My thinking is that if the cache max was lowered to use less memory then execution might slow down sooner if GDAL is getting a lot of cache misses. Or execution time would plateau because of all the cache misses. I guess for the same reasons increasing gdal cache max would be good to test. If GDAL is given like 80% of your RAM and we assume no hardware issues (some memory chips on the system are slower/more damaged than others) then you'd think you'd see some kind of flattening of the execution speed as GDAL is able to use more and more cache/RAM space. This assumes that you have enough RAM on your system to fit all/most of the rasters being saved/worked on.

@yukaribbba
Copy link
Contributor Author

yukaribbba commented Nov 17, 2024

OK several things newly found:

  1. Just by setting GDAL_CACHEMAX to 4096MB (the default on my machine is 3273MB), it's getting much better now. da.compute is completely stable. I don't know if 800MB will make such a big difference.
    image

  2. The only lagging part is that "cleaning up", and I was wrong about it. Actually it didn't happen after compute_writer_results, but before it, in these lines:
    https://github.com/pytroll/satpy/blob/c2149175d2256e09ea0f61505a151e5625a80d87/satpy/writers/__init__.py#L750C1-L752C75
    But I just don't understand why.
    image

@yukaribbba
Copy link
Contributor Author

yukaribbba commented Nov 18, 2024

Put more timers in trollimage.xrimage and found some 'suspicious' lines:
https://github.com/pytroll/trollimage/blob/6dced22bc8e7a36513f77bcbf027314dec0a5a1a/trollimage/xrimage.py#L391-L400
image
Maybe this newly opened RIOFile didn't get properly closed? @djhoese

Edit: memeory usage seems to be normal. In every loop its maximum reaches 16-17GB and drops quickly.

@djhoese
Copy link
Member

djhoese commented Nov 18, 2024

Sorry for not responding earlier. Out with a sick kid so I'll be a little slow for at least today.

This is expected and is a downside to geotiff writing with rasterio and dask at the time we wrote this. So first, we have to tell dask to use a lock between threads to not write in parallel because rasterio doesn't (didn't?) support parallel geotiff writing. Dask also doesn't have a way to say "this file is related to these dask tasks and should be closed when they are completed". So we keep the file-like object open and have the user close it. I believe rioxarray has better handling of this, but none of the core trollimage developers have had a chance to migrate this code.

@yukaribbba
Copy link
Contributor Author

OK I see. So is there any way to close it myself?

@djhoese
Copy link
Member

djhoese commented Nov 18, 2024

That's one of the things compute_writer_results does. One complication would be if dask and/or GDAL is doing something like copying some of these objects that cross thread boundaries and is not cleaning them up properly. Not that they're doing anything wrong, but they may not be used to people abusing the system like we are. Or we could be doing something wrong and not cleaning things up properly given the weird circumstance(s).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants