Description
This issue is a collection of notes of ongoing discussions between the Fugue and PyCaret teams.
Collaboration
I'm Kevin Kho, one of the maintainers of Fugue. Fugue is an abstraction layer that brings Python/Pandas/SQL code to Spark or Dask. Going through the PyCaret issues, there are a lot of requests to both parallelize (single machine) and distribute (cluster) the compare_models function so that the models are not trained sequentially. We got in touch with @moezali1 and @mfahadakbar on how to achieve this. As long as we solve the distributed case, we get the parallelization for free, because we can execute the training jobs on local Dask or local Spark; local Dask is essentially a multiprocessing pool.
Porting to Spark or Dask
Fugue has a transform function that lets users port a single function to Spark and Dask. We can use this as the backbone to bring compare_models to Spark and Dask. Basically, compare_models creates a list of models and then loops through the training. If we create a DataFrame that holds the training information, we can break it up by row and apply the training function to each piece. An example of that is here. With this setup, the problem becomes agnostic to the execution engine, and we can simply choose Spark or Dask at runtime.
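To make this concrete, here is a minimal sketch (not the actual PyCaret implementation) of the pattern: the model names become rows of a DataFrame, a per-partition function runs the training, and Fugue's transform executes it on whichever engine is chosen at runtime. The train_models function and the placeholder score are purely illustrative.

```python
import pandas as pd
from fugue import transform

# Illustrative per-partition training function; in PyCaret this would call
# the existing single-model training logic and return the real metrics.
def train_models(df: pd.DataFrame) -> pd.DataFrame:
    results = []
    for model_name in df["model"]:
        # placeholder for: score = train_and_score(model_name)
        results.append({"model": model_name, "score": 0.0})
    return pd.DataFrame(results)

# Each row describes one training job.
model_df = pd.DataFrame({"model": ["lr", "rf", "dt", "knn"]})

# engine=None runs locally on pandas; passing "dask", "spark", or a live
# SparkSession distributes the same code without changes.
result = transform(
    model_df,
    train_models,
    schema="model:str,score:double",
    partition={"by": "model"},
    engine=None,
)
```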
Change to PyCaret interface
We can do this fairly elegantly by exposing two new keyword arguments on the setup() interface: fugue_engine and fugue_conf. fugue_engine would be something like "spark" or "dask", while fugue_conf would be the configuration for the distributed engine, for example a SparkSession. Based on the presence of these keywords, we can switch to the Fugue transform code path, which is slightly different but takes care of the distributed execution.
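From the user's side, the proposed interface might look roughly like this (a sketch only; fugue_engine and fugue_conf are proposals in this issue and do not exist in PyCaret today):

```python
# Proposed usage only; fugue_engine / fugue_conf are not an existing PyCaret API.
from pycaret.classification import setup, compare_models

exp = setup(
    data=train_df,            # existing argument
    target="label",
    fugue_engine="dask",      # proposed: "spark" or "dask"
    fugue_conf=None,          # proposed: e.g. a SparkSession or engine config
)

# compare_models would detect the Fugue code path and fan the training
# jobs out to the chosen engine instead of looping sequentially.
best = compare_models()
```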
The second change we propose is to have setup() accept a callable in addition to a DataFrame; in short, the signature should be Union[Callable, DataFrame]. The reason is that passing a DataFrame in a distributed setting relies on the driver node uploading it to all of the workers. If the data is stored in a location like S3/GCS, it is significantly faster for the workers to download the data directly through a callable.
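Again as a sketch under the proposed signature, the callable would simply be a zero-argument function that each worker can invoke to load its own copy of the data; the bucket path, keyword names, and spark_session below are hypothetical:

```python
import pandas as pd

# Hypothetical loader: each worker pulls the data from shared storage itself,
# so the driver never has to upload the DataFrame to the cluster.
def load_data() -> pd.DataFrame:
    return pd.read_parquet("s3://example-bucket/training-data.parquet")

# Proposed signature: data accepts Union[Callable, DataFrame].
exp = setup(
    data=load_data,
    target="label",
    fugue_engine="spark",
    fugue_conf=spark_session,  # hypothetical: a live SparkSession
)
```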
A note on levels of parallelism
As articulated by @mfahadakbar, there are three forms of parallel/distributed training.
Compute bound
1. For one small dataset, we try the models in the compare_models list in parallel.
2. For one big dataset, we partition it into several small datasets and then run compare_models on each small dataset.
Memory bound
3. For one big dataset, we distribute the training itself with something like Spark MLlib.
Number 1 is what we can achieve easily. There is some discussion around number 2 above. Intuitively, it may make sense as a two-stage parallel computation. The first stage would be on the data, and the second stage would be on the models. Multi-level parallelism in a distributed setting is a tricky thing because processes can compete for resources and make the hardware hang. The first stage of parallelism already uses all of the available hardware, which means that the second stage of parallelism doesn't really have anything to parallelize over.
This is also a concern for local execution. Tree-based models take an n_jobs parameter and already parallelize work locally. This means n_jobs needs to be explicitly set to 1 when training multiple tree-based models in parallel, because each one of them would otherwise try to occupy all of the resources.
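As a small, self-contained illustration of the point (plain scikit-learn and joblib, not PyCaret internals), each estimator is pinned to n_jobs=1 so the outer parallel loop is the only layer competing for cores:

```python
from joblib import Parallel, delayed
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)

# n_jobs=1 keeps each tree-based model single-threaded; parallelism comes
# from the outer Parallel call across models, not from inside each model.
models = [
    RandomForestClassifier(n_estimators=100, n_jobs=1, random_state=0),
    ExtraTreesClassifier(n_estimators=100, n_jobs=1, random_state=0),
]

fitted = Parallel(n_jobs=2)(delayed(m.fit)(X, y) for m in models)
```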
Immediate next steps
@goodwanghan has a prototype for the distributable code snippet that can be added to the setup() function. We'll go over it, but it should be a minimal addition to the PyCaret code.