#!/usr/bin/env PYTHONHASHSEED=1234 python3

# Copyright 2014-2019 Brett Slatkin, Pearson Education Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Reproduce book environment
import random
random.seed(1234)

import logging
from pprint import pprint
from sys import stdout as STDOUT

# Write all output to a temporary directory
import atexit
import gc
import io
import os
import tempfile

TEST_DIR = tempfile.TemporaryDirectory()
atexit.register(TEST_DIR.cleanup)

# Make sure Windows processes exit cleanly
OLD_CWD = os.getcwd()
atexit.register(lambda: os.chdir(OLD_CWD))
os.chdir(TEST_DIR.name)


def close_open_files():
    """Close every live ``io.IOBase`` object tracked by the garbage collector.

    Registered with ``atexit`` below so it runs at interpreter shutdown.
    """
    everything = gc.get_objects()
    for obj in everything:
        if isinstance(obj, io.IOBase):
            obj.close()


atexit.register(close_open_files)


# Example 1
class InputData:
    """Interface for a source of input; subclasses must implement read()."""

    def read(self):
        """Return the data of this input. Must be overridden."""
        raise NotImplementedError


# Example 2
class PathInputData(InputData):
    """Input whose data is the text content of the file at ``path``."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole file at ``self.path`` as a string."""
        with open(self.path) as f:
            return f.read()


# Example 3
class Worker:
    """Interface for a unit of MapReduce work over one input.

    ``result`` starts as ``None``; subclasses set it in map() and combine
    it in reduce().
    """

    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        """Compute ``self.result`` from ``self.input_data``. Must be overridden."""
        raise NotImplementedError

    def reduce(self, other):
        """Fold ``other``'s result into this worker. Must be overridden."""
        raise NotImplementedError


# Example 4
class LineCountWorker(Worker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Store the number of ``'\\n'`` characters read from the input."""
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        """Add ``other.result`` to this worker's result."""
        self.result += other.result


# Example 5
import os


def generate_inputs(data_dir):
    """Yield a PathInputData for every entry listed in ``data_dir``."""
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))


# Example 6
def create_workers(input_list):
    """Return a list with one LineCountWorker per item of ``input_list``."""
    return [LineCountWorker(input_data) for input_data in input_list]


# Example 7
from threading import Thread


def execute(workers):
    """Run map() on every worker in its own thread, then reduce to one result.

    Args:
        workers: Iterable of worker objects exposing map(), reduce(other)
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has
        been reduced into it.

    Raises:
        ValueError: If ``workers`` is empty (there is nothing to reduce).
    """
    # Materialize once: the workers are traversed twice (threads, then
    # reduce), which a one-shot iterator would not survive.
    workers = list(workers)
    if not workers:
        raise ValueError('execute() requires at least one worker')

    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    # Wait for every map step to finish before any reduce step begins.
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


# Example 8
def mapreduce(data_dir):
    """Count lines across the entries of ``data_dir`` using the helpers above."""
    inputs = generate_inputs(data_dir)
    workers = create_workers(inputs)
    return execute(workers)


# Example 9
import os
import random


def write_test_files(tmpdir):
    """Create ``tmpdir`` and fill it with 100 files of random line counts.

    Files are named '0' through '99'; each holds between 0 and 100 newline
    characters (inclusive), drawn from the ``random`` module.
    """
    # exist_ok lets the helper be re-run against an existing directory;
    # the files are simply rewritten.
    os.makedirs(tmpdir, exist_ok=True)
    for i in range(100):
        with open(os.path.join(tmpdir, str(i)), 'w') as f:
            f.write('\n' * random.randint(0, 100))


tmpdir = 'test_inputs'
write_test_files(tmpdir)

result = mapreduce(tmpdir)
print(f'There are {result} lines')


# Example 10
class GenericInputData:
    """Input interface that also knows how to enumerate its own instances."""

    def read(self):
        """Return the data of this input. Must be overridden."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Produce input instances described by ``config``. Must be overridden."""
        raise NotImplementedError


# Example 11
class PathInputData(GenericInputData):
    """File-backed input discovered by listing ``config['data_dir']``."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole file at ``self.path`` as a string."""
        with open(self.path) as f:
            return f.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one ``cls`` instance per entry listed in ``config['data_dir']``."""
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))


# Example 12
class GenericWorker:
    """Worker interface with a class-level factory for building workers."""

    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        """Compute ``self.result`` from ``self.input_data``. Must be overridden."""
        raise NotImplementedError

    def reduce(self, other):
        """Fold ``other``'s result into this worker. Must be overridden."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list of ``cls`` workers, one per generated input.

        Inputs come from ``input_class.generate_inputs(config)``.
        """
        return [
            cls(input_data)
            for input_data in input_class.generate_inputs(config)
        ]


# Example 13
class LineCountWorker(GenericWorker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Store the number of ``'\\n'`` characters read from the input."""
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        """Add ``other.result`` to this worker's result."""
        self.result += other.result


# Example 14
def mapreduce(worker_class, input_class, config):
    """Build workers via ``worker_class.create_workers`` and execute them.

    Args:
        worker_class: Class providing the ``create_workers`` classmethod.
        input_class: Class providing the ``generate_inputs`` classmethod.
        config: Passed through unchanged to ``input_class.generate_inputs``.

    Returns:
        Whatever execute() returns for the created workers.
    """
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)


# Example 15
config = {'data_dir': tmpdir}
result = mapreduce(LineCountWorker, PathInputData, config)
print(f'There are {result} lines')