pandas.errors.InvalidIndexError raised when running computation in parallel using dask #7059

Open
4 tasks done
lumbric opened this issue Sep 20, 2022 · 9 comments
Labels
bug needs triage Issue that has not been reviewed by xarray team member

Comments

@lumbric
Contributor

lumbric commented Sep 20, 2022

What happened?

I'm doing a computation using chunks and map_blocks() to run things in parallel. At some point a pandas.errors.InvalidIndexError is raised. When using dask's synchronous scheduler, everything works fine. I think pandas.core.indexes.base.Index is not thread-safe. At least this seems to be the place of the race condition. See further tests below.

(This issue was initially discussed in #6816, but the ticket was closed, because I couldn't reproduce the problem any longer. Now it seems to be reproducible in every run, so it is time for a proper bug report, which is this ticket here.)

What did you expect to happen?

The dask schedulers "single-threaded" and "threads" should produce the same result.

Minimal Complete Verifiable Example 1

Edit: I've managed to reduce the verifiable example, see example 2 below.

# I wasn't able to reproduce the issue with a smaller code example, so I provide all my code and my test data. This should make it possible to reproduce the issue in less than a minute.

# Requirements:
#   - git
#   - mamba, see https://github.com/mamba-org/mamba

git clone https://github.com/lumbric/reproduce_invalidindexerror.git
cd reproduce_invalidindexerror

mamba env create -f env.yml

# alternatively run the following, will install latest versions from conda-forge:
# conda create -n reproduce_invalidindexerror
# conda activate reproduce_invalidindexerror
# mamba install -c conda-forge python=3.8 matplotlib pytest-cov dask openpyxl pytest pip xarray netcdf4 jupyter pandas scipy flake8 dvc pre-commit pyarrow statsmodels rasterio scikit-learn pytest-watch pdbpp black seaborn

conda activate reproduce_invalidindexerror

dvc repro checks_simulation

Minimal Complete Verifiable Example 2

import numpy as np
import pandas as pd
import xarray as xr

from multiprocessing import Lock
from dask.diagnostics import ProgressBar


# Workaround for xarray#6816: Parallel execution often causes an InvalidIndexError
# https://github.com/pydata/xarray/issues/6816#issuecomment-1243864752
# import dask
# dask.config.set(scheduler="single-threaded")


def generate_netcdf_files():
    fnames = [f"{i:02d}.nc" for i in range(21)]
    for i, fname in enumerate(fnames):
        xr.DataArray(
            np.ones((3879, 48)),
            dims=("locations", "time"),
            coords={
                "time": pd.date_range(f"{2000 + i}-01-01", periods=48, freq="D"),
                "locations": np.arange(3879),
            },
        ).to_netcdf(fname)
    return fnames


def compute(locations, data):
    def resample_annually(data):
        return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")

    def worker(data):
        locations_chunk = locations.sel(locations=data.locations)
        out_raw = data * locations_chunk
        out = resample_annually(out_raw)
        return out

    template = resample_annually(data)

    out = xr.map_blocks(
        lambda data: worker(data).compute().chunk({"time": None}),
        data,
        template=template,
    )

    return out


def main():
    fnames = generate_netcdf_files()

    locations = xr.DataArray(
        np.ones(3879),
        dims="locations",
        coords={"locations": np.arange(3879)},
    )

    data = xr.open_mfdataset(
        fnames,
        combine="by_coords",
        chunks={"locations": 4000, "time": None},
        # suggested as solution in
        # lock=Lock(),
    ).__xarray_dataarray_variable__

    out = compute(locations, data)

    with ProgressBar():
        out = out.compute()


if __name__ == "__main__":
    main()

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.

Relevant log output

This is the traceback of "Minimal Complete Verifiable Example 1".

Traceback (most recent call last):
  File "scripts/calc_p_out_model.py", line 61, in <module>
    main()
  File "scripts/calc_p_out_model.py", line 31, in main
    calc_power(name="p_out_model", compute_func=compute_func)
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 136, in calc_power
    power = power.compute()
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 993, in compute
    return new.load(**kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 967, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 733, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/parallel.py", line 285, in _wrapper
    result = func(*converted_args, **kwargs)
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 100, in <lambda>
    lambda wind_speeds: worker(wind_speeds).compute().chunk({"time": None}),
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 50, in worker
    specific_power_chunk = specific_power.sel(turbines=wind_speeds.turbines)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 1420, in sel
    ds = self._to_temp_dataset().sel(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 2533, in sel
    query_results = map_index_queries(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexing.py", line 183, in map_index_queries
    results.append(index.sel(labels, **options))  # type: ignore[call-arg]
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 418, in sel
    indexer = get_indexer_nd(self.index, label_array, method, tolerance)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 212, in get_indexer_nd
    flat_indexer = index.get_indexer(flat_labels, method=method, tolerance=tolerance)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3729, in get_indexer
    raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects

Anything else we need to know?

Workaround: Use synchronous dask scheduler

The issue does not occur if I use the synchronous dask scheduler by adding at the very beginning of my script:

dask.config.set(scheduler='single-threaded')
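
The scheduler can also be restricted to a single block or a single call instead of the whole script. A minimal sketch, assuming a small dask array as a stand-in for the real data (the array and the variable names are made up for illustration):

```python
import dask
import dask.array as da

# Hypothetical stand-in for the real, chunked data.
x = da.ones((1000,), chunks=100)
lazy_total = x.sum()

# Option 1: applies only to computations inside the with-block.
with dask.config.set(scheduler="single-threaded"):
    total_in_block = float(lazy_total.compute())

# Option 2: applies only to this one call. The same keyword also
# accepts "threads" and "processes".
total_per_call = float(lazy_total.compute(scheduler="single-threaded"))
```

This keeps the rest of the script on the default scheduler, which may help to narrow down which computation is affected.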

Additional debugging print

If I add the following debugging print to the pandas code:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,6 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            print("Original: ", len(self), ", length of set:", len(set(self)))
             raise InvalidIndexError(self._requires_unique_msg)
 
         if len(target) == 0

...I get the following output:

Original:  3879 , length of set: 3879

So the index seems to be unique, but self.is_unique is False for some reason (note that self._index_as_unique and self.is_unique are equivalent in this case, so "not self._index_as_unique" is the same as "not self.is_unique").
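
For comparison, the same checks can be run in a single thread on a freshly built index of the same size. This is only a sketch with a made-up index, not the index from the failing run:

```python
import numpy as np
import pandas as pd

# Hypothetical index with the same length as in the output above.
index = pd.Index(np.arange(3879))

# Three independent ways to ask "are all labels unique?".
unique_by_set = len(set(index)) == len(index)
unique_by_property = index.is_unique
has_duplicates = bool(index.duplicated().any())

print(unique_by_set, unique_by_property, has_duplicates)
```

In a single thread all three checks agree, so a False from is_unique on a duplicate-free index suggests that the problem is related to timing rather than to the data.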

Proof of race condition: add a 1 s sleep

To confirm that the race condition occurs at this point, we wait for 1 s and then check for uniqueness again:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,10 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            if not self.is_unique:
+                import time
+                time.sleep(1)
+                print("now unique?", self.is_unique)
             raise InvalidIndexError(self._requires_unique_msg)

This outputs:

now unique? True

Environment

INSTALLED VERSIONS
------------------
commit: None
python: 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 5.4.0-125-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.7.3

xarray: 0.15.0
pandas: 0.25.3
numpy: 1.17.4
scipy: 1.3.3
netCDF4: 1.5.3
pydap: None
h5netcdf: 0.7.1
h5py: 2.10.0
Nio: None
zarr: 2.4.0+ds
cftime: 1.1.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.1.3
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.8.1+dfsg
distributed: None
matplotlib: 3.1.2
cartopy: None
seaborn: 0.10.0
numbagg: None
setuptools: 45.2.0
pip3: None
conda: None
pytest: 4.6.9
IPython: 7.13.0
sphinx: 1.8.5

@lumbric lumbric added bug needs triage Issue that has not been reviewed by xarray team member labels Sep 20, 2022
@benbovy
Member

benbovy commented Sep 20, 2022

xref #6904 (not sure it is 100% relevant, but the problem also arises when .sel is called within some parallel workflow).

@lumbric
Contributor Author

lumbric commented Sep 20, 2022

@benbovy thanks for the hint! I tried passing an explicit lock to xr.open_mfdataset() as suggested, but it didn't change anything: still the same exception. I will double-check whether I did it the right way; I might be missing something.

@lumbric
Contributor Author

lumbric commented Sep 22, 2022

I have managed to reduce the reproducing example (see "Minimal Complete Verifiable Example 2" above) and have also found a proper solution to fix this issue. I am still not sure whether this is a bug or intended behavior, so I won't close the issue for now.

Basically, the issue occurs when a chunked NetCDF file is loaded from disk and passed to xarray.map_blocks(), and its coordinates are then used inside the worker func() as a .sel() argument to subset some other xarray object that is not passed to the worker but taken from the enclosing scope. I think the proper solution is to use the args parameter of map_blocks() instead of .sel():

--- run-broken.py	2022-09-22 13:00:41.095555961 +0200
+++ run.py	2022-09-22 13:01:14.452696511 +0200
@@ -30,17 +30,17 @@
     def resample_annually(data):
         return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")
 
-    def worker(data):
-        locations_chunk = locations.sel(locations=data.locations)
-        out_raw = data * locations_chunk
+    def worker(data, locations):
+        out_raw = data * locations
         out = resample_annually(out_raw)
         return out
 
     template = resample_annually(data)
 
     out = xr.map_blocks(
-        lambda data: worker(data).compute().chunk({"time": None}),
+        lambda data, locations: worker(data, locations).compute().chunk({"time": None}),
         data,
+        (locations,),
         template=template,
     )

This seems to fix this issue and seems to be the proper solution anyway.

I still don't see why I am not allowed to use .sel() on shadowed objects in the worker func(). Is this on purpose? If yes, should we add something to the documentation? Is this a specific behavior of map_blocks()? Is it related to #6904?

@jessjaco

jessjaco commented Oct 4, 2022

I've had the same issues under the exact same conditions. However, it happens whether I use dask or not.

This solution fixes it, but I agree at least a doc update would be helpful!

@dcherian
Contributor

dcherian commented Oct 4, 2022

I agree with just passing all args explicitly. Does it work otherwise with "processes"?

As a side note, this particular example seems quite convoluted.

  1. Why are you chunking inside the mapped function?
  2. If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks
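
A sketch of what the second suggestion could look like without map_blocks(). The data is made up and held in memory, and "YS" (year-start) bins are used here as an assumption in place of the original resample arguments. As far as I understand, xarray picks up flox automatically when it is installed, and the code also runs without it:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical input: two full years of daily values for 10 locations.
time = pd.date_range("2000-01-01", "2001-12-31", freq="D")
data = xr.DataArray(
    np.ones((10, time.size)),
    dims=("locations", "time"),
    coords={"locations": np.arange(10), "time": time},
).chunk({"locations": 5})

locations = xr.DataArray(
    np.full(10, 2.0),
    dims="locations",
    coords={"locations": np.arange(10)},
)

# Multiply first, then take annual means directly on the chunked array.
annual = (data * locations).resample(time="YS").mean().compute()
```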

@lumbric
Contributor Author

lumbric commented Oct 5, 2022

I agree with just passing all args explicitly. Does it work otherwise with "processes"?

What do you mean by that?

  1. Why are you chunking inside the mapped function?

Uhm yes, you are right, this should be removed, not sure how this happened. Removing .chunk({"time": None}) in the lambda function does not change the behavior of the example regarding this issue.

  1. If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks

Oh wow, thanks! Haven't seen flox before.

@itcarroll
Contributor

Thanks for raising. I too am intermittently hitting this pandas.errors.InvalidIndexError using xr.Dataset.map_blocks and compute with the default (threads) scheduler. The error does not occur with either "synchronous" or "processes" scheduling. It also does not occur when I throw a call to print in the function handed to map_blocks (so ... race condition?).

Using scheduler="processes" as a solution for now, and will try to come back with another MCVE when time allows.

@lumbric
Contributor Author

lumbric commented Mar 2, 2024

@itcarroll did you see my comment above how I fixed my issue? Are you using shadowed variables in your worker function?

@max-sixty
Collaborator

My guess is that this is just because of:

I think pandas.core.indexes.base.Index is not thread-safe. At least this seems to be the place of the race condition. See further tests below.

...and dask is just one way of running multi-threading.

If that's the case, we should be able to repro with a much smaller example, just running .sel from a multi-threaded pool...
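
A possible starting point for such a smaller example, as a sketch only. The helper names (make_locations, try_sel, count_failures) and the thread and task counts are made up, and it is not confirmed that this triggers the error; it may well report zero failures:

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import xarray as xr


def make_locations(n=3879):
    # Fresh object, so its index has not been used for lookups yet.
    return xr.DataArray(
        np.ones(n), dims="locations", coords={"locations": np.arange(n)}
    )


def try_sel(locations, labels):
    # Return the exception instead of raising, so failures can be counted.
    try:
        locations.sel(locations=labels)
        return None
    except pd.errors.InvalidIndexError as exc:
        return exc


def count_failures(n_threads=8, n_tasks=64):
    locations = make_locations()  # shared between all threads
    labels = np.arange(100)
    with ThreadPoolExecutor(n_threads) as pool:
        results = list(pool.map(lambda _: try_sel(locations, labels), range(n_tasks)))
    return sum(r is not None for r in results)


if __name__ == "__main__":
    print("InvalidIndexError count:", count_failures())
```

Since the error appears only intermittently in the reports above, the function would probably have to be called many times before drawing any conclusion from a count of zero.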


6 participants