Reuse of keys in blockwise fusion can cause spurious KeyErrors on distributed cluster #9888
If I understood correctly:
I don't think this should be in dask/dask. This is (yet another) fragility of the cancelled state, and the dask/dask graph shouldn't work around it. It should be OK to submit to the scheduler a new, different task with the same key as a released future.
Actually - I don't think you need to change the graph. You can get this error if you resubmit the same exact tasks.
This requires x to go through its whole lifecycle between client, scheduler, and worker b before a has had the chance of doing even a single round of the event loop. The easiest way to achieve this is to hamstring a's event loop by spilling many GiBs worth of data.
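A minimal sketch of that resubmission pattern (the original snippet referencing `x` and workers `a`/`b` is not reproduced in this excerpt; this is an assumed setup using deterministic `da.ones` so that both submissions produce identical keys):

```python
import dask.array as da
from dask.distributed import Client

client = Client(n_workers=2)

x = da.ones((10_000, 10_000), chunks=(1_000, 1_000)).persist()
del x  # release the futures on the client...

# ...and immediately resubmit the exact same graph: the keys are identical,
# so the second submission can race against the still in-flight release of
# the first one on the scheduler and workers.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000)).persist()
```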
TLDR: The current system assumes strongly, in many places, that a key uniquely identifies a task, not just its output. Reusing a key for a different task violates this, and distributed can fail on multiple levels. The scheduler has no way to understand that the tasks are new and different. The problem has nothing to do with the Worker; this error is just surfacing there.

As stated above, if validation is on, this raises in various places on the scheduler (I found three different assertions that could be triggered depending on queuing/no-queuing and some timing; here, here, and I forgot where the third one was).

If you inspect closely what happens when you execute the above script (the del is not necessary, it is merely there to be explicit), the second computation resubmits keys the scheduler already knows, but with different task definitions. The scheduler already knows a certain task A that doesn't have any dependencies (first persist, with task fusion). A subsequent submission then reuses the same key for a task that does have dependencies. It may be true that the worker could handle this better, but the scheduler is also corrupted already.
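A minimal sketch of that situation with raw dict graphs (the keys here are hypothetical, not taken from the reproducer): the same key is first submitted as a dependency-free task and later as a task with a dependency, and the two definitions tokenize differently even though the key is identical.

```python
from dask.base import tokenize

# First submission: "a" is a fused task with no dependencies.
fused = {"a": (sum, [1, 2, 3])}

# Second submission: the same key "a" now depends on "dep".
unfused = {"dep": (list, [1, 2, 3]), "a": (sum, "dep")}

# Same key, different task definition -- the scheduler only sees the key
# and has no way to tell that "a" is now a different task.
assert tokenize(fused["a"]) != tokenize(unfused["a"])
```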
I believe this is where the first fallacy is hidden. In the code above, what actually happens is
i.e. the scheduler doesn't even know that the futures were released. This timing is unavoidable due to how refcounting works. Even if you disregard this release chain, the same issue can very easily be constructed with a second client and different graphs.
@crusaderky I can't truly follow your example here, but it is a different issue. I'm talking about the guarantee that a task has to be uniquely identifiable by its key, which is violated here and causes all sorts of failures in distributed. If you found something else, please open a new ticket. I believe this issue is caused by this line reusing the existing key:

Line 1589 in a8327a3
@rjzamora @jrbourbeau Can either one of you motivate why blockwise fusion layers cannot generate a new key? @jrbourbeau you mentioned this recently as a UX problem that could be solved by a "better version of HLGs". I feel like I'm missing some context.
Agree that this is a significant problem with HLGs. The reason is the simple fact that there is no (formal/robust) mechanism to "replay" the construction of a graph after the names/properties for a subset of its tasks have been changed. That is, you can't really change the name of a layer without regenerating everything that depends on it.
So this is a limitation because there might've been an "accidental" materialization before the fusing?
Yes and no. The limitation is that there is no API to replace a layer in the HLG (e.g. ...).
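For illustration, a minimal sketch of what such a replacement would have to touch, introspecting the `HighLevelGraph.layers` / `.dependencies` attributes (the concrete layer names are version-dependent and only illustrative):

```python
import dask.array as da

x = da.ones(10, chunks=5) + 1
hlg = x.__dask_graph__()   # a HighLevelGraph

print(list(hlg.layers))    # layer names, e.g. ['ones_like-...', 'add-...']
print(hlg.dependencies)    # {layer name: set of layer names it depends on}

# Swapping in a layer under a *new* name means rewriting every entry in
# `dependencies` that points at the old name *and* regenerating each
# dependent Layer so its tasks reference the new keys -- and there is no
# Layer API for that second step.
```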
xref #8635. Like @rjzamora said:
I'm still trying to wrap my head around it but I'm getting there. IIUC this is mostly about layer dependencies? Best case, fixing layer dependencies is simply exchanging a single key; worst case would be if the layer depending on that Blockwise layer is a MaterializedLayer, in which case we'd need to walk the entire graph to rename the key. Is this roughly correct?
Yes. The problem is that you would need to regenerate all dependent layers, and there is no clear way to do this.

Why there is no clear way to do this: there is no formal mechanism to change the dependencies of an existing Layer. The required Layer API is very limited and does not provide a way to set the dependencies. Therefore, different logic would need to be used to change dependencies for different layer types. For the ...

Given that the logic here would be "ugly" with the existing Layer API, the ideal solution is probably to update the Layer API to include a clear mechanism for tracking dependencies. However, we have been waiting to clean up serialization. I have personally been waiting to remove the ...
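To make the "worst case" from the previous comment concrete, a hypothetical sketch (not a dask API) of renaming a key in a fully materialized, dict-based graph: every task has to be visited and rewritten, here using dask's internal `dask.core.subs` helper.

```python
from dask.core import subs  # internal helper: substitute a key inside a task


def rename_key(dsk, old, new):
    """Return a copy of a materialized graph with `old` renamed to `new`."""
    out = {}
    for key, task in dsk.items():
        out[new if key == old else key] = subs(task, old, new)
    return out


dsk = {"a": 1, "b": (sum, ["a", "a"]), "c": (str, "b")}
print(rename_key(dsk, "a", "a-fused"))
# every reference to "a" is rewritten, so the whole graph must be walked
```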
Thanks, that clears things up for me.
Found a nice textbook example in the distributed tests: https://github.com/dask/distributed/pull/8185/files#r1479864806
(repost from dask/distributed#8185 (comment))

```python
ddf = dd.from_pandas(df, npartitions=4)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    ddf = ddf.set_index("a").sort_values("b")
    result = ddf.compute()
```

This code: ...

Some keys are the same, but run_spec changes; this occasionally triggers a race condition where the scheduler didn't have the time to forget those keys yet. This is (fortuitously) not causing failures because the key is always close to its released->forgotten transition when this happens (although in other cases it may be in memory; I'm not sure). If I add a line ...
Hi, if there is no workaround/alternative, I have the time to contribute to this issue (it would be my first in Dask, I have only contributed to Xarray).

```python
import xarray as xr
import dask.array as da
from dask.distributed import LocalCluster, Client

# Change the base_path
base_path = "YOUR_PATH"

sizes = [("a", 3), ("b", 5), ("c", 2)]


def update_dataset(factor):
    coords = {
        dim: list(range(size))
        for dim, size in sizes
    }
    dataset = xr.Dataset(
        {
            "data": xr.DataArray(
                da.ones(
                    shape=[size for _, size in sizes],
                    chunks=(1, 1, 1)
                ) * factor,
                coords=coords,
                dims=[dim for dim, _ in sizes]
            )
        }
    )
    path = f"{base_path}/test_zarr_{factor}"
    dataset.to_zarr(path, mode="w", compute=True)
    return xr.open_zarr(path)


def calculate1(x, y):
    path = f"{base_path}/test_zarr_calculate1"
    result = x * y
    result.to_zarr(path, mode="w", compute=True)
    return result


def calculate2(x, y):
    path = f"{base_path}/test_zarr_calculate2"
    result = x / y
    result.to_zarr(path, mode="w", compute=True)
    return result


dag = {
    "update_arr1": (update_dataset, 2),
    "update_arr2": (update_dataset, 4),
    "calculate1": (calculate1, "update_arr1", "update_arr2"),
    "calculate2": (calculate2, "update_arr1", "update_arr2"),
    "report": (lambda x, y: True, "calculate1", "calculate2"),
}

with LocalCluster(
    n_workers=1,
    threads_per_worker=2,
    memory_limit="1GB",
) as cluster:
    with cluster.get_client() as client:
        print(client.get(dag, "report"))
```

Warning raised:

```
2024-08-24 16:13:02,764 - distributed.scheduler - WARNING - Detected different `run_spec` for key 'original-open_dataset-data-c801781151f576ac3a1c261dc894c1d0' between two consecutive calls to `update_graph`. This can cause failures and deadlocks down the line. Please ensure unique key names. If you are using a standard dask collections, consider releasing all the data before resubmitting another computation. More details and help can be found at https://github.com/dask/dask/issues/9888.

Debugging information
---------------------
old task state: processing
old run_spec: (<function execute_task at 0x0000023975DFF600>, (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x00000239772DD380>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))))),), {})
new run_spec: (<function execute_task at 0x0000023975DFF600>, (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x0000023977322A40>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None)))))),), {})
old token: ('tuple', [('913ceb5b5beb463a9010ec0790bc30002ca34164', []), ('tuple', [('7a2b6b38794817e675b96ba80026355719825fb1', ['6693512388b528fbf156d0cc4c7b9449588c44bf'])]), ('dict', [])])
new token: ('tuple', [('913ceb5b5beb463a9010ec0790bc30002ca34164', []), ('tuple', [('7a2b6b38794817e675b96ba80026355719825fb1', ['45ef8ea51c5d50300dd28843ec098db9cbb85566'])]), ('dict', [])])
old dependencies: set()
new dependencies: set()
```

I'm using Xarray 2024.07.0 and Dask 2024.08.01.
@josephnowak this looks like a bug that's been reported on the xarray tracker (pydata/xarray#9325), which was fixed in #11320.
@fjetter Thanks for your response. I tried using Dask 2024.08.01, which in theory contains the fix that you are mentioning, but ...
Hi all, I'm running dask.array.linalg.svd_compressed on a dask array and getting an error linking to this issue, with ... and ... and basically the same warning tens of times with different keys (sometimes it's "assign-...", sometimes it's "unique-...", etc.). I am wondering whether that's because of a bug somewhere, or because I'm doing something kind of weird to build my dask array from a dask dataframe initially?

Context is recommender systems and the typical user-items matrix decomposition that goes with it. I've got a dask dataframe with user-item interaction rows (essentially equivalent to a sparse COO matrix, I believe), and when trying to turn it into a dask array (so I can use dask.array.linalg.svd_compressed on it), I had to normalise the user ids to be between 0 and n_users, but also for each chunk of the dask array I had to start from 0 again and do the bookkeeping myself:

```python
import numpy as np
import sparse  # pydata/sparse

# division_size, n_items and views_sorted are defined elsewhere in my code.


def partition_to_coo(partition):
    users_from_this_partition = (partition["encoded_userid"] % division_size).values
    items_from_this_partition = partition["encoded_itemid"].values
    values = partition["value"].values
    np_array = sparse.COO(
        (values, (users_from_this_partition, items_from_this_partition)),
        shape=(max(users_from_this_partition) + 1, n_items),
    ).todense()
    return np_array


dense_user_items_matrix = views_sorted.map_partitions(
    partition_to_coo, meta=np.ndarray(shape=(0, 0), dtype=int)
)
```

Thanks!
Subsequent Blockwise layers are currently fused into a single layer. This reduces the number of tasks and the overhead, and is generally a good thing to do. Currently, the fused output does not generate unique key names, which is a problem from a UX perspective but can also cause severe failure cases when executed on the distributed scheduler, since distributed assumes that a task key is a unique identifier for the entire task. While it is true that the data output of the fused key and the non-fused key is identical, the run_spec and the local topology are intentionally very different. Specifically, a fused task may not have any dependencies while the non-fused task does.
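As a schematic illustration of that claim (a minimal sketch, not the reproducer described below; exact behaviour depends on the dask version): fusion shrinks the graph but keeps the output key names.

```python
import dask
import dask.array as da

x = (da.ones(10, chunks=5) + 1) * 2

before = x.__dask_graph__()   # unoptimized: one layer per operation
(opt,) = dask.optimize(x)     # applies blockwise fusion, among other things
after = opt.__dask_graph__()

print(x.__dask_keys__() == opt.__dask_keys__())  # True: key names are reused
print(len(before), len(after))                   # fewer tasks after fusion
```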
An example where this matters is the following (async code is not necessary, but the condition is a bit difficult to trigger and this helps; paste this code into a Jupyter notebook and run it a couple of times).
Note how the initial shuffle is persisted and a slightly different version of this graph is computed again below. From what I can gather, the initial persist fuses the keys while the latter computation does not (or maybe it's the other way round, I'm not sure; either way, that is a different issue).
This specific reproducer actually triggers (not every time) a `KeyError` in a worker's data buffer while trying to read data. This is caused by the dependency relations between tasks no longer being accurate on the scheduler: it considers a task "ready", i.e. all dependencies in memory, too soon, causing a failure on the worker.
When `validate` is activated, the scheduler catches these cases earlier and raises appropriate AssertionErrors. This is not checked at runtime for performance reasons and is typically not necessary, since we rely on the assumption that keys identify a task uniquely.

Apart from this artificial example, we do have internal reports about such a spurious KeyError in combination with an xgboost workload.