lamindb.flow
- lamindb.flow(uid=None, global_run='clear', track_arg_aliases=True)
Use @flow() to track a function as a workflow.

You will be able to see inputs, outputs, and parameters of the function in the data lineage graph.

The decorator creates a Transform with kind "script" that maps onto the file in which the function is defined. The function maps onto an entrypoint of the transform. A function execution creates a Run object that stores the function name in run.entrypoint. If the function is defined in a notebook cell or another ephemeral context, the transform is created with kind "function".

By default, @ln.flow(), like ln.track(), creates a global run context that can be accessed with ln.context.run.

- Parameters:
uid (str | None, default: None) – Persist the uid to identify a transform across renames.

global_run (Literal['memorize', 'clear', 'none'], default: 'clear') – If "clear", set the global run context ln.context.run and clear after the function completes. If "memorize", set the global run context and do not clear after the function completes. Set this to "none" if you want to track concurrent executions of a flow() in the same Python process.

track_arg_aliases (bool, default: True) – If True (default), maps function arguments with names project, space, branch, plan, and initiated_by_run to matching ln.track() arguments while also keeping them in run.params for reproducibility. Pass False to disable this mapping.
- Return type:
Callable[[Callable[[ParamSpec(P, bound=None)], TypeVar(R)]], Callable[[ParamSpec(P, bound=None)], TypeVar(R)]]
Examples
To sync a workflow with a file in a git repo, see: Organize local development.
For an extensive guide, see: Manage workflows. Here follow some examples.
my_workflow.py

    import lamindb as ln


    @ln.flow()
    def ingest_dataset(key: str) -> ln.Artifact:
        df = ln.examples.datasets.mini_immuno.get_dataset1()
        artifact = ln.Artifact.from_dataframe(df, key=key).save()
        return artifact


    if __name__ == "__main__":
        ingest_dataset(key="my_analysis/dataset.parquet")
my_workflow_with_step.py

    import lamindb as ln


    @ln.step()
    def subset_dataframe(
        artifact: ln.Artifact,
        subset_rows: int = 2,
        subset_cols: int = 2,
    ) -> ln.Artifact:
        df = artifact.load()
        new_data = df.iloc[:subset_rows, :subset_cols]
        new_key = artifact.key.replace(".parquet", "_subsetted.parquet")
        return ln.Artifact.from_dataframe(new_data, key=new_key).save()


    @ln.flow()
    def ingest_dataset(key: str, subset: bool = False) -> ln.Artifact:
        df = ln.examples.datasets.mini_immuno.get_dataset1()
        artifact = ln.Artifact.from_dataframe(df, key=key).save()
        if subset:
            artifact = subset_dataframe(artifact)
        return artifact


    if __name__ == "__main__":
        ingest_dataset(key="my_analysis/dataset.parquet", subset=True)
my_workflow_with_click.py

    import click
    import lamindb as ln


    @click.command()
    @click.option("--key", required=True)
    @ln.flow()
    def main(key: str):
        df = ln.examples.datasets.mini_immuno.get_dataset2()
        ln.Artifact.from_dataframe(df, key=key).save()


    if __name__ == "__main__":
        main()
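
The track_arg_aliases parameter described above can be illustrated without lamindb. The following is a minimal sketch, assuming only what the parameter description states: arguments named project, space, branch, plan, or initiated_by_run are forwarded to tracking, and all arguments are still kept as parameters. The helper split_call_arguments and the function ingest are hypothetical, and applying signature defaults before splitting is a choice of this sketch, not documented behavior.

```python
import inspect
from typing import Any, Callable

# Argument names listed in the track_arg_aliases description.
ALIAS_NAMES = ("project", "space", "branch", "plan", "initiated_by_run")


def split_call_arguments(
    func: Callable[..., Any],
    args: tuple,
    kwargs: dict,
    track_arg_aliases: bool = True,
) -> tuple[dict, dict]:
    """Hypothetical helper: return (params, track_kwargs) for one call."""
    # Bind positional and keyword arguments to the function's parameter names.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()  # assumption of this sketch: defaults count too
    params = dict(bound.arguments)  # every argument is kept as a parameter
    track_kwargs: dict = {}
    if track_arg_aliases:
        # Copy (not move) the alias-named arguments into the tracking kwargs.
        track_kwargs = {k: v for k, v in params.items() if k in ALIAS_NAMES}
    return params, track_kwargs


def ingest(key: str, project: str = "demo-project", subset: bool = False) -> None:
    """Hypothetical workflow function; only its signature matters here."""


params, track_kwargs = split_call_arguments(
    ingest, ("data.parquet",), {"project": "demo-project", "subset": True}
)
params_off, track_kwargs_off = split_call_arguments(
    ingest, ("data.parquet",), {}, track_arg_aliases=False
)
```

With the mapping enabled, project appears in both dictionaries; with track_arg_aliases=False it stays an ordinary parameter and nothing is forwarded to tracking.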