feat!: federated execution by specifying per-rule executors #3721
base: main
Conversation
Important: Review skipped (draft detected). Please check the settings in the CodeRabbit UI. You can disable this status message in the CodeRabbit configuration.

📝 Walkthrough

The change shifts executor configuration from a single settings object to a mapping keyed by executor name. Workflow and API signatures are updated accordingly, executor plugin validation is removed here, and job scheduling now resolves and caches executors per job via a plugin registry, using the job's executor resource.
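To illustrate the mapping-based shape, here is a minimal, hypothetical sketch; the dictionary contents and the helper name are assumptions for illustration, not the actual Snakemake objects:

```python
# Hypothetical sketch: executor settings are keyed by executor name instead
# of being a single settings object. All names and fields are assumptions.
executor_settings = {
    "local": {"cores": 4},
    "slurm": {"partition": "compute", "account": "proj1"},
}

def settings_for(executor_name, settings_map):
    # Return the settings for the executor a job requests, or None if the
    # executor has no explicit settings entry.
    return settings_map.get(executor_name)
```

A job whose `executor` resource is `"slurm"` would then resolve `settings_for("slurm", executor_settings)`; an unknown name simply yields `None`.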
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor User
    participant DagApi
    participant Workflow
    participant Scheduler
    participant ExecRegistry as ExecutorPluginRegistry
    participant Executor as Executor (per-name)
    User->>DagApi: execute_workflow(..., executor_settings: {name->settings}, ...)
    DagApi->>Workflow: execute(executor_settings=map, ...)
    Workflow->>Scheduler: start scheduling jobs
    loop For each job
        Scheduler->>Scheduler: job.resources["executor"] = name
        Scheduler->>ExecRegistry: get(name) / validate(settings[name])
        ExecRegistry-->>Scheduler: executor factory
        Scheduler->>Executor: instantiate (or reuse) with settings[name]
        Scheduler->>Executor: submit(job)
        Executor-->>Scheduler: job result/status
        Scheduler->>Executor: handle_job_success(job)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45–75 minutes
Please format your code with black.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/snakemake/workflow.py (2)
1202-1209: Broken execute() signature: executor_plugin is still used but no longer passed

execute() references executor_plugin in multiple places (e.g., lines 1227, 1313, 1321), causing a NameError and breaking the dryrun/local_exec logic. Restore the parameter and pass it from the API.
Apply this diff to restore the parameter:
```diff
     def execute(
         self,
-        executor_settings: Mapping[str, ExecutorSettingsBase],
+        executor_plugin: ExecutorPlugin,
+        executor_settings: Mapping[str, ExecutorSettingsBase],
         scheduler_plugin: SchedulerPlugin,
         scheduler_settings: Optional[SchedulerSettingsBase],
         greedy_scheduler_settings: GreedySchedulerSettings,
         updated_files: Optional[List[str]] = None,
     ):
```

And ensure API calls pass executor_plugin (see the api.py comment).
1277-1281: updated_files collects IOFile objects but is typed as List[str]

Return strings to match the public API expectation.
Apply this diff:
```diff
-        if updated_files is not None:
-            updated_files.extend(
-                f for job in self.dag.needrun_jobs() for f in job.output
-            )
+        if updated_files is not None:
+            updated_files.extend(
+                str(f) for job in self.dag.needrun_jobs() for f in job.output
+            )
```

src/snakemake/api.py (1)
644-651: Pass executor_plugin into workflow.execute after the signature fix

workflow.execute still needs the main executor plugin for global behavior and logging.
Apply this diff:
```diff
-        workflow.execute(
-            executor_settings=executor_settings,
+        workflow.execute(
+            executor_plugin=executor_plugin,
+            executor_settings=executor_settings,
             scheduler_plugin=scheduler_plugin,
             scheduler_settings=scheduler_settings,
             greedy_scheduler_settings=greedy_scheduler_settings,
             updated_files=updated_files,
         )
```
🧹 Nitpick comments (2)
src/snakemake/scheduling/job_scheduler.py (1)
131-133: Executor cache and local registry are fine

A per-executor cache (_executors) and a registry instance are appropriate. You could reuse the module-level singleton, but that is not required.
src/snakemake/api.py (1)
486-488: Default executor_settings mapping

Initializing to {} is fine; no change needed. If you want immutability parity with the Workflow default, consider immutables.Map(), but this is optional.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- src/snakemake/api.py (4 hunks)
- src/snakemake/scheduling/job_scheduler.py (3 hunks)
- src/snakemake/workflow.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the `self` argument of methods.
Do not suggest type annotation of the `cls` argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a `return` statement.
Files:
src/snakemake/workflow.py, src/snakemake/scheduling/job_scheduler.py, src/snakemake/api.py
🧠 Learnings (1)
📚 Learning: 2024-10-13T14:10:37.796Z
Learnt from: johanneskoester
PR: snakemake/snakemake#3114
File: snakemake/cli.py:708-708
Timestamp: 2024-10-13T14:10:37.796Z
Learning: In the `snakemake/cli.py` file, `ExecutorPluginRegistry()` is a singleton class, so multiple instantiations are acceptable and do not cause unnecessary overhead.
Applied to files:
src/snakemake/scheduling/job_scheduler.py
🧬 Code graph analysis (2)
src/snakemake/scheduling/job_scheduler.py (1)
src/snakemake/jobs.py (3)
AbstractJob (85-119), resources (489-510), resources (1512-1526)
src/snakemake/api.py (1)
src/snakemake/resources.py (1)
set_resource(69-71)
🪛 Ruff (0.12.2)
src/snakemake/scheduling/job_scheduler.py
176-176: Use of assert detected
(S101)
src/snakemake/api.py
591-591: Use of assert detected
(S101)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (34)
- GitHub Check: tests (9, windows-latest, py312)
- GitHub Check: tests (10, windows-latest, py312)
- GitHub Check: tests (6, macos-latest, py312)
- GitHub Check: tests (10, windows-latest, py313)
- GitHub Check: tests (9, macos-latest, py313)
- GitHub Check: tests (9, windows-latest, py313)
- GitHub Check: tests (8, windows-latest, py312)
- GitHub Check: tests (8, windows-latest, py313)
- GitHub Check: tests (7, macos-latest, py313)
- GitHub Check: tests (7, windows-latest, py313)
- GitHub Check: tests (4, ubuntu-latest, py311)
- GitHub Check: tests (6, ubuntu-latest, py313)
- GitHub Check: tests (7, windows-latest, py312)
- GitHub Check: tests (5, ubuntu-latest, py313)
- GitHub Check: tests (6, windows-latest, py313)
- GitHub Check: tests (5, windows-latest, py313)
- GitHub Check: tests (6, ubuntu-latest, py311)
- GitHub Check: tests (5, ubuntu-latest, py311)
- GitHub Check: tests (5, ubuntu-latest, py312)
- GitHub Check: tests (5, windows-latest, py312)
- GitHub Check: tests (6, windows-latest, py312)
- GitHub Check: tests (6, ubuntu-latest, py312)
- GitHub Check: tests (1, macos-latest, py313)
- GitHub Check: tests (4, windows-latest, py313)
- GitHub Check: tests (2, macos-latest, py313)
- GitHub Check: tests (2, windows-latest, py312)
- GitHub Check: tests (4, ubuntu-latest, py313)
- GitHub Check: tests (3, windows-latest, py312)
- GitHub Check: tests (4, windows-latest, py312)
- GitHub Check: tests (2, windows-latest, py313)
- GitHub Check: tests (3, windows-latest, py313)
- GitHub Check: tests (4, ubuntu-latest, py312)
- GitHub Check: tests (1, windows-latest, py313)
- GitHub Check: tests (1, windows-latest, py312)
🔇 Additional comments (4)
src/snakemake/workflow.py (2)
22-23: LGTM: importing immutables for persistent mappings

The import aligns with the new executor_settings default. No issues.
161-161: Executor settings as an immutable mapping is fine

Defaulting to immutables.Map() is safe and avoids accidental mutation.
src/snakemake/scheduling/job_scheduler.py (1)
22-22: LGTM: type-hint import for AbstractExecutor

The import is correct and scoped.
src/snakemake/api.py (1)
461-476: Docs updated for mapping-based executor settings

Looks good and consistent with the PR intent.
```python
assert self.workflow_api.resource_settings.default_resources is not None
self.workflow_api.resource_settings.default_resources.set_resource("executor", executor)
```
🛠️ Refactor suggestion
Ensure default_resources exists before setting 'executor'
assert can be stripped in optimized runs. Create defaults if absent to make local execution path robust.
Apply this diff:
```diff
-        assert self.workflow_api.resource_settings.default_resources is not None
-        self.workflow_api.resource_settings.default_resources.set_resource("executor", executor)
+        if self.workflow_api.resource_settings.default_resources is None:
+            self.workflow_api.resource_settings.default_resources = DefaultResources()
+        self.workflow_api.resource_settings.default_resources.set_resource(
+            "executor", executor
+        )
```

📝 Committable suggestion
🧰 Tools
🪛 Ruff (0.12.2)
591-591: Use of assert detected
(S101)
🤖 Prompt for AI Agents
In src/snakemake/api.py around lines 591-593, replace the bare assert with a
runtime check that ensures self.workflow_api.resource_settings.default_resources
exists: if it is None, instantiate the module's standard default-resources
object (e.g., create a new DefaultResources / ResourceDefaults instance or call
the factory used elsewhere in the codebase) and assign it to
self.workflow_api.resource_settings.default_resources, then call
set_resource("executor", executor) as before; this guarantees a valid object in
optimized runs where asserts are stripped.
```python
def get_executor(self, job: AbstractJob) -> AbstractExecutor:
    executor_name = job.resources["executor"]
    assert isinstance(executor_name, str)
    if executor_name not in self._executors:
        plugin = self._executor_plugin_registry.get_plugin(executor_name)
        settings = self.workflow.executor_plugin_settings.get(executor_name)
        plugin.validate_settings(settings)
        self._executors[executor_name] = plugin.executor(
            self.workflow,
            logger,
            settings,
        )
    return self._executors[executor_name]
```
💡 Verification agent
🧩 Analysis chain
Fix executor settings lookup and avoid assert; add robust validation
- Uses nonexistent attribute workflow.executor_plugin_settings (should be workflow.executor_settings).
- assert on a runtime condition (S101) can be stripped; raise a WorkflowError instead.
- Guard validate_settings(None) to avoid plugins erroring on None.
Apply this diff:
```diff
-    def get_executor(self, job: AbstractJob) -> AbstractExecutor:
-        executor_name = job.resources["executor"]
-        assert isinstance(executor_name, str)
-        if executor_name not in self._executors:
-            plugin = self._executor_plugin_registry.get_plugin(executor_name)
-            settings = self.workflow.executor_plugin_settings.get(executor_name)
-            plugin.validate_settings(settings)
-            self._executors[executor_name] = plugin.executor(
-                self.workflow,
-                logger,
-                settings,
-            )
-        return self._executors[executor_name]
+    def get_executor(self, job: AbstractJob) -> AbstractExecutor:
+        try:
+            executor_name = job.resources["executor"]
+        except KeyError:
+            raise WorkflowError(
+                f"Job {job} does not define an 'executor' resource; unable to resolve executor."
+            )
+        if not isinstance(executor_name, str):
+            raise WorkflowError(
+                f"Executor resource for job {job} must be a string, got {type(executor_name).__name__}."
+            )
+        if executor_name not in self._executors:
+            plugin = self._executor_plugin_registry.get_plugin(executor_name)
+            settings = self.workflow.executor_settings.get(executor_name)
+            if settings is not None:
+                plugin.validate_settings(settings)
+            self._executors[executor_name] = plugin.executor(
+                self.workflow,
+                logger,
+                settings,
+            )
+        return self._executors[executor_name]
```

Follow-up: job submission still uses a single executor (self._executor). To actually federate execution, group runjobs by resolved executor and submit per group. For example:
```python
# inside schedule(), replacing the runjobs branch
if runjobs:
    buckets = {}
    for j in runjobs:
        exec_inst = self.get_executor(j)
        buckets.setdefault(exec_inst, []).append(j)
    for exec_inst, jobs_bucket in buckets.items():
        self.run(jobs_bucket, executor=exec_inst)
```
self.run(jobs_bucket, executor=exec_inst)Run this to locate other outdated references to executor_plugin_settings:
🏁 Script executed:

```shell
#!/bin/bash
rg -n --type=py -C2 '\bexecutor_plugin_settings\b'
```

Length of output: 599
🏁 Script executed:

```shell
#!/bin/bash
rg -n --type=py -C2 '\bexecutor_settings\b'
```

Length of output: 4990
Fix executor settings lookup and error handling

- Rename `self.workflow.executor_plugin_settings` to `self.workflow.executor_settings`.
- Replace `assert isinstance(executor_name, str)` with explicit `WorkflowError` raises for missing or non-string executors (add `from snakemake.exceptions import WorkflowError`).
- Only call `plugin.validate_settings(settings)` when `settings` is not `None`.
- To federate execution, group `runjobs` by executor instance and submit each bucket separately.
🧰 Tools
🪛 Ruff (0.12.2)
176-176: Use of assert detected
(S101)
🤖 Prompt for AI Agents
In src/snakemake/scheduling/job_scheduler.py around lines 174 to 187, the code
needs multiple fixes: change lookup from self.workflow.executor_plugin_settings
to self.workflow.executor_settings; replace the assert with explicit
WorkflowError raises (import WorkflowError from snakemake.exceptions) to handle
missing or non-string executor names (raise a WorkflowError when the "executor"
key is missing or not a str); only call plugin.validate_settings(settings) if
settings is not None; and adjust execution submission so runjobs are grouped by
executor instance (bucket runjobs by the resolved executor object and submit
each bucket separately) to enable federated execution.
Could this be `snakemake --default-resources executor="'slurm' if resources.mem_mb > 10000 else 'local'"`, too?
Will be complemented by a mechanism to specify per-rule default executors and an automated approach for transferring files between different storage backends/plugins.
Introduces a breaking change in the API, but CLI and workflows remain compatible.
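Since the scheduler looks up job.resources["executor"], per-rule routing would presumably be declared via the executor resource in the Snakefile. A hypothetical sketch (rule name, resource values, and shell command are assumptions, not from this PR):

```
rule heavy_analysis:
    input:
        "data/input.txt"
    output:
        "results/output.txt"
    resources:
        mem_mb=16000,
        # hypothetical: route this rule's jobs to the slurm executor
        executor="slurm"
    shell:
        "analyze {input} > {output}"
```

Rules without an explicit executor resource would presumably fall back to the default set via --default-resources.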
QC

- The documentation (docs/) is updated to reflect the changes, or this is not necessary (e.g. if the change modifies neither the language nor the behavior or functionalities of Snakemake).