Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def write_status(self, path):

def _check_study_completion(self):
# We cancelled; return CANCELLED status, marking the study as complete.
if self.is_canceled:
if self.is_canceled and not self.in_progress:
LOGGER.info("Cancelled -- completing study.")
return StudyStatus.CANCELLED

Expand Down Expand Up @@ -753,7 +753,7 @@ def execute_ready_steps(self):
# For the status of each currently in progress job, check its
# state.
cleanup_steps = set() # Steps that are in progress showing failed.

cancel_steps = set()  # Steps whose dependents must also be marked cancelled
for name, status in job_status.items():
LOGGER.debug("Checking job '%s' with status %s.", name, status)
record = self.values[name]
Expand Down Expand Up @@ -841,12 +841,18 @@ def execute_ready_steps(self):
LOGGER.info("Step '%s' was cancelled.", name)
self.in_progress.remove(name)
record.mark_end(State.CANCELLED)
cancel_steps.update(self.bfs_subtree(name)[0])

# Let's handle all the failed steps in one go.
for node in cleanup_steps:
self.failed_steps.add(node)
self.values[node].mark_end(State.FAILED)

# Handle dependent steps that need cancelling
for node in cancel_steps:
self.cancelled_steps.add(node)
self.values[node].mark_end(State.CANCELLED)

# Now that we've checked the statuses of existing jobs we need to make
# sure dependencies haven't been met.
for key in self.values.keys():
Expand All @@ -872,9 +878,9 @@ def execute_ready_steps(self):
"Unfulfilled dependencies: %s",
self._dependencies[key])

s_completed = filter(
s_completed = list(filter(
lambda x: x in self.completed_steps,
self._dependencies[key])
self._dependencies[key]))
self._dependencies[key] = \
self._dependencies[key] - set(s_completed)
LOGGER.debug(
Expand Down
8 changes: 7 additions & 1 deletion maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,15 @@ def cancel(cls, joblist):
"\n".join(str(j) for j in joblist),
)

# NOTE: cannot pickle JobID instances, so must store as strings and
# reconstruct for use
jobs_rpc = flux.job.list.JobList(
cls.flux_handle,
ids=[flux.job.JobID(jid) for jid in joblist])

cancel_code = CancelCode.OK
cancel_rcode = 0
for job in joblist:
for job in jobs_rpc:
try:
LOGGER.debug("Cancelling Job %s...", job)
flux.job.cancel(cls.flux_handle, int(job))
Expand Down
16 changes: 11 additions & 5 deletions maestrowf/interfaces/script/_flux/flux0_49_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class FluxInterface_0490(FluxInterface):
# This utility class is for Flux 0.26.0
# This utility class is for Flux 0.49.0
key = "0.49.0"

flux_handle = None
Expand Down Expand Up @@ -242,14 +242,20 @@ def cancel(cls, joblist):
"\n".join(str(j) for j in joblist),
)

# NOTE: cannot pickle JobID instances, so must store as strings and
# reconstruct for use
jobs_rpc = flux.job.list.JobList(
cls.flux_handle,
ids=[flux.job.JobID(jid) for jid in joblist])

cancel_code = CancelCode.OK
cancel_rcode = 0
for job in joblist:
for job in jobs_rpc.jobs():
try:
LOGGER.debug("Cancelling Job %s...", job)
flux.job.cancel(cls.flux_handle, int(job))
LOGGER.debug("Cancelling Job %s...", str(job.id.f58))
flux.job.cancel(cls.flux_handle, int(job.id))
except Exception as exception:
LOGGER.error(str(exception))
LOGGER.error("Job %s: %s", str(job.id.f58), str(exception))
cancel_code = CancelCode.ERROR
cancel_rcode = 1

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool]
[tool.poetry]
name = "maestrowf"
version = "1.1.10dev4"
version = "1.1.10dev5"
description = "A tool to easily orchestrate general computational workflows both locally and on supercomputers."
license = "MIT License"
classifiers = [
Expand Down