
Thread safety issue with open_mfdataset, Zarr, and dask #8815

Closed
csubich opened this issue Mar 8, 2024 · 4 comments

Labels: plan to close (May be closeable, needs more eyeballs)

Comments

csubich commented Mar 8, 2024

What happened?

I have a large-ish weather dataset (a mirrored subset of the WeatherBench2 data), which is stored on disk as a collection of Zarr datasets segregated by time. Because the processing is intensive and GPU-related, I handle the data in parallel as follows:

  • Open the large dataset with:
a_dbase = xr.open_mfdataset(FILE_LIST, engine='zarr', coords='minimal',
                            compat='override', concat_dim='time', combine='nested')
  • Create several worker threads to assemble randomly-selected subsets of this data, using slicing on the generated Dask arrays [1]:
inputs = xr.Dataset(coords={'time':atimes_ns,
                    'level':input_levels,
                    'latitude':latitude,
                    'longitude':longitude}) 
[...]
a_level_idx = np.searchsorted(inputs.level.data,input_levels)
a_time_idx = np.searchsorted(inputs.time.data,atimes_ns)
for var in avars_present:
    if 'time' in inputs[var].dims:
        if 'level' in inputs[var].dims:
            inputs[var] = (inputs[var].dims, inputs[var].data[a_time_idx,...][:,a_level_idx,:,:])
        else:
            inputs[var] = (inputs[var].dims, inputs[var].data[a_time_idx,...])
  • Use dask.distributed.Client in threading-only mode (processes=False) to convert the lazy Dask arrays into in-memory arrays with a thread-blocking call to compute, and
  • Return the realized dataset to the main thread for subsequent processing, via a thread-safe queue (a rough sketch of this overall pattern follows this list).
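
For illustration, a rough sketch of the overall pattern described above; FILE_LIST, build_subset(), and N_WORKERS are placeholders standing in for the real code:

import queue
import threading

import xarray as xr
from dask.distributed import Client

client = Client(processes=False)         # threads-only scheduler, single process
a_dbase = xr.open_mfdataset(FILE_LIST, engine='zarr', coords='minimal',
                            compat='override', concat_dim='time',
                            combine='nested')

results = queue.Queue()                  # thread-safe hand-off to the main thread

def worker():
    subset = build_subset(a_dbase)       # stands in for the slicing logic above
    results.put(subset.compute())        # blocking compute inside the worker thread

threads = [threading.Thread(target=worker) for _ in range(N_WORKERS)]
for t in threads:
    t.start()

batch = results.get()                    # main thread consumes realized datasets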

When doing so, I intermittently receive two categories of exceptions from dask. The first is a SystemError:

2024-03-08 13:17:51,404 - distributed.worker - WARNING - Compute Failed
Key:       ('concatenate-open_dataset-geopotential-getitem-1694daf6b6f554dadb58cb1c6a0ab42c', 0, 0, 0, 0)
Function:  execute_task
args:      ((<function getitem at 0x154c850e02c0>, (<function getitem at 0x154c850e02c0>, (<function getitem at 0x154c850e02c0>, (subgraph_callable-ecb9cb81-62ff-42b4-bfdc-571bccb1dbf0, ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x153c5968e140>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (slice(62, 63, None), slice(0, 37, None), slice(0, 721, None), slice(0, 1440, None))), (array([0]), slice(None, None, None), slice(None, None, None), slice(None, None, None))), (array([0]), slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(None, None, None), array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36]), slice(None, None, None), slice(None, None, None))))
kwargs:    {}
Exception: "SystemError('/home/conda/feedstock_root/build_artifacts/python-split_1696328470153/work/Objects/tupleobject.c:927: bad argument to internal function')"

while the second is a KeyError:

2024-03-08 13:17:51,429 - distributed.worker - ERROR - Exception during execution of task ('broadcast_to-ea76769e6c2e9cb7113e9f1824307da6', 0, 0, 0, 0, 0, 0).
Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 184, in __getitem__
    return self.fast[key]
           ~~~~~~~~~^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/lru.py", line 117, in __getitem__
    result = self.d[key]
             ~~~~~~^^^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2383, in _prepare_args_for_execution
    data[k] = self.data[k]
              ~~~~~~~~~^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/spill.py", line 216, in __getitem__
    return super().__getitem__(key)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 186, in __getitem__
    return self.slow_to_fast(key)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 153, in slow_to_fast
    value = self.slow[key]
            ~~~~~~~~~^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/cache.py", line 67, in __getitem__
    gen = self._last_updated[key]
          ~~~~~~~~~~~~~~~~~~^^^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2235, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2387, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
                         ~~~~~~~~~~~~~~~~~^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

The computation will often succeed on a subsequent attempt if I catch the exception raised by .compute and re-execute the call.
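
For illustration, a sketch of this retry workaround; the helper name and retry count are placeholders rather than the actual code:

def compute_with_retry(dataset, attempts=3):
    # Re-run the blocking compute() when a task fails intermittently.
    for attempt in range(attempts):
        try:
            return dataset.compute()
        except Exception:
            if attempt == attempts - 1:
                raise                    # give up after the final attempt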

This second error seems to disappear [2] if I pass cache=False to open_mfdataset, but the first persists.
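
For reference, the same open call with caching disabled (assuming the cache keyword is forwarded to the underlying open_dataset calls):

a_dbase = xr.open_mfdataset(FILE_LIST, engine='zarr', coords='minimal',
                            compat='override', concat_dim='time',
                            combine='nested', cache=False)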

Both errors seem to disappear if I use only a single worker thread [3] or if I restrict the data being used to only one file's worth (that is, still open all the files with open_mfdataset, but only use entries that I know come from one of those source Zarr trees).

What did you expect to happen?

No response

Minimal Complete Verifiable Example

No response

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

No response

Anything else we need to know?

No response

Environment

INSTALLED VERSIONS
------------------
commit: None
python: 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:53:32) [GCC 12.3.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-240.el8.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.3
libnetcdf: 4.9.2


xarray: 2024.1.0
pandas: 2.2.1
numpy: 1.26.4
scipy: 1.12.0
netCDF4: 1.6.5
pydap: None
h5netcdf: 1.3.0
h5py: 3.10.0
Nio: None
zarr: 2.16.1
cftime: 1.6.3
nc_time_axis: None
iris: 3.7.0
bottleneck: None
dask: 2024.1.0
distributed: 2024.1.0
matplotlib: 3.8.3
cartopy: 0.22.0
seaborn: None
numbagg: None
fsspec: 2023.12.2
cupy: None
pint: None
sparse: 0.15.1
flox: None
numpy_groupies: None
setuptools: 69.1.1
pip: 24.0
conda: None
pytest: None
mypy: None
IPython: 8.20.0
sphinx: None

Footnotes

  1. I was previously using .sel to slice the data in a more semantic way, but I switched to the lower-level slicing to rule out any thread-safety issues inside the non-delayed-computation parts of xarray when dealing with a shared dataset.

  2. The errors are random in exactly the annoying way that threading errors tend to be. One factor that makes them more likely is adding debugging printouts, which seems to disrupt thread scheduling enough to trigger them more often.

  3. Unfortunately, this results in unacceptably slow performance. Loading the data in parallel really does improve throughput. I haven't yet tried using the asynchronous model for the dask client.

csubich added the bug and needs triage labels on Mar 8, 2024
max-sixty (Collaborator) commented

I suspect this is a lack of thread safety in dask. In particular, the tracebacks point at internal dask data structures. Is there anything to suggest this is something we could fix from the xarray side?

max-sixty added the plan to close label and removed the bug and needs triage labels on Oct 18, 2024
dcherian (Contributor) commented

cc @fjetter @phofl for awareness

max-sixty closed this as not planned (won't fix, can't repro, duplicate, or stale) on Nov 26, 2024
fjetter commented Nov 27, 2024

The SystemError looks like a CPython error; I wonder if it goes away with Python 3.12. Dask is likely not to blame for this one: it happens while we execute the task, i.e. we're likely running some numpy code underneath.

I'm a little skeptical about the thread safety concern. The part shown in the traceback runs in a single thread, and the data being accessed here is only read from and written to by that same thread.
Do you by any chance see a warning like distributed.scheduler - WARNING - Detected different run_spec for key on the scheduler? See also dask/dask#9888.

I would also recommend upgrading to a more recent version of dask; a lot has changed since January.

csubich (Author) commented Nov 27, 2024

I've since restructured the code to go through Dask Futures submitted directly to a distributed (albeit threaded rather than multiprocess) scheduler, instead of having separate execution threads independently call .compute(). That seems to have fixed, or at least suppressed, the problem. (The resulting logic was also a lot simpler, but it took a fair bit of awkward experimentation to learn to think in event loops.)
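
For illustration, a rough sketch of the restructured, futures-based approach; build_subset() and N_SAMPLES are placeholders rather than the actual code:

from dask.distributed import Client, as_completed

client = Client(processes=False)                  # threaded, single-process scheduler

# Submit the lazy subset datasets to the scheduler and keep futures for them
futures = [client.compute(build_subset(a_dbase)) for _ in range(N_SAMPLES)]

for future in as_completed(futures):              # consume results as they finish
    realized = future.result()
    # ... hand the realized dataset to downstream processing ...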

The KeyError was the weird part to me, since it seemed like it should never happen. I suspected some combination of dask and xarray because it felt like a computation graph was being shared between threads that weren't properly counting references, such that one thread would free a node that another still relied on. However, this is just speculation, since the keys themselves aren't terribly semantically meaningful.

To my recollection, I never saw suggestive warnings like the distributed.scheduler one you quoted.
