#!/usr/bin/env PYTHONHASHSEED=1234 python3 # Copyright 2014-2024 Brett Slatkin, Pearson Education Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ### Start book environment setup import random random.seed(1234) import logging from pprint import pprint from sys import stdout as STDOUT # Write all output to a temporary directory import atexit import gc import io import os import tempfile TEST_DIR = tempfile.TemporaryDirectory() atexit.register(TEST_DIR.cleanup) # Make sure Windows processes exit cleanly OLD_CWD = os.getcwd() atexit.register(lambda: os.chdir(OLD_CWD)) os.chdir(TEST_DIR.name) def close_open_files(): everything = gc.get_objects() for obj in everything: if isinstance(obj, io.IOBase): obj.close() atexit.register(close_open_files) ### End book environment setup print("Example 1") from queue import Queue in_queue = Queue() out_queue = Queue() print("Example 2") from threading import Thread from queue import ShutDown class StoppableWorker(Thread): def __init__(self, func, in_queue, out_queue, *args, **kwargs): super().__init__(*args, **kwargs) self.func = func self.in_queue = in_queue self.out_queue = out_queue def run(self): while True: try: item = self.in_queue.get() except ShutDown: return else: result = self.func(item) self.out_queue.put(result) self.in_queue.task_done() def game_logic(state, neighbors): # Do some blocking input/output in here: data = my_socket.recv(100) def game_logic(state, neighbors): if state == ALIVE: if neighbors < 2: return EMPTY elif neighbors > 3: return EMPTY else: if neighbors == 3: return ALIVE return state def game_logic_thread(item): y, x, state, neighbors = item try: next_state = game_logic(state, neighbors) except Exception as e: next_state = e return (y, x, next_state) # Start the threads upfront threads = [] for _ in range(5): thread = StoppableWorker(game_logic_thread, in_queue, out_queue) thread.start() threads.append(thread) print("Example 3") ALIVE = "*" EMPTY = "-" class SimulationError(Exception): pass class Grid: def __init__(self, height, width): self.height = height self.width = width self.rows = [] for _ in range(self.height): self.rows.append([EMPTY] * self.width) def get(self, y, x): return self.rows[y % self.height][x % self.width] def set(self, y, x, state): self.rows[y % self.height][x % self.width] = state def __str__(self): output = "" for row in self.rows: for cell in row: output += cell output += "\n" return output def count_neighbors(y, x, get_cell): n_ = get_cell(y - 1, x + 0) # North ne = get_cell(y - 1, x + 1) # Northeast e_ = get_cell(y + 0, x + 1) # East se = get_cell(y + 1, x + 1) # Southeast s_ = get_cell(y + 1, x + 0) # South sw = get_cell(y + 1, x - 1) # Southwest w_ = get_cell(y + 0, x - 1) # West nw = get_cell(y - 1, x - 1) # Northwest neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw] count = 0 for state in neighbor_states: if state == ALIVE: count += 1 return count def simulate_pipeline(grid, in_queue, out_queue): for y in range(grid.height): for x in range(grid.width): state = grid.get(y, x) neighbors = count_neighbors(y, x, grid.get) in_queue.put((y, x, state, neighbors)) # Fan-out in_queue.join() item_count = out_queue.qsize() next_grid = Grid(grid.height, grid.width) for _ in range(item_count): item = out_queue.get() # Fan-in y, x, next_state = item if isinstance(next_state, Exception): raise SimulationError(y, x) from next_state next_grid.set(y, x, next_state) return next_grid print("Example 4") try: def game_logic(state, neighbors): raise OSError("Problem with I/O in game_logic") simulate_pipeline(Grid(1, 1), in_queue, out_queue) except: logging.exception('Expected') else: assert False print("Example 5") # Restore the working version of this function def game_logic(state, neighbors): if state == ALIVE: if neighbors < 2: return EMPTY elif neighbors > 3: return EMPTY else: if neighbors == 3: return ALIVE return state class ColumnPrinter: def __init__(self): self.columns = [] def append(self, data): self.columns.append(data) def __str__(self): row_count = 1 for data in self.columns: row_count = max(row_count, len(data.splitlines()) + 1) rows = [""] * row_count for j in range(row_count): for i, data in enumerate(self.columns): line = data.splitlines()[max(0, j - 1)] if j == 0: padding = " " * (len(line) // 2) rows[j] += padding + str(i) + padding else: rows[j] += line if (i + 1) < len(self.columns): rows[j] += " | " return "\n".join(rows) grid = Grid(5, 9) grid.set(0, 3, ALIVE) grid.set(1, 4, ALIVE) grid.set(2, 2, ALIVE) grid.set(2, 3, ALIVE) grid.set(2, 4, ALIVE) columns = ColumnPrinter() for i in range(5): columns.append(str(grid)) grid = simulate_pipeline(grid, in_queue, out_queue) print(columns) in_queue.shutdown() in_queue.join() for thread in threads: thread.join() print("Example 6") def count_neighbors(y, x, get_cell): # Do some blocking input/output in here: data = my_socket.recv(100) print("Example 7") def count_neighbors(y, x, get_cell): n_ = get_cell(y - 1, x + 0) # North ne = get_cell(y - 1, x + 1) # Northeast e_ = get_cell(y + 0, x + 1) # East se = get_cell(y + 1, x + 1) # Southeast s_ = get_cell(y + 1, x + 0) # South sw = get_cell(y + 1, x - 1) # Southwest w_ = get_cell(y + 0, x - 1) # West nw = get_cell(y - 1, x - 1) # Northwest neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw] count = 0 for state in neighbor_states: if state == ALIVE: count += 1 return count def count_neighbors_thread(item): y, x, state, get_cell = item try: neighbors = count_neighbors(y, x, get_cell) except Exception as e: neighbors = e return (y, x, state, neighbors) def game_logic_thread(item): y, x, state, neighbors = item if isinstance(neighbors, Exception): next_state = neighbors else: try: next_state = game_logic(state, neighbors) except Exception as e: next_state = e return (y, x, next_state) from threading import Lock class LockingGrid(Grid): def __init__(self, height, width): super().__init__(height, width) self.lock = Lock() def __str__(self): with self.lock: return super().__str__() def get(self, y, x): with self.lock: return super().get(y, x) def set(self, y, x, state): with self.lock: return super().set(y, x, state) print("Example 8") in_queue = Queue() logic_queue = Queue() out_queue = Queue() threads = [] for _ in range(5): thread = StoppableWorker( count_neighbors_thread, in_queue, logic_queue ) thread.start() threads.append(thread) for _ in range(5): thread = StoppableWorker( game_logic_thread, logic_queue, out_queue ) thread.start() threads.append(thread) print("Example 9") def simulate_phased_pipeline(grid, in_queue, logic_queue, out_queue): for y in range(grid.height): for x in range(grid.width): state = grid.get(y, x) item = (y, x, state, grid.get) in_queue.put(item) # Fan-out in_queue.join() logic_queue.join() # Pipeline sequencing item_count = out_queue.qsize() next_grid = LockingGrid(grid.height, grid.width) for _ in range(item_count): y, x, next_state = out_queue.get() # Fan-in if isinstance(next_state, Exception): raise SimulationError(y, x) from next_state next_grid.set(y, x, next_state) return next_grid print("Example 10") grid = LockingGrid(5, 9) grid.set(0, 3, ALIVE) grid.set(1, 4, ALIVE) grid.set(2, 2, ALIVE) grid.set(2, 3, ALIVE) grid.set(2, 4, ALIVE) columns = ColumnPrinter() for i in range(5): columns.append(str(grid)) grid = simulate_phased_pipeline( grid, in_queue, logic_queue, out_queue ) print(columns) in_queue.shutdown() in_queue.join() logic_queue.shutdown() logic_queue.join() for thread in threads: thread.join() print("Example 11") # Make sure exception propagation works as expected def count_neighbors(*args): raise OSError("Problem with I/O in count_neighbors") in_queue = Queue() logic_queue = Queue() out_queue = Queue() threads = [ StoppableWorker( count_neighbors_thread, in_queue, logic_queue, daemon=True ), StoppableWorker( game_logic_thread, logic_queue, out_queue, daemon=True ), ] for thread in threads: thread.start() try: simulate_phased_pipeline(grid, in_queue, logic_queue, out_queue) except SimulationError: pass # Expected else: assert False