`sel` behaving randomly when applying to a dataset with multiprocessing #6904
Comments
That sounds quite unfriendly! A couple of questions to reduce the size of the example, without providing any answers yet unfortunately:
Yep, and yep (believe me, I've tried everything in desperation 😄)
Which method should I use then? I need the closest point
Yep, I can try to make a minimal example; however, in order to reproduce the issue, I think it's necessary to open a large dataset.
Re `nearest`, does it replicate with exact lookups?
I haven't tried yet because it doesn't really match my use case. Another thing I've noticed is that if the list of iterators is smaller than the `chunksize`, everything's fine, probably because it reverts to the serial case, as only one worker is processing.
Can you try explicitly passing a multiprocessing lock into the `open_dataset` call?

```python
from multiprocessing import Lock

ds = xarray.open_dataset(file, lock=Lock())
```

(We automatically select appropriate locks if using Dask, but I'm not sure how we would do that more generally...)
Ok, that's a good shot.
Yes, it should, as long as you're using multi-processing under the covers. If you do multi-threading, then you would want to use a `threading.Lock` instead.
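For illustration, a minimal sketch of what the threaded variant could look like, under the assumption that a `threading.Lock` is the counterpart of the `multiprocessing.Lock` above and that the netCDF4 backend accepts the `lock` keyword as suggested; the file name and variable names are taken from the reporter's example below:

```python
# Hypothetical thread-based version of the same workflow (not from the issue).
# A threading.Lock serializes access to the shared file handle when the
# workers are threads inside a single process.
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

import xarray as xr

ds = xr.open_dataset("input.nc", lock=Lock())  # assumes the backend forwards `lock`


def compute(station):
    # same per-point reduction as in the reporter's example
    ds_point = ds.isel(lat=0, lon=0)
    return station, ds_point.t_2m_max.mean().item(), ds_point.t_2m_min.mean().item()


with ThreadPoolExecutor(max_workers=6) as pool:
    results = list(pool.map(compute, range(5)))
print(results)
```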
Mmm, ok, I'll try and let you know. BTW, is there any advantage or difference in terms of CPU and memory consumption between opening the file only once versus letting every process open it? I'm asking because I thought opening it in every process was just plain stupid, but it seems to perform exactly the same, so maybe I'm just creating a problem where there is none.
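For comparison, here is a minimal sketch of the open-in-every-worker pattern being discussed (file name and variable names are taken from the reporter's example below; this is the variant the reporter later describes as giving correct results):

```python
# Sketch: each worker opens its own netCDF handle instead of inheriting the
# parent's open file through fork.
import xarray as xr
from tqdm.contrib.concurrent import process_map


def compute(station):
    # open_dataset inside the worker -> a private file handle per process
    with xr.open_dataset("input.nc") as ds:
        ds_point = ds.isel(lat=0, lon=0)
        return station, ds_point.t_2m_max.mean().item(), ds_point.t_2m_min.mean().item()


if __name__ == "__main__":
    results = process_map(compute, range(5), max_workers=6, chunksize=1)
    print(results)
```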
That causes an error, `Error 11: Resource temporarily unavailable`. Here is the complete traceback:

```
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 198, in _process_chunk
return [fn(*args) for args in chunk]
File "/var/models/miniconda3/lib/python3.8/concurrent/futures/process.py", line 198, in <listcomp>
return [fn(*args) for args in chunk]
File "test_sel_bug.py", line 58, in compute_clima
return station, ds_point.t_2m_med.mean().item(), ds_point.t_2m_min.mean().item(), ds_point.lon.min().item(), ds_point.lat.min().item()
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/common.py", line 58, in wrapped_func
return self.reduce(func, dim, axis, skipna=skipna, **kwargs)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/dataarray.py", line 2696, in reduce
var = self.variable.reduce(func, dim, axis, keep_attrs, keepdims, **kwargs)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 1806, in reduce
data = func(self.data, **kwargs)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 339, in data
return self.values
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 512, in values
return _as_array_or_item(self._data)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/variable.py", line 252, in _as_array_or_item
data = np.asarray(data)
File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
return array(a, dtype, copy=False, order=order)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 552, in __array__
self._ensure_cached()
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 549, in _ensure_cached
self.array = NumpyIndexingAdapter(np.asarray(self.array))
File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
return array(a, dtype, copy=False, order=order)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 522, in __array__
return np.asarray(self.array, dtype=dtype)
File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
return array(a, dtype, copy=False, order=order)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 423, in __array__
return np.asarray(array[self.key], dtype=None)
File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
return array(a, dtype, copy=False, order=order)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/coding/variables.py", line 70, in __array__
return self.func(self.array)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/coding/variables.py", line 137, in _apply_mask
data = np.asarray(data, dtype=dtype)
File "/var/models/miniconda3/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
return array(a, dtype, copy=False, order=order)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 423, in __array__
return np.asarray(array[self.key], dtype=None)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/backends/netCDF4_.py", line 93, in __getitem__
return indexing.explicit_indexing_adapter(
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/core/indexing.py", line 712, in explicit_indexing_adapter
result = raw_indexing_method(raw_key.tuple)
File "/var/models/miniconda3/lib/python3.8/site-packages/xarray/backends/netCDF4_.py", line 106, in _getitem
array = getitem(original_array, key)
File "src/netCDF4/_netCDF4.pyx", line 4420, in netCDF4._netCDF4.Variable.__getitem__
File "src/netCDF4/_netCDF4.pyx", line 5363, in netCDF4._netCDF4.Variable._get
File "src/netCDF4/_netCDF4.pyx", line 1950, in netCDF4._netCDF4._ensure_nc_success
RuntimeError: Resource temporarily unavailable
""" I think we may be heading the right direction |
Ok, it seems to fail also with exact lookups o.O I'm using:

```python
def compute():
    ds_point = ds.isel(lat=0, lon=0)
    return (
        ds_point.t_2m_med.mean().item(),
        ds_point.t_2m_min.mean().item(),
        ds_point.lon.min().item(),
        ds_point.lat.min().item(),
    )
```

Result for the serial version:

```python
[(10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0),
 (10.469047546386719, 6.5044121742248535, 6.0, 48.0)]
```

As you would expect, all values are the same. And for the parallel version with EXACTLY the same code:

```python
[(7.968084812164307, 6.948009967803955, 6.0, 48.0),
 (7.825599193572998, 6.995675563812256, 6.0, 48.0),
 (8.894186019897461, 6.849221706390381, 6.0, 48.0),
 (8.901763916015625, 6.69615364074707, 6.0, 48.0),
 (9.164983749389648, 6.484694480895996, 6.0, 48.0)]
```
This is a minimal working example that I could come up with. You can try to open any NetCDF file that you have:

```python
import xarray as xr
from tqdm.contrib.concurrent import process_map
import pprint


def main():
    global ds
    ds = xr.open_dataset('input.nc')
    it = range(0, 5)

    results = []
    for i in it:
        results.append(compute(i))
    print("------------Serial results-----------------")
    pprint.pprint(results)

    results = process_map(compute, it, max_workers=6, chunksize=1, disable=True)
    print("------------Parallel results-----------------")
    pprint.pprint(results)


def compute(station):
    ds_point = ds.isel(lat=0, lon=0)
    return (
        station,
        ds_point.t_2m_max.mean().item(),
        ds_point.t_2m_min.mean().item(),
        ds_point.lon.min().item(),
        ds_point.lat.min().item(),
    )


if __name__ == "__main__":
    main()
```
You might look into different multiprocessing start methods (https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods). It may also be that the NetCDF or HDF5 libraries were simply not written in a way that can support multi-processing. This would not surprise me.
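As a rough illustration of that suggestion, a sketch of forcing the "spawn" start method (whether this actually avoids the problem for this dataset is untested here; the file and variable names are placeholders from the example above):

```python
# Sketch: use the "spawn" start method so each worker starts a fresh
# interpreter instead of inheriting the parent's open netCDF handles via fork.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import xarray as xr


def compute(station):
    # open inside the worker; with "spawn" nothing is inherited from the parent
    with xr.open_dataset("input.nc") as ds:
        ds_point = ds.isel(lat=0, lon=0)
        return station, ds_point.t_2m_max.mean().item()


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=6, mp_context=ctx) as pool:
        print(list(pool.map(compute, range(5))))
```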
I agree, maybe this isn't worth the trouble. I have not seen it done successfully before.
While it's somewhat unsatisfactory, marking this as plan-to-close, given that we can't reliably reproduce it and so aren't sure how to make progress.
What happened?

I have a script structured like this: the dataset is opened once at the top and a `compute` function selects a point from it. This works as expected. However, if I try to parallelize `compute` by calling it with `process_map`, the results of the print are completely different from the serial example, and they change every time that I run the script. It seems that `sel` is giving back a different part of the dataset when there are multiple processes running in parallel. If I move the `open_dataset` statement inside `compute`, then everything works in the parallel case in the same way as in the serial one. Also, if I load the dataset at the beginning, i.e. `ds = xr.open_dataset(file).load()`, I also have reproducible results. Is this supposed to happen? I really don't understand how.
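A minimal sketch of the eager-load workaround described above (same structure as the reporter's example; after `.load()` the data live in memory as NumPy arrays, so the forked workers presumably no longer read lazily through a shared netCDF handle):

```python
# Sketch: load the whole dataset into memory before the workers fork, so no
# worker performs lazy netCDF reads through an inherited file handle.
# (Assumes the fork start method, as on Linux.)
import xarray as xr
from tqdm.contrib.concurrent import process_map

ds = xr.open_dataset("input.nc").load()  # eager load, as described above


def compute(station):
    ds_point = ds.isel(lat=0, lon=0)
    return station, ds_point.t_2m_max.mean().item()


if __name__ == "__main__":
    print(process_map(compute, range(5), max_workers=6, chunksize=1))
```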
What did you expect to happen?

The behaviour of `sel` should be the same in parallel or serial execution.

Minimal Complete Verifiable Example
No response
MVCE confirmation
Relevant log output
No response
Anything else we need to know?
No response
Environment
INSTALLED VERSIONS
commit: None
python: 3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10)
[GCC 10.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-229.1.2.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.utf8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4
xarray: 2022.3.0
pandas: 1.2.3
numpy: 1.20.3
scipy: 1.8.1
netCDF4: 1.5.6
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.6.1
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.2.1
cfgrib: None
iris: None
bottleneck: None
dask: 2022.7.1
distributed: 2022.7.1
matplotlib: 3.5.2
cartopy: 0.18.0
seaborn: 0.11.2
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: 0.19.2
sparse: None
setuptools: 59.8.0
pip: 22.2
conda: 4.13.0
pytest: None
IPython: 8.4.0
sphinx: None