package MapReduce;

=head1 NAME

MapReduce - Express MapReduce programs (single-threaded run-time)

=head1 SYNOPSIS

    use MapReduce;

    # Map function that takes a line, splits it into words, and emits pairs
    # of the form (word, 1).
    my $mapFn = sub {
        my ($filename, $line) = @_;
        my @words;
        my $word;

        @words = split(' ', $line);
        foreach $word (@words) {
            MapReduce::Emit($word, 1);
        }
    };

    # Reduce function that takes lists of the form (word, [ c0, c1,
    # ..., cN ]) and prints word: c0 + c1 + ... + cN
    my $reduceFn = sub {
        my ($key, $counts) = @_;
        my $total = 0;
        my $cur;

        foreach $cur (@$counts) {
            $total += $cur;
        }
        print "$key: $total occurrences.\n";
    };

    # Run MapReduce with the sample map and reduce handlers.
    MapReduce::MapReduce($mapFn, $reduceFn);

=head1 DESCRIPTION

C<MapReduce> is a perl module that implements an extremely simple version
of the MapReduce pattern / run-time / what-have-you.  A program specifies
its map and reduce handlers and calls MapReduce::MapReduce() which applies
map to every line of input (the files named in C<@ARGV>, or standard input
when no files are given) and reduce on any generated pairs.  Reduce
handlers are expected to explicitly print / write whatever output they
wish generated.

The map handler is called as C<$mapFn-E<gt>($filename, $line)>, where
C<$line> has its trailing newline removed and C<$filename> is the name of
the file being read (or the string C<stdin> when no files were given).
The reduce handler is called once per distinct key, in sorted key order,
as C<$reduceFn-E<gt>($key, \@values)>.

Note: The implementation is single-threaded and naive/straightforward in
all ways.  This is meant for convenience and messing around rather than
power.  For example, the word count example above can become:

    perl -w -MMapReduce -e 'MapReduce::MapReduce(sub { foreach $i \
        (split(" ", $_[1])) { MapReduce::Emit($i, 1); } }, sub { my $t; \
        foreach $c (@{$_[1]}) { $t += $c} print "$_[0]: $t\n" })'

=cut

use strict;
use warnings;

# Intermediate results of the map phase: key => reference to the array of
# every value emitted for that key, in emission order.
my %MRPairs;

####
# Emit --
#
#   Stores the specified (key, value) generated during Map.
#
#   Arguments: $key   - key the value is filed under.
#              $value - value appended to that key's list.
#
#   Returns the number of values now stored for $key.

sub Emit {
    my ($key, $value) = @_;

    # push autovivifies the array reference the first time a key is seen,
    # so no explicit "create the bucket" step is needed.
    return push @{ $MRPairs{$key} }, $value;
}

####
# MapReduce --
#
#   Run the specified map and reduction functions on all of the input.
#
#   Arguments: $mapFn    - code ref called as $mapFn->($filename, $line)
#                          for every input line (newline removed).
#              $reduceFn - code ref called as $reduceFn->($key, \@values)
#                          for every emitted key, in sorted key order.
#
#   Input comes from the files named in @ARGV, or from standard input when
#   @ARGV is empty.  Emitted pairs are not cleared afterwards, so pairs
#   from an earlier call are reduced again by a later one.

sub MapReduce {
    my ($mapFn, $reduceFn) = @_;

    # Decide this before reading: <> consumes @ARGV as it opens files, and
    # with no arguments it reads standard input, which is reported to the
    # map function under the name "stdin".
    my $reading_stdin = !@ARGV;

    while (my $line = <>) {
        # chomp (not chop) so that a final line lacking a newline does not
        # lose its last character.
        chomp $line;

        # $ARGV is the name of the file <> is currently reading.
        my $filename = $reading_stdin ? "stdin" : $ARGV;
        $mapFn->($filename, $line);
    }

    foreach my $key (sort keys %MRPairs) {
        $reduceFn->($key, $MRPairs{$key});
    }

    return;
}

####
# WordCount --
#
#   MapReduce toy example that counts how often each word occurs in its
#   input and prints one "word: N occurrences." line per distinct word.

sub WordCount {
    # Map: emit (word, 1) for every whitespace-separated word on the line.
    my $mapFn = sub {
        my ($filename, $line) = @_;

        foreach my $word (split(' ', $line)) {
            MapReduce::Emit($word, 1);
        }
    };

    # Reduce: add up the per-occurrence counts and print the total.
    my $reduceFn = sub {
        my ($key, $counts) = @_;
        my $total = 0;

        foreach my $cur (@$counts) {
            $total += $cur;
        }
        print "$key: $total occurrences.\n";
    };

    MapReduce::MapReduce($mapFn, $reduceFn);

    return;
}

1;