Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 48 additions & 44 deletions wfcommons/wfinstances/logs/taskvine.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self,
self.task_runtimes = {}
self.task_input_files = {}
self.task_output_files = {}
self.known_task_ids = []

def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
"""
Expand All @@ -101,16 +102,35 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:

# Construct the task command-line array
self._construct_task_command_lines()
# sys.stderr.write(str(self.task_command_lines) + "\n")
# sys.stderr.write(str(self.task_command_lines.keys()) + "\n")

# At this point, the ONLY tasks we care about are the ones for which we have a command-line
self.known_task_ids = sorted(self.task_command_lines.keys())

# Construct file map
self._construct_file_map()
# sys.stderr.write("FILEMAP: " + str(self.files_map) + "\n")
for file_key in self.files_map.keys():
if not "size" in self.files_map[file_key]:
sys.stderr.write(f"Warning: Could not determine size for file with key {file_key}: assuming zero bytes.\n")
self.files_map[file_key]["size"] = 0
sys.stderr.write(f"Identified {len(self.files_map)} valid files\n")

# Construct the task runtimes
self._construct_task_runtimes()
# sys.stderr.write("TASK RUN TIMES: " + str(self.task_runtimes) + "\n")

# Check whether every known task has a runtime, and if not forget it :(
to_remove = []
for task_id in self.known_task_ids:
if task_id not in self.task_runtimes.keys():
sys.stderr.write(f"Warning: Ignoring task {task_id} because runtime could not be determined.\n")
to_remove.append(task_id)
for victim in to_remove:
self.known_task_ids.remove(victim)

sys.stderr.write(f"Identified {len(self.known_task_ids)} valid tasks\n")

# Construct the input and output files for each task
self._construct_task_input_output_files()
# print("TASK INPUT FILES: " + str(self.task_input_files))
Expand All @@ -130,6 +150,8 @@ def _construct_task_command_lines(self) -> None:
command_line = previous_line[previous_line.find("busy on '") + len("busy on '"):-2]
self.task_command_lines[int(task_index)] = command_line
executable = command_line.split()[0]
# May not be foolproof for commands like "export A=b; executable ..." (the first
# token is then not the executable), but it helps in the common case.
self.filenames_to_ignore.add(executable)
previous_line = line

Expand All @@ -142,78 +164,60 @@ def _construct_file_map(self) -> None:
for line in f:
if "__vine_env_task" in line: # Ignore that weird task/file
continue
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
if "infile " in line:
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[0:2]
# 2025/09/09 21:12:42.12 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): outfile file-rnd-jpnzrjrjnxxqhej blastall_00000017_outfile_0017 0
elif "outfile " in line:
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[0:2]
if "infile " in line :
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[:2]
elif "outfile " in line and "completed with outfile " not in line and "outfile =" not in line:
# 2025/09/30 18:37:19.74 vine_manager[1849324]vine: tx to d64cepyc028.crc.nd.edu (10.32.94.18:47558): outfile temp-rnd-pidiwheippcwbeu fde2b5eb-9713-423a-8bc6-f4f9263ad20b.pkl 0 3
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[:2]
else:
continue
if filename in self.filenames_to_ignore:
continue
self.files_map[file_key] = {"filename": filename}
# NOTE THAT THE FILENAME MAY NOT BE UNIQUE IN TASKVINE WORKFLOWS, SO
# WE APPEND THE FILE KEY TO THE FILENAME TO MAKE IT UNIQUE
self.files_map[file_key] = {"filename": filename + "." + file_key}
filename_to_key_map[filename] = file_key

# Another pass through the debug file to get the actual file paths
with open(self.debug_file) as f:
for line in f:
# 2025/09/09 21:12:48.01 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) needs file data/blastall_00000003_outfile_0003 as blastall_00000003_outfile_0003
if "needs file " in line:
[full_path, ignore, filename] = line[line.find("needs file ") + len("needs file "):].split()[0:3]
file_key = filename_to_key_map.get(filename)
# 2025/09/09 21:12:47.92 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) sending back file-rnd-jajwzwsrtyzbkfs to data/blastall_00000020_outfile_0020
elif "sending back " in line:
[file_key, ignore, full_path] = line[line.find("sending back ") + len("sending back "):].split()[0:3]
filename = self.files_map[file_key]["filename"]
else:
continue
if filename in self.filenames_to_ignore:
continue
self.files_map[file_key]["path"] = full_path

# Pass through the debug file to get the file sizes
with open(self.transactions_file) as f:
with open(self.debug_file) as f:
for line in f:
# 1757452362084671 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER OUTPUT file-rnd-wzkjcrgiivvzbci 227273 1327 1757452362083301
# 1757452358704968 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER INPUT file-meta-9b84b334875319e856f72be634aae964 17648 1129 1757452358703835
if "TRANSFER INPUT " in line:
[file_key, file_size] = line[line.find("TRANSFER INPUT ") + len("TRANSFER INPUT "):].split()[0:2]
elif "TRANSFER OUTPUT " in line:
[file_key, file_size] = line[line.find("TRANSFER OUTPUT ") + len("TRANSFER OUTPUT "):].split()[0:2]
elif "CACHE_UPDATE " in line:
[file_key, file_size] = line[line.find("CACHE_UPDATE ") + len("CACHE_UPDATE "):].split()[0:2]
if "): file " in line:
[file_key, file_size] = line[line.find("): file ") + len("): file "):].split()[0:2]
else:
continue
if file_key in self.files_map:
self.files_map[file_key]["size"] = int(file_size)

# print(str(self.files_map))


def _construct_task_runtimes(self) -> None:
task_start_times = {}
task_end_times = {}

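# This method consists only of a single pass through the transactions file,
# recording the RUNNING and DONE timestamps of each known task.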

with open(self.transactions_file) as f:
for line in f:
if line[0] == "#":
continue
if "RUNNING" in line:
[start_date, ignore, ignore, task_index] = line.split()[0:4]
task_start_times[int(task_index)] = int(start_date)
if int(task_index) in self.known_task_ids:
task_start_times[int(task_index)] = int(start_date)
elif "DONE" in line:
[end_date, ignore, ignore, task_index] = line.split()[0:4]
task_end_times[int(task_index)] = int(end_date)
if int(task_index) in self.known_task_ids:
task_end_times[int(task_index)] = int(end_date)

for task_index in task_start_times:
self.task_runtimes[task_index] = (
if task_index in task_end_times:
self.task_runtimes[task_index] = (
float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0)

def _construct_task_input_output_files(self) -> None:

# Initialize all entries
for task_id in self.task_runtimes.keys():
for task_id in self.known_task_ids:
self.task_input_files[task_id] = []
self.task_output_files[task_id] = []

Expand All @@ -238,7 +242,7 @@ def _construct_task_input_output_files(self) -> None:
if destination.startswith("file-"):
destination = destination[len("file-"):]

if "task" in source and "file" not in source:
if "task-" in source and "file-" not in source:
try:
task_id = int(source.split("-")[1])
except ValueError as e:
Expand Down Expand Up @@ -273,15 +277,15 @@ def _construct_workflow(self) -> None:
for file_key in self.files_map:
filename = self.files_map[file_key]["filename"]
file_size = self.files_map[file_key]["size"]
# file_path = self.files_map[file_key]["path"]
file_object_map[filename] = File(file_id=filename,
size=file_size,
logger=self.logger)

# Create all tasks
task_map = {}
for task_id in self.task_runtimes:
task_name = "Task %d" % task_id
# print(self.task_runtimes[16])
for task_id in self.known_task_ids:
task_name = "Task_%d" % task_id
task = Task(name=task_name,
task_id=task_name,
task_type=TaskType.COMPUTE,
Expand Down