
Integration with Fugue to Distribute PyCaret Training Jobs to Spark/Dask #2015


Description

@kvnkho

This issue collects notes from ongoing discussions between the Fugue and PyCaret teams.

Collaboration

I'm Kevin Kho, one of the maintainers of Fugue. Fugue is an abstraction layer that brings Python/Pandas/SQL code to Spark or Dask. Going over the issues, there are a lot of requests to both parallelize (single machine) and distribute (cluster) the compare_models function so that it scales instead of running each model training sequentially. We got in touch with @moezali1 and @mfahadakbar about how to achieve this. As long as we solve the distributed portion, we get the parallelization for free, because we can execute the training jobs on local Dask or local Spark; local Dask is essentially a multiprocessing pool.

Porting to Spark or Dask

Fugue has the transform function, which lets users port a single function to Spark and Dask. We can use this as the backbone to bring compare_models to Spark and Dask. Basically, compare_models creates a list of models and then loops through the training. If we create a DataFrame that holds the training information, we can break it up row by row and apply the training function to each piece. A sketch of that pattern is shown below. With this setup, the problem becomes agnostic to the execution engine, and we can simply choose Spark or Dask at runtime.
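To make this concrete, here is a minimal sketch of that pattern with Fugue's transform. The model list, the schema, and the train_single_model helper are placeholders for illustration, not the actual PyCaret internals.

```python
import pandas as pd
from fugue import transform

# hypothetical stand-in for PyCaret's per-model training logic
def train_single_model(name: str) -> dict:
    # in the real integration this would train one model and pull its metrics
    return {"model_name": name, "score": 0.0}

# one row per model that compare_models would otherwise train sequentially
models_df = pd.DataFrame({"model_name": ["lr", "dt", "rf", "knn"]})

def train_partition(df: pd.DataFrame) -> pd.DataFrame:
    # each partition carries one model row; train whatever it contains
    return pd.DataFrame([train_single_model(n) for n in df["model_name"]])

results = transform(
    models_df,
    train_partition,
    schema="model_name:str,score:double",
    partition={"by": "model_name"},  # one partition per model
    engine=None,  # None runs on local pandas; pass "spark" or "dask" to distribute
)
```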

Change to PyCaret interface

We can do this elegantly by exposing two new keywords on the setup() interface: fugue_engine and fugue_conf. The fugue_engine will be something like "spark" or "dask", while fugue_conf will be the configuration for the distributed engine (something like a SparkSession). Based on the presence of these keywords, we can switch to the Fugue transform code path, which will be slightly different but will take care of the distributed execution.
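A rough sketch of what this might look like from the user's side; the keyword names follow the proposal above, and the data, target, and engine configuration are placeholders:

```python
from pycaret.classification import setup, compare_models

# proposed, not yet implemented: fugue_engine / fugue_conf keywords on setup()
exp = setup(
    data=train_df,
    target="label",
    fugue_engine="spark",                        # or "dask", or a live SparkSession
    fugue_conf={"spark.executor.memory": "4g"},  # configuration for the chosen engine
)
best = compare_models()  # training jobs now run on the distributed engine
```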

The second change we propose is to have setup() take in a callable in addition to a DataFrame; in short, the signature should be Union[Callable, DataFrame]. The reason is that passing a DataFrame in a distributed setting relies on the driver node uploading it to all of the workers. If the data is stored in a location like S3/GCS, it is significantly faster for the workers to download the data directly through a callable.
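A sketch of the callable form, assuming the data sits in object storage (the path, column name, and engine keyword are hypothetical):

```python
import pandas as pd
from pycaret.classification import setup

def load_data() -> pd.DataFrame:
    # each worker pulls the data directly from storage
    # instead of receiving it from the driver
    return pd.read_parquet("s3://my-bucket/training-data.parquet")

# proposed: setup() accepts either a DataFrame or a callable that returns one
exp = setup(data=load_data, target="label", fugue_engine="spark")
```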

A note on levels of parallelism

As articulated by @mfahadakbar, there are three forms of parallel/distributed training.

Compute bound

  1. For one small dataset, we try the models in the compare_models list.
  2. For one big dataset, we partition it into several small datasets and then run compare_models for each small dataset.

Memory bound

  3. For one big dataset, we distribute the training itself with something like Spark MLlib.

Number 1 is what we can achieve easily. There is some discussion around number 2 above. Intuitively, it may make sense as a two-stage parallel computation: the first stage would be over the data, and the second stage over the models. But multi-level parallelism in a distributed setting is tricky, because processes can compete for resources and cause the cluster to hang. The first stage of parallelism already uses all of the available hardware, which means the second stage doesn't really have anything left to parallelize over.

This is also a concern for local execution. Tree-based models take an n_jobs parameter and already parallelize work locally, so n_jobs needs to be explicitly set to 1 when parallelizing across multiple tree-based models; otherwise each of them will try to occupy all of the resources.
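For example, using PyCaret's existing n_jobs argument on setup() (the data and target below are placeholders):

```python
from pycaret.classification import setup, compare_models

# keep each estimator single-threaded so the outer, per-model parallelism
# does not oversubscribe the CPUs
exp = setup(data=train_df, target="label", n_jobs=1)
best = compare_models()
```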

Immediate next steps

@goodwanghan has a prototype for the distributable code snippet that can be added to the setup() function. We'll go over it, but it should be a minimal addition to the PyCaret code.
