diff options
Diffstat (limited to 'lib/Future/Utils.pm')
-rw-r--r-- | lib/Future/Utils.pm | 687 |
1 files changed, 687 insertions, 0 deletions
diff --git a/lib/Future/Utils.pm b/lib/Future/Utils.pm new file mode 100644 index 0000000..563f327 --- /dev/null +++ b/lib/Future/Utils.pm @@ -0,0 +1,687 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2013-2015 -- leonerd@leonerd.org.uk + +package Future::Utils; + +use strict; +use warnings; + +our $VERSION = '0.32'; + +use Exporter 'import'; +# Can't import the one from Exporter as it relies on package inheritance +sub export_to_level +{ + my $pkg = shift; local $Exporter::ExportLevel = 1 + shift; $pkg->import(@_); +} + +our @EXPORT_OK = qw( + call + call_with_escape + + repeat + try_repeat try_repeat_until_success + repeat_until_success + + fmap fmap_concat + fmap1 fmap_scalar + fmap0 fmap_void +); + +use Carp; +our @CARP_NOT = qw( Future ); + +use Future; + +=head1 NAME + +C<Future::Utils> - utility functions for working with C<Future> objects + +=head1 SYNOPSIS + + use Future::Utils qw( call_with_escape ); + + my $result_f = call_with_escape { + my $escape_f = shift; + my $f = ... + $escape_f->done( "immediate result" ); + ... + }; + +Z<> + + use Future::Utils qw( repeat try_repeat try_repeat_until_success ); + + my $eventual_f = repeat { + my $trial_f = ... + return $trial_f; + } while => sub { my $f = shift; return want_more($f) }; + + my $eventual_f = repeat { + ... + return $trail_f; + } until => sub { my $f = shift; return acceptable($f) }; + + my $eventual_f = repeat { + my $item = shift; + ... + return $trial_f; + } foreach => \@items; + + my $eventual_f = try_repeat { + my $trial_f = ... + return $trial_f; + } while => sub { ... }; + + my $eventual_f = try_repeat_until_success { + ... + return $trial_f; + }; + + my $eventual_f = try_repeat_until_success { + my $item = shift; + ... + return $trial_f; + } foreach => \@items; + +Z<> + + use Future::Utils qw( fmap_concat fmap_scalar fmap_void ); + + my $result_f = fmap_concat { + my $item = shift; + ... + return $item_f; + } foreach => \@items, concurrent => 4; + + my $result_f = fmap_scalar { + my $item = shift; + ... + return $item_f; + } foreach => \@items, concurrent => 8; + + my $done_f = fmap_void { + my $item = shift; + ... + return $item_f; + } foreach => \@items, concurrent => 10; + +=cut + +=head1 INVOKING A BLOCK OF CODE + +=head2 $f = call { CODE } + +The C<call> function invokes a block of code that returns a future, and simply +returns the future it returned. The code is wrapped in an C<eval {}> block, so +that if it throws an exception this is turned into an immediate failed +C<Future>. If the code does not return a C<Future>, then an immediate failed +C<Future> instead. + +(This is equivalent to using C<< Future->call >>, but is duplicated here for +completeness). + +=cut + +sub call(&) +{ + my ( $code ) = @_; + return Future->call( $code ); +} + +=head2 $f = call_with_escape { CODE } + +The C<call_with_escape> function invokes a block of code that returns a +future, and passes in a separate future (called here an "escape future"). +Normally this is equivalent to the simple C<call> function. However, if the +code captures this future and completes it by calling C<done> or C<fail> on +it, the future returned by C<call_with_escape> immediately completes with this +result, and the future returned by the code itself is cancelled. + +This can be used to implement short-circuit return from an iterating loop or +complex sequence of code, or immediate fail that bypasses failure handling +logic in the code itself, or several other code patterns. + + $f = $code->( $escape_f ) + +(This can be considered similar to C<call-with-escape-continuation> as found +in some Scheme implementations). + +=cut + +sub call_with_escape(&) +{ + my ( $code ) = @_; + + my $escape_f = Future->new; + + return Future->wait_any( + Future->call( $code, $escape_f ), + $escape_f, + ); +} + +=head1 REPEATING A BLOCK OF CODE + +The C<repeat> function provides a way to repeatedly call a block of code that +returns a L<Future> (called here a "trial future") until some ending condition +is satisfied. The C<repeat> function itself returns a C<Future> to represent +running the repeating loop until that end condition (called here the "eventual +future"). The first time the code block is called, it is passed no arguments, +and each subsequent invocation is passed the previous trial future. + +The result of the eventual future is the result of the last trial future. + +If the eventual future is cancelled, the latest trial future will be +cancelled. + +If some specific subclass or instance of C<Future> is required as the return +value, it can be passed as the C<return> argument. Otherwise the return value +will be constructed by cloning the first non-immediate trial C<Future>. + +=head2 $future = repeat { CODE } while => CODE + +Repeatedly calls the C<CODE> block while the C<while> condition returns a true +value. Each time the trial future completes, the C<while> condition is passed +the trial future. + + $trial_f = $code->( $previous_trial_f ) + $again = $while->( $trial_f ) + +If the C<$code> block dies entirely and throws an exception, this will be +caught and considered as an immediately-failed C<Future> with the exception as +the future's failure. The exception will not be propagated to the caller. + +=head2 $future = repeat { CODE } until => CODE + +Repeatedly calls the C<CODE> block until the C<until> condition returns a true +value. Each time the trial future completes, the C<until> condition is passed +the trial future. + + $trial_f = $code->( $previous_trial_f ) + $accept = $until->( $trial_f ) + +=head2 $future = repeat { CODE } foreach => ARRAY, otherwise => CODE + +Calls the C<CODE> block once for each value obtained from the array, passing +in the value as the first argument (before the previous trial future). When +there are no more items left in the array, the C<otherwise> code is invoked +once and passed the last trial future, if there was one, or C<undef> if the +list was originally empty. The result of the eventual future will be the +result of the future returned from C<otherwise>. + +The referenced array may be modified by this operation. + + $trial_f = $code->( $item, $previous_trial_f ) + $final_f = $otherwise->( $last_trial_f ) + +The C<otherwise> code is optional; if not supplied then the result of the +eventual future will simply be that of the last trial. If there was no trial, +because the C<foreach> list was already empty, then an immediate successful +future with an empty result is returned. + +=head2 $future = repeat { CODE } foreach => ARRAY, while => CODE, ... + +=head2 $future = repeat { CODE } foreach => ARRAY, until => CODE, ... + +Combines the effects of C<foreach> with C<while> or C<until>. Calls the +C<CODE> block once for each value obtained from the array, until the array is +exhausted or the given ending condition is satisfied. + +If a C<while> or C<until> condition is combined with C<otherwise>, the +C<otherwise> code will only be run if the array was entirely exhausted. If the +operation is terminated early due to the C<while> or C<until> condition being +satisfied, the eventual result will simply be that of the last trial that was +executed. + +=head2 $future = repeat { CODE } generate => CODE, otherwise => CODE + +Calls the C<CODE> block once for each value obtained from the generator code, +passing in the value as the first argument (before the previous trial future). +When the generator returns an empty list, the C<otherwise> code is invoked and +passed the last trial future, if there was one, otherwise C<undef> if the +generator never returned a value. The result of the eventual future will be +the result of the future returned from C<otherwise>. + + $trial_f = $code->( $item, $previous_trial_f ) + $final_f = $otherwise->( $last_trial_f ) + + ( $item ) = $generate->() + +The generator is called in list context but should return only one item per +call. Subsequent values will be ignored. When it has no more items to return +it should return an empty list. + +For backward compatibility this function will allow a C<while> or C<until> +condition that requests a failure be repeated, but it will print a warning if +it has to do that. To apply repeating behaviour that can catch and retry +failures, use C<try_repeat> instead. This old behaviour is now deprecated and +will be removed in the next version. + +=cut + +sub _repeat +{ + my ( $code, $return, $trialp, $cond, $sense, $is_try ) = @_; + + my $prev = $$trialp; + + while(1) { + my $trial = $$trialp ||= Future->call( $code, $prev ); + $prev = $trial; + + if( !$trial->is_ready ) { + # defer + $return ||= $trial->new; + $trial->on_ready( sub { + return if $$trialp->is_cancelled; + _repeat( $code, $return, $trialp, $cond, $sense, $is_try ); + }); + return $return; + } + + my $stop; + if( not eval { $stop = !$cond->( $trial ) ^ $sense; 1 } ) { + $return ||= $trial->new; + $return->fail( $@ ); + return $return; + } + + if( $stop ) { + # Return result + $return ||= $trial->new; + $trial->on_done( $return ); + $trial->on_fail( $return ); + return $return; + } + + if( !$is_try and $trial->failure ) { + carp "Using Future::Utils::repeat to retry a failure is deprecated; use try_repeat instead"; + } + + # redo + undef $$trialp; + } +} + +sub repeat(&@) +{ + my $code = shift; + my %args = @_; + + # This makes it easier to account for other conditions + defined($args{while}) + defined($args{until}) == 1 + or defined($args{foreach}) + or defined($args{generate}) + or croak "Expected one of 'while', 'until', 'foreach' or 'generate'"; + + if( $args{foreach} ) { + $args{generate} and croak "Cannot use both 'foreach' and 'generate'"; + + my $array = delete $args{foreach}; + $args{generate} = sub { + @$array ? shift @$array : (); + }; + } + + if( $args{generate} ) { + my $generator = delete $args{generate}; + my $otherwise = delete $args{otherwise}; + + # TODO: This is slightly messy as this lexical is captured by both + # blocks of code. Can we do better somehow? + my $done; + + my $orig_code = $code; + $code = sub { + my ( $last_trial_f ) = @_; + my $again = my ( $value ) = $generator->( $last_trial_f ); + + if( $again ) { + unshift @_, $value; goto &$orig_code; + } + + $done++; + if( $otherwise ) { + goto &$otherwise; + } + else { + return $last_trial_f || Future->done; + } + }; + + if( my $orig_while = delete $args{while} ) { + $args{while} = sub { + $orig_while->( $_[0] ) and !$done; + }; + } + elsif( my $orig_until = delete $args{until} ) { + $args{while} = sub { + !$orig_until->( $_[0] ) and !$done; + }; + } + else { + $args{while} = sub { !$done }; + } + } + + my $future = $args{return}; + + my $trial; + $args{while} and $future = _repeat( $code, $future, \$trial, $args{while}, 0, $args{try} ); + $args{until} and $future = _repeat( $code, $future, \$trial, $args{until}, 1, $args{try} ); + + $future->on_cancel( sub { $trial->cancel } ); + + return $future; +} + +=head2 $future = try_repeat { CODE } ... + +A variant of C<repeat> that doesn't warn when the trial fails and the +condition code asks for it to be repeated. + +In some later version the C<repeat> function will be changed so that if a +trial future fails, then the eventual future will immediately fail as well, +making its semantics a little closer to that of a C<while {}> loop in Perl. +Code that specifically wishes to catch failures in trial futures and retry +the block should use C<try_repeat> specifically. + +=cut + +sub try_repeat(&@) +{ + # defeat prototype + &repeat( @_, try => 1 ); +} + +=head2 $future = try_repeat_until_success { CODE } ... + +A shortcut to calling C<try_repeat> with an ending condition that simply tests +for a successful result from a future. May be combined with C<foreach> or +C<generate>. + +This function used to be called C<repeat_until_success>, and is currently +aliased as this name as well. + +=cut + +sub try_repeat_until_success(&@) +{ + my $code = shift; + my %args = @_; + + # TODO: maybe merge while/until conditions one day... + defined($args{while}) or defined($args{until}) + and croak "Cannot pass 'while' or 'until' to try_repeat_until_success"; + + # defeat prototype + &try_repeat( $code, while => sub { shift->failure }, %args ); +} + +# Legacy name +*repeat_until_success = \&try_repeat_until_success; + +=head1 APPLYING A FUNCTION TO A LIST + +The C<fmap> family of functions provide a way to call a block of code that +returns a L<Future> (called here an "item future") once per item in a given +list, or returned by a generator function. The C<fmap*> functions themselves +return a C<Future> to represent the ongoing operation, which completes when +every item's future has completed. + +While this behaviour can also be implemented using C<repeat>, the main reason +to use an C<fmap> function is that the individual item operations are +considered as independent, and thus more than one can be outstanding +concurrently. An argument can be passed to the function to indicate how many +items to start initially, and thereafter it will keep that many of them +running concurrently until all of the items are done, or until any of them +fail. If an individual item future fails, the overall result future will be +marked as failing with the same failure, and any other pending item futures +that are outstanding at the time will be cancelled. + +The following named arguments are common to each C<fmap*> function: + +=over 8 + +=item foreach => ARRAY + +Provides the list of items to iterate over, as an C<ARRAY> reference. + +The referenced array will be modified by this operation, C<shift>ing one item +from it each time. The can C<push> more items to this array as it runs, and +they will be included in the iteration. + +=item generate => CODE + +Provides the list of items to iterate over, by calling the generator function +once for each required item. The function should return a single item, or an +empty list to indicate it has no more items. + + ( $item ) = $generate->() + +This function will be invoked each time any previous item future has completed +and may be called again even after it has returned empty. + +=item concurrent => INT + +Gives the number of item futures to keep outstanding. By default this value +will be 1 (i.e. no concurrency); larger values indicate that multiple item +futures will be started at once. + +=item return => Future + +Normally, a new instance is returned by cloning the first non-immediate future +returned as an item future. By passing a new instance as the C<return> +argument, the result will be put into the given instance. This can be used to +return subclasses, or specific instances. + +=back + +In each case, the main code block will be called once for each item in the +list, passing in the item as the only argument: + + $item_f = $code->( $item ) + +The expected return value from each item's future, and the value returned from +the result future will differ in each function's case; they are documented +below. + +=cut + +# This function is invoked in two circumstances: +# a) to create an item Future in a slot, +# b) once a non-immediate item Future is complete, to check its results +# It can tell which circumstance by whether the slot itself is defined or not +sub _fmap_slot +{ + my ( $slots, undef, $code, $generator, $collect, $results, $return ) = @_; + + SLOT: while(1) { + # Capture args each call because we mutate them + my ( undef, $idx ) = my @args = @_; + + unless( $slots->[$idx] ) { + # No item Future yet (case a), so create one + my $item; + unless( ( $item ) = $generator->() ) { + # All out of items, so now just wait for the slots to be finished + undef $slots->[$idx]; + defined and return $return for @$slots; + + # All the slots are done + $return ||= Future->new; + + $return->done( @$results ); + return $return; + } + + my $f = $slots->[$idx] = Future->call( $code, $item ); + + if( $collect eq "array" ) { + push @$results, my $r = []; + $f->on_done( sub { @$r = @_ }); + } + elsif( $collect eq "scalar" ) { + push @$results, undef; + my $r = \$results->[-1]; + $f->on_done( sub { $$r = $_[0] }); + } + } + + my $f = $slots->[$idx]; + + # Slot is non-immediate; arrange for us to be invoked again later when it's ready + if( !$f->is_ready ) { + $args[-1] = ( $return ||= $f->new ); + $f->on_done( sub { _fmap_slot( @args ) } ); + $f->on_fail( $return ); + + # Try looking for more that might be ready + my $i = $idx + 1; + while( $i != $idx ) { + $i++; + $i %= @$slots; + next if defined $slots->[$i]; + + $_[1] = $i; + redo SLOT; + } + return $return; + } + + # Either we've been invoked again (case b), or the immediate Future was + # already ready. + if( $f->failure ) { + $return ||= $f->new; + $return->fail( $f->failure ); + return $return; + } + + undef $slots->[$idx]; + # next + } +} + +sub _fmap +{ + my $code = shift; + my %args = @_; + + my $concurrent = $args{concurrent} || 1; + my @slots; + + my $results = []; + my $future = $args{return}; + + my $generator; + if( $generator = $args{generate} ) { + # OK + } + elsif( my $array = $args{foreach} ) { + $generator = sub { return unless @$array; shift @$array }; + } + else { + croak "Expected either 'generate' or 'foreach'"; + } + + # If any of these immediately fail, don't bother continuing + foreach my $idx ( 0 .. $concurrent-1 ) { + $future = _fmap_slot( \@slots, $idx, $code, $generator, $args{collect}, $results, $future ); + last if $future->is_ready; + } + + $future->on_fail( sub { + !defined $_ or $_->is_ready or $_->cancel for @slots; + }); + $future->on_cancel( sub { + $_->cancel for @slots; + }); + + return $future; +} + +=head2 $future = fmap_concat { CODE } ... + +This version of C<fmap> expects each item future to return a list of zero or +more values, and the overall result will be the concatenation of all these +results. It acts like a future-based equivalent to Perl's C<map> operator. + +The results are returned in the order of the original input values, not in the +order their futures complete in. Because of the intermediate storage of +C<ARRAY> references and final flattening operation used to implement this +behaviour, this function is slightly less efficient than C<fmap_scalar> or +C<fmap_void> in cases where item futures are expected only ever to return one, +or zero values, respectively. + +This function is also available under the name of simply C<fmap> to emphasise +its similarity to perl's C<map> keyword. + +=cut + +sub fmap_concat(&@) +{ + my $code = shift; + my %args = @_; + + _fmap( $code, %args, collect => "array" )->then( sub { + return Future->done( map { @$_ } @_ ); + }); +} +*fmap = \&fmap_concat; + +=head2 $future = fmap_scalar { CODE } ... + +This version of C<fmap> acts more like the C<map> functions found in Scheme or +Haskell; it expects that each item future returns only one value, and the +overall result will be a list containing these, in order of the original input +items. If an item future returns more than one value the others will be +discarded. If it returns no value, then C<undef> will be substituted in its +place so that the result list remains in correspondence with the input list. + +This function is also available under the shorter name of C<fmap1>. + +=cut + +sub fmap_scalar(&@) +{ + my $code = shift; + my %args = @_; + + _fmap( $code, %args, collect => "scalar" ) +} +*fmap1 = \&fmap_scalar; + +=head2 $future = fmap_void { CODE } ... + +This version of C<fmap> does not collect any results from its item futures, it +simply waits for them all to complete. Its result future will provide no +values. + +While not a map in the strictest sense, this variant is still useful as a way +to control concurrency of a function call iterating over a list of items, +obtaining its results by some other means (such as side-effects on captured +variables, or some external system). + +This function is also available under the shorter name of C<fmap0>. + +=cut + +sub fmap_void(&@) +{ + my $code = shift; + my %args = @_; + + _fmap( $code, %args, collect => "void" ) +} +*fmap0 = \&fmap_void; + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |