NAME

Parallel::Subs - Simple way to run subs in parallel and process their return values in Perl

VERSION

version 0.003

SYNOPSIS

use Parallel::Subs;

my $p = Parallel::Subs->new();
#    or Parallel::Subs->new( max_process => N )
#    or Parallel::Subs->new( max_process_per_cpu => P )
#    or Parallel::Subs->new( max_memory => M )
#    or Parallel::Subs->new( timeout => T );

# add a first sub, which will be launched in its own child process
$p->add(
    sub { # will be launched in parallel
        # any code that takes time to execute can go here
        print "Hello from kid $$\n";
    }
);
# add a second sub
$p->add(
    sub { print "Hello from kid $$\n" }
);
$p->add( \&do_something );

# Trigger all the subs to run in parallel using a limited number of processes
$p->wait_for_all();

print qq[This is done.\n];

Chaining the subs

Calls to add can be chained, and wait_for_all can end the chain, which can make your code easier to read.

use Parallel::Subs;

my $p = Parallel::Subs->new()
 ->add( sub{ print "Hello from kid $$\n"; sleep 5; } )
 ->add( sub{ print "Hello from kid $$\n"; sleep 4; } )
 ->add( sub{ print "Hello from kid $$\n"; sleep 3; } )
 ->add( sub{ print "Hello from kid $$\n"; sleep 2; } )
 ->add( sub{ print "Hello from kid $$\n"; sleep 1; } )
 ->add( sub{ print "Hello from kid $$\n" } )
 ->wait_for_all();
 # or ->wait_for_all_optimized(); # beta - groups jobs and runs a single fork per CPU

print qq[This is done.\n];

Run subs in parallel and use their return values

use Parallel::Subs;

my $sum;

sub work_to_do {
    my ( $a, $b ) = @_;
    return sub {
        print "Running in parallel from process $$\n";
        # need some time to execute...
        # return 42;
        # return { value => 42 };
        # return [ 1..9 ];
        return $a * $b;
    };
}

sub read_result {
    my $result = shift;

    $sum += $result;
}

my $p = Parallel::Subs->new();
$p->add(
    sub {
        my $time = int( rand(2) );
        sleep($time);
        return { number => 1, time => $time };
    },
    sub {
        # run from the main process once the kid process has finished its work
        #   to access return values from previous sub
        my $result = shift;
        $sum += $result->{number};

        return;
    }
    )->add( work_to_do( 1, 2 ), \&read_result )
    ->add( work_to_do( 3, 4 ),  \&read_result )
    ->add( work_to_do( 5, 6 ),  \&read_result )
    ->add( work_to_do( 7, 8 ),  \&read_result )
    ->add( work_to_do( 9, 10 ), \&read_result );

$p->wait_for_all();

Named jobs

You can give jobs a name and retrieve their results by name instead of position.

use Parallel::Subs;

my $p = Parallel::Subs->new();
$p->add( 'users',  sub { fetch_users()  } );
$p->add( 'orders', sub { fetch_orders() } );
$p->wait_for_all();

my $users  = $p->result('users');
my $orders = $p->result('orders');

Named and unnamed jobs can be mixed freely. results() always returns all results in insertion order regardless of naming.
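
As a minimal sketch of mixing the two styles, the snippet below adds one named job between two unnamed ones, then reads everything back with results() and the named job alone with result(). It relies only on the behaviour described in this document; the job bodies and the name 'middle' are made up for illustration.

```perl
use strict;
use warnings;
use Parallel::Subs;

my $p = Parallel::Subs->new( max_process => 2 );

# One named job between two unnamed ones (illustrative job bodies).
$p->add( sub { return 'first' } );
$p->add( 'middle', sub { return 'second' } );
$p->add( sub { return 'third' } );
$p->wait_for_all();

# results() is documented to return an array reference in insertion order.
my $all = $p->results();
print join( ', ', @$all ), "\n";

# result() looks up a single job by the name passed to add().
my $middle = $p->result('middle');
print "middle = $middle\n";
```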

DESCRIPTION

Parallel::Subs is a simple object interface used to launch tasks in parallel. It uses Parallel::ForkManager to run subroutines in child processes and collect their return values.

You can also provide a second optional sub (callback) to process the result returned by each child process from the main process.
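
To illustrate the general pattern this builds on, here is a short sketch that uses Parallel::ForkManager directly: each child hands a value to finish(), and a run_on_finish handler in the parent receives it. This is not the module's source, only an approximation of the idea; the %squares hash and the squaring job are invented for the example.

```perl
use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(2);    # at most 2 children at once
my %squares;

# Runs in the parent each time a child exits; the last argument is a
# reference to whatever the child passed to finish().
$pm->run_on_finish(
    sub {
        my ( $pid, $exit_code, $ident, $signal, $core_dump, $data ) = @_;
        $squares{$ident} = $$data if defined $data;
    }
);

for my $n ( 1 .. 4 ) {
    $pm->start($n) and next;       # parent: move on to the next job
    my $value = $n * $n;           # child: do the work
    $pm->finish( 0, \$value );     # child: exit and send the value back
}
$pm->wait_all_children;

print "$_ => $squares{$_}\n" for sort { $a <=> $b } keys %squares;
```

Parallel::Subs wraps this kind of bookkeeping so that a job is just a sub and its callback is just a second sub.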

METHODS

new

Create a new Parallel::Subs object.

By default it will use the number of CPU cores as the maximum number of parallel jobs. You can control this with the following options:

  • max_process - set the maximum number of parallel processes directly
  • max_process_per_cpu - number of processes per CPU core; the limit is this value multiplied by the number of CPU cores
  • max_memory - in MB per job. Uses the minimum of the number of CPUs and total available memory / max_memory (Linux only, requires Sys::Statistics::Linux::MemStats)
  • timeout - in seconds. If a child process takes longer than this, it is killed via SIGALRM. Applies to each fork individually (in optimized mode, the timeout covers the grouped jobs within each fork).

my $p = Parallel::Subs->new();
my $p = Parallel::Subs->new( max_process => 4 );
my $p = Parallel::Subs->new( max_process_per_cpu => 2 );
my $p = Parallel::Subs->new( max_memory => 512 );
my $p = Parallel::Subs->new( timeout => 30 );
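
The arithmetic behind max_process_per_cpu and max_memory can be sketched as follows. Both helpers are hypothetical and take the CPU count and available memory as plain arguments instead of probing the system; the floor of one job is an assumption of this sketch, not something the module is documented to do.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Hypothetical helper: limit when max_process_per_cpu is given.
sub jobs_per_cpu {
    my ( $cpus, $per_cpu ) = @_;
    return $cpus * $per_cpu;
}

# Hypothetical helper: limit when max_memory (MB per job) is given.
sub jobs_for_memory {
    my ( $cpus, $available_mb, $max_memory_mb ) = @_;
    my $by_memory = int( $available_mb / $max_memory_mb );
    my $jobs      = min( $cpus, $by_memory );
    return $jobs < 1 ? 1 : $jobs;    # assumption: always allow one job
}

print jobs_per_cpu( 4, 2 ), "\n";             # 4 cores x 2 per core = 8
print jobs_for_memory( 8, 2048, 512 ), "\n";  # memory-bound: 2048 / 512 = 4
print jobs_for_memory( 2, 16384, 512 ), "\n"; # CPU-bound: only 2 cores
```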

$p->add([$name], $code, [$callback])

Add a sub to be run in parallel. An optional name (string) can be provided as the first argument to identify this job for later retrieval via result().

$p->add( sub { 1 } );
$p->add( sub { return { 1..6 } }, sub { my $result = shift; ... } );
$p->add( 'fetch_users', sub { ... } );
$p->add( 'compute', sub { heavy_calc() }, sub { process(shift) } );

$p->total_jobs

Returns the total number of jobs added so far.

$p->wait_for_all_optimized

Similar to wait_for_all but reduces the number of forks by grouping tasks together to be run by the same process.

Beta: does not support callbacks. Callbacks will be cleared with a warning.
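
One way to picture the grouping is the sketch below, which deals jobs out round-robin into a fixed number of groups and wraps each group in a single sub that runs its jobs one after another. The group_jobs helper and the round-robin strategy are assumptions made for illustration; the module may distribute jobs differently.

```perl
use strict;
use warnings;

# Hypothetical helper: deal jobs out round-robin into at most $n groups.
sub group_jobs {
    my ( $n, @jobs ) = @_;
    my @groups;
    my $i = 0;
    push @{ $groups[ $i++ % $n ] }, $_ for @jobs;
    return @groups;
}

# Five small jobs, each returning a multiple of 10.
my @jobs   = map { my $n = $_; sub { $n * 10 } } 1 .. 5;
my @groups = group_jobs( 2, @jobs );

# Each group becomes one sub that runs its jobs sequentially; in a
# grouped mode, a sub like this is what a single fork would execute.
my @batched = map { my @g = @$_; sub { [ map { $_->() } @g ] } } @groups;

print join( ' ', @{ $_->() } ), "\n" for @batched;
```

Here the batched subs are called directly in the current process, which is enough to show how five jobs collapse into two units of work.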

$p->run

Runs all added jobs in parallel and waits for them to complete. Returns the raw results hashref (keyed by job name). You typically don't need to call this method directly; use wait_for_all instead.

$p->wait_for_all

Triggers all added jobs to run in parallel and waits for them to finish. Callbacks (if any) are invoked as each job completes, not after all jobs finish. This means callbacks fire in completion order, which may differ from the order jobs were added. Returns $self for chaining.
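
The difference between completion order and insertion order can be observed with a sketch like this one, where the first job sleeps and the second returns at once. The @seen array and the job bodies are illustrative; the callback order depends on timing, so it is typical behaviour and not a guarantee.

```perl
use strict;
use warnings;
use Parallel::Subs;

my @seen;    # filled by the callbacks, in the order jobs finish

my $p = Parallel::Subs->new( max_process => 2 );
$p->add( sub { sleep 2; return 'slow' }, sub { push @seen, shift } );
$p->add( sub { return 'fast' },          sub { push @seen, shift } );
$p->wait_for_all();

# The fast job usually reaches its callback first.
print "callback order: @seen\n";

# results() is documented to follow the order in which jobs were added.
print "results order:  @{ $p->results }\n";
```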

$p->results

Returns an array reference of results, in the same order as jobs were added.

$p->result($name)

Returns the result for a named job. The name must have been provided when the job was added via add().

$p->add( 'fetch_users', sub { get_users() } );
$p->wait_for_all();
my $users = $p->result('fetch_users');

Croaks if the name is unknown.

AUTHOR

Nicolas R atoomic@cpan.org

COPYRIGHT AND LICENSE

This software is copyright (c) 2018 by Nicolas R.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.
