Skip to content

Conversation

@johanneskoester
Copy link
Contributor

@johanneskoester johanneskoester commented Sep 3, 2025

snakemake --default-resources executor="'slurm' if resources.mem_mb > 10000 else 'kubernetes'"

Will be complemented by a mechanism to specify per-rule default executors and an automated approach for transferring files between different storage backends/plugins.
Introduces a breaking change in the API, but CLI and workflows remain compatible.

QC

  • The PR contains a test case for the changes or the changes are already covered by an existing test case.
  • The documentation (docs/) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 3, 2025

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

The change shifts executor configuration from a single settings object to a mapping keyed by executor name. Workflow and API signatures are updated accordingly, executor plugin validation is removed here, and job scheduling now resolves and caches executors per job via a plugin registry using the job’s executor resource.

Changes

Cohort / File(s) Summary
API: executor settings mapping
src/snakemake/api.py
DAGApi.execute_workflow now accepts Optional[Mapping[str, ExecutorSettingsBase]] instead of a single settings object; defaults to empty dict; sets default_resources["executor"]; stops passing executor_plugin and passes executor_settings; removes direct settings validation; docstrings updated.
Workflow: settings model and execute signature
src/snakemake/workflow.py
Workflow.executor_settings changed to Mapping[str, ExecutorSettingsBase] with immutables.Map() default; execute(...) signature updated to take executor_settings mapping, remove executor_plugin param, and add updated_files (optional); aligns internal data flow with per-executor configuration.
Scheduler: per-job plugin-based executor resolution
src/snakemake/scheduling/job_scheduler.py
Introduces ExecutorPluginRegistry usage and executor cache; adds typed get_executor(job) -> AbstractExecutor that reads job.resources["executor"], validates, constructs/caches executor with settings; replaces prior local/global selection; updates success handling to use per-job executor.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant DagApi
  participant Workflow
  participant Scheduler
  participant ExecRegistry as ExecutorPluginRegistry
  participant Executor as Executor (per-name)

  User->>DagApi: execute_workflow(..., executor_settings: {name->settings}, ...)
  DagApi->>Workflow: execute(executor_settings=map, ...)
  Workflow->>Scheduler: start scheduling jobs
  loop For each job
    Scheduler->>Scheduler: job.resources["executor"] = name
    Scheduler->>ExecRegistry: get(name) / validate(settings[name])
    ExecRegistry-->>Scheduler: executor factory
    Scheduler->>Executor: instantiate(or reuse) with settings[name]
    Scheduler->>Executor: submit(job)
    Executor-->>Scheduler: job result/status
    Scheduler->>Executor: handle_job_success(job)
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–75 minutes

Possibly related PRs

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/multi-executor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@johanneskoester johanneskoester added the WIP Work in progress. Not meant for merging. label Sep 3, 2025
@johanneskoester johanneskoester marked this pull request as draft September 3, 2025 16:10
@github-actions
Copy link
Contributor

github-actions bot commented Sep 3, 2025

Please format your code with black: black snakemake tests/*.py.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/snakemake/workflow.py (2)

1202-1209: Broken execute() signature: executor_plugin is still used but no longer passed

execute() references executor_plugin in multiple places (e.g., Lines 1227, 1313, 1321) causing NameError and breaking dryrun/local_exec logic. Restore the parameter and pass it from API.

Apply this diff to restore the parameter:

 def execute(
     self,
-    executor_settings: Mapping[str, ExecutorSettingsBase],
+    executor_plugin: ExecutorPlugin,
+    executor_settings: Mapping[str, ExecutorSettingsBase],
     scheduler_plugin: SchedulerPlugin,
     scheduler_settings: Optional[SchedulerSettingsBase],
     greedy_scheduler_settings: GreedySchedulerSettings,
     updated_files: Optional[List[str]] = None,
 ):

And ensure API calls pass executor_plugin (see api.py comment).


1277-1281: updated_files collects IOFile objects but is typed as List[str]

Return strings to match the public API expectation.

Apply this diff:

-            if updated_files is not None:
-                updated_files.extend(
-                    f for job in self.dag.needrun_jobs() for f in job.output
-                )
+            if updated_files is not None:
+                updated_files.extend(
+                    str(f) for job in self.dag.needrun_jobs() for f in job.output
+                )
src/snakemake/api.py (1)

644-651: Pass executor_plugin into workflow.execute after signature fix

workflow.execute still needs the main executor plugin for global behavior and logging.

Apply this diff:

-        workflow.execute(
-            executor_settings=executor_settings,
+        workflow.execute(
+            executor_plugin=executor_plugin,
+            executor_settings=executor_settings,
             scheduler_plugin=scheduler_plugin,
             scheduler_settings=scheduler_settings,
             greedy_scheduler_settings=greedy_scheduler_settings,
             updated_files=updated_files,
         )
🧹 Nitpick comments (2)
src/snakemake/scheduling/job_scheduler.py (1)

131-133: Executor cache and local registry are fine

Per-executor cache (_executors) and a registry instance are appropriate. You can reuse the module-level singleton, but not required.

src/snakemake/api.py (1)

486-488: Default executor_settings mapping

Initializing to {} is fine; no change needed. If you want immutability parity with Workflow default, consider immutables.Map(), but optional.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fd0a711 and 3214951.

📒 Files selected for processing (3)
  • src/snakemake/api.py (4 hunks)
  • src/snakemake/scheduling/job_scheduler.py (3 hunks)
  • src/snakemake/workflow.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/workflow.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/api.py
🧠 Learnings (1)
📚 Learning: 2024-10-13T14:10:37.796Z
Learnt from: johanneskoester
PR: snakemake/snakemake#3114
File: snakemake/cli.py:708-708
Timestamp: 2024-10-13T14:10:37.796Z
Learning: In the `snakemake/cli.py` file, `ExecutorPluginRegistry()` is a singleton class, so multiple instantiations are acceptable and do not cause unnecessary overhead.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
🧬 Code graph analysis (2)
src/snakemake/scheduling/job_scheduler.py (1)
src/snakemake/jobs.py (3)
  • AbstractJob (85-119)
  • resources (489-510)
  • resources (1512-1526)
src/snakemake/api.py (1)
src/snakemake/resources.py (1)
  • set_resource (69-71)
🪛 Ruff (0.12.2)
src/snakemake/scheduling/job_scheduler.py

176-176: Use of assert detected

(S101)

src/snakemake/api.py

591-591: Use of assert detected

(S101)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (34)
  • GitHub Check: tests (9, windows-latest, py312)
  • GitHub Check: tests (10, windows-latest, py312)
  • GitHub Check: tests (6, macos-latest, py312)
  • GitHub Check: tests (10, windows-latest, py313)
  • GitHub Check: tests (9, macos-latest, py313)
  • GitHub Check: tests (9, windows-latest, py313)
  • GitHub Check: tests (8, windows-latest, py312)
  • GitHub Check: tests (8, windows-latest, py313)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (7, windows-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (7, windows-latest, py312)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (6, windows-latest, py313)
  • GitHub Check: tests (5, windows-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (5, windows-latest, py312)
  • GitHub Check: tests (6, windows-latest, py312)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (1, macos-latest, py313)
  • GitHub Check: tests (4, windows-latest, py313)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (2, windows-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (3, windows-latest, py312)
  • GitHub Check: tests (4, windows-latest, py312)
  • GitHub Check: tests (2, windows-latest, py313)
  • GitHub Check: tests (3, windows-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (1, windows-latest, py313)
  • GitHub Check: tests (1, windows-latest, py312)
🔇 Additional comments (4)
src/snakemake/workflow.py (2)

22-23: LGTM: importing immutables for persistent mappings

Import aligns with new executor_settings default. No issues.


161-161: Executor settings as an immutable mapping is fine

Defaulting to immutables.Map() is safe and avoids accidental mutation.

src/snakemake/scheduling/job_scheduler.py (1)

22-22: LGTM: type-hint import for AbstractExecutor

Import is correct and scoped.

src/snakemake/api.py (1)

461-476: Docs updated for mapping-based executor settings

Looks good and consistent with the PR intent.

Comment on lines 591 to 593
assert self.workflow_api.resource_settings.default_resources is not None
self.workflow_api.resource_settings.default_resources.set_resource("executor", executor)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure default_resources exists before setting 'executor'

assert can be stripped in optimized runs. Create defaults if absent to make local execution path robust.

Apply this diff:

-        assert self.workflow_api.resource_settings.default_resources is not None
-        self.workflow_api.resource_settings.default_resources.set_resource("executor", executor)
+        if self.workflow_api.resource_settings.default_resources is None:
+            self.workflow_api.resource_settings.default_resources = DefaultResources()
+        self.workflow_api.resource_settings.default_resources.set_resource(
+            "executor", executor
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert self.workflow_api.resource_settings.default_resources is not None
self.workflow_api.resource_settings.default_resources.set_resource("executor", executor)
if self.workflow_api.resource_settings.default_resources is None:
self.workflow_api.resource_settings.default_resources = DefaultResources()
self.workflow_api.resource_settings.default_resources.set_resource(
"executor", executor
)
🧰 Tools
🪛 Ruff (0.12.2)

591-591: Use of assert detected

(S101)

🤖 Prompt for AI Agents
In src/snakemake/api.py around lines 591-593, replace the bare assert with a
runtime check that ensures self.workflow_api.resource_settings.default_resources
exists: if it is None, instantiate the module's standard default-resources
object (e.g., create a new DefaultResources / ResourceDefaults instance or call
the factory used elsewhere in the codebase) and assign it to
self.workflow_api.resource_settings.default_resources, then call
set_resource("executor", executor) as before; this guarantees a valid object in
optimized runs where asserts are stripped.

Comment on lines +174 to +187
def get_executor(self, job: AbstractJob) -> AbstractExecutor:
executor_name = job.resources["executor"]
assert isinstance(executor_name, str)
if executor_name not in self._executors:
plugin = self._executor_plugin_registry.get_plugin(executor_name)
settings = self.workflow.executor_plugin_settings.get(executor_name)
plugin.validate_settings(settings)
self._executors[executor_name] = plugin.executor(
self.workflow,
logger,
settings,
)
return self._executors[executor_name]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Fix executor settings lookup and avoid assert; add robust validation

  • Uses nonexistent attribute workflow.executor_plugin_settings (should be workflow.executor_settings).
  • assert on a runtime condition (S101) can be stripped; raise a WorkflowError instead.
  • Guard validate_settings(None) to avoid plugins erroring on None.

Apply this diff:

-    def get_executor(self, job: AbstractJob) -> AbstractExecutor:
-        executor_name = job.resources["executor"]
-        assert isinstance(executor_name, str)
-        if executor_name not in self._executors:
-            plugin = self._executor_plugin_registry.get_plugin(executor_name)
-            settings = self.workflow.executor_plugin_settings.get(executor_name)
-            plugin.validate_settings(settings)
-            self._executors[executor_name] = plugin.executor(
-                self.workflow,
-                logger,
-                settings,
-            )
-        return self._executors[executor_name]
+    def get_executor(self, job: AbstractJob) -> AbstractExecutor:
+        try:
+            executor_name = job.resources["executor"]
+        except KeyError:
+            raise WorkflowError(
+                f"Job {job} does not define an 'executor' resource; unable to resolve executor."
+            )
+        if not isinstance(executor_name, str):
+            raise WorkflowError(
+                f"Executor resource for job {job} must be a string, got {type(executor_name).__name__}."
+            )
+        if executor_name not in self._executors:
+            plugin = self._executor_plugin_registry.get_plugin(executor_name)
+            settings = self.workflow.executor_settings.get(executor_name)
+            if settings is not None:
+                plugin.validate_settings(settings)
+            self._executors[executor_name] = plugin.executor(
+                self.workflow,
+                logger,
+                settings,
+            )
+        return self._executors[executor_name]

Follow-up: job submission still uses a single executor (self._executor). To actually federate execution, group runjobs by resolved executor and submit per group. For example:

# inside schedule(), replacing the runjobs branch
if runjobs:
    buckets = {}
    for j in runjobs:
        exec_inst = self.get_executor(j)
        buckets.setdefault(exec_inst, []).append(j)
    for exec_inst, jobs_bucket in buckets.items():
        self.run(jobs_bucket, executor=exec_inst)

Run this to locate other outdated references to executor_plugin_settings:


🏁 Script executed:

#!/bin/bash
rg -n --type=py -C2 '\bexecutor_plugin_settings\b'

Length of output: 599


🏁 Script executed:

#!/bin/bash
rg -n --type=py -C2 '\bexecutor_settings\b'

Length of output: 4990


Fix executor settings lookup and error handling

  • Rename self.workflow.executor_plugin_settings to self.workflow.executor_settings.
  • Replace assert isinstance(executor_name, str) with explicit WorkflowError raises for missing or non‐string executors (add from snakemake.exceptions import WorkflowError).
  • Only call plugin.validate_settings(settings) when settings is not None.
  • To federate execution, group runjobs by executor instance and submit each bucket separately.
🧰 Tools
🪛 Ruff (0.12.2)

176-176: Use of assert detected

(S101)

🤖 Prompt for AI Agents
In src/snakemake/scheduling/job_scheduler.py around lines 174 to 187, the code
needs multiple fixes: change lookup from self.workflow.executor_plugin_settings
to self.workflow.executor_settings; replace the assert with explicit
WorkflowError raises (import WorkflowError from snakemake.exceptions) to handle
missing or non-string executor names (raise a WorkflowError when the "executor"
key is missing or not a str); only call plugin.validate_settings(settings) if
settings is not None; and adjust execution submission so runjobs are grouped by
executor instance (bucket runjobs by the resolved executor object and submit
each bucket separately) to enable federated execution.

@cmeesters
Copy link
Member

could be `snakemake --default-resources executor="'slurm' if resources.mem_mb > 10000 else 'local'", too?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

WIP Work in progress. Not meant for merging.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants