import sys


def _split_record(line):
    """Split one input line into ``(key, value)`` at the first tab.

    A line containing no tab yields ``(line, "")``. Any additional tabs are
    kept inside the value, so a line with several tabs is still split into a
    key and the remainder rather than being treated as malformed.
    """
    key, _sep, value = line.partition("\t")
    return key, value


class Mapper:
    """Reads input tuples to mapper and calls dispatch function for each
    tuple. The dispatched value may be the empty string."""

    def dispatch(self, key, value):
        """Override this to add new behavior to your mapper, print output
        that you want returned to Hadoop to sys.stdout.

        The default implementation writes ``"<key>: <value>"`` followed by a
        newline to ``sys.stderr``.
        """
        # sys.stderr.write behaves the same on Python 2 and 3, unlike the
        # Python 2-only ``print >>stream`` statement.
        sys.stderr.write("%s: %s\n" % (key, value))

    def process(self, input):
        """Read lines from stream *input* and dispatch each non-blank one.

        Each line is stripped of surrounding whitespace and split at the
        first tab into key and value (value is ``""`` when there is no tab).
        Blank lines are skipped.
        """
        for raw_line in input:
            line = raw_line.strip()
            if not line:
                continue
            key, value = _split_record(line)
            self.dispatch(key, value)


class Reducer:
    """Reads input tuples to reducer and calls dispatch function for each key,
    grouping values for that key into an array. Keys with no value (i.e.,
    lines with no tab or a tab followed by a newline) will still be
    dispatched, but empty values will not be included in the values array.
    This means that your dispatch method may be called with an empty values
    array."""

    def dispatch(self, key, values):
        """Override this to add new behavior to your reducer, print output
        that you want returned to Hadoop to sys.stdout.

        The default implementation writes ``"<key>: <v1>, <v2>, ..."``
        followed by a newline to ``sys.stderr``.
        """
        sys.stderr.write("%s: %s\n" % (key, ", ".join(values)))

    def process(self, input):
        """Read lines from stream *input* and dispatch one group per key.

        Each non-blank line is stripped and split at the first tab; key and
        value are then stripped individually. Only *consecutive* lines with
        the same key are grouped together, so input is expected to arrive
        sorted by key (as Hadoop streaming delivers it to reducers).
        """
        last_key = None
        values = []
        for raw_line in input:
            line = raw_line.strip()
            if not line:
                continue
            key, value = _split_record(line)
            key = key.strip()
            value = value.strip()
            # A new key closes the previous group.
            if last_key is not None and key != last_key:
                self.dispatch(last_key, values)
                values = []
            last_key = key
            # Empty values mark the key as present but add nothing to it.
            if value:
                values.append(value)
        # Flush the final group -- even when it collected no values, so a
        # trailing value-less key is dispatched like any other.
        if last_key is not None:
            self.dispatch(last_key, values)