SynchronousRunner

Bases: Runner

Class for orchestrating the running of the simulation. An automaton simulation is run by applying the update rule (as implemented by a Controller) to sites in the SimulationState repeatedly. This class implements the synchronous update strategy.

In the normal configuration, one simulation step involves applying the update rule to every site in the SimulationState. The user should assume these applications follow no specific order. This is appropriate if the update rule only impacts the cell it is applied to, and no neighboring cells.

If normal mode is used, the automaton can be parallelized, meaning that the update rule can be applied to many cells simultaneously. This can lead to dramatic improvements in speed for the simulation. Specify this using parallel = True during initialization. You can further specify the number of workers to use during parallel processing with the workers parameter. If left unspecified, one worker for each CPU will be created.
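
For example, a minimal usage sketch (not taken from the package docs): it assumes an initial SimulationState named my_initial_state and a controller named my_controller have already been constructed, and that the public run method inherited from Runner accepts the initial state, the controller, and the number of steps.

from pylattica.core.runner.synchronous_runner import SynchronousRunner

# Hypothetical objects, assumed to be built elsewhere:
#   my_initial_state -- a SimulationState describing the starting configuration
#   my_controller    -- a BasicController implementing the update rule
runner = SynchronousRunner(parallel=True, workers=4)  # omit workers to use one worker per CPU
result = runner.run(my_initial_state, my_controller, 100)  # run for 100 steps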

Source code in pylattica/core/runner/synchronous_runner.py
class SynchronousRunner(Runner):
    """Class for orchestrating the running of the simulation. An automaton simulation
    is run by applying the update rule (as implemented by a Controller) to sites
    in the SimulationState repeatedly. This class implements the synchronous update
    strategy.

    In the normal configuration, one simulation step involves applying
    the update rule to every site in the SimulationState. The user should assume
    these applications follow no specific order. This is appropriate if the
    update rule only impacts the cell it is applied to, and no neighboring cells.

    If normal mode is used, the automaton can be parallelized, meaning that the
    update rule can be applied to many cells simultaneously. This can lead to
    dramatic improvements in speed for the simulation. Specify this using
    `parallel = True` during initialization. You can further specify the number
    of workers to use during parallel processing with the `workers` parameter.
    If left unspecified, one worker for each CPU will be created.
    """

    def __init__(self, parallel: bool = False, workers: int = None) -> None:
        self.parallel = parallel
        self.workers = workers

    def _run(
        self,
        initial_state: SimulationState,
        result: SimulationResult,
        live_state: SimulationState,
        controller: BasicController,
        num_steps: int,
        verbose: bool = False,
    ):
        if self.parallel:
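            # Stash the controller and the initial state in module-level globals so that
            # the forked worker processes below can access them without pickling.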
            global mp_globals  # pylint: disable=global-variable-not-assigned
            mp_globals["controller"] = controller
            mp_globals["initial_state"] = initial_state

            if self.workers is None:
                PROCESSES = mp.cpu_count()
            else:
                PROCESSES = self.workers  # pragma: no cover

            printif(verbose, f"Running in parallel using {PROCESSES} workers")
            num_sites = initial_state.size
            chunk_size = math.ceil(num_sites / PROCESSES)
            printif(
                verbose,
                f"Distributing {num_sites} update tasks to {PROCESSES} workers in chunks of {chunk_size}",
            )
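            # The "fork" start method lets each worker process inherit mp_globals from this process.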
            with mp.get_context("fork").Pool(PROCESSES) as pool:
                updates = {}
                for _ in tqdm(range(num_steps)):
                    updates = self._take_step_parallel(
                        updates, pool, chunk_size=chunk_size
                    )
                    result.add_step(updates)
        else:
            printif(verbose, "Running in series.")
            for _ in tqdm(range(num_steps)):
                updates = self._take_step(live_state, controller)
                live_state.batch_update(updates)
                result.add_step(updates)

        result.set_output(live_state)
        return result

    def _take_step_parallel(self, updates: dict, pool, chunk_size) -> dict:
        params = []
        site_ids = mp_globals["initial_state"].site_ids()
        num_sites = len(site_ids)
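        # Split the site IDs into contiguous batches of at most chunk_size sites each.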
        site_batches = [
            site_ids[i : i + chunk_size] for i in range(0, num_sites, chunk_size)
        ]

        for batch in site_batches:
            params.append([batch, updates])

        results = pool.starmap(_step_batch_parallel, params)
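        # Combine the per-batch update dictionaries into a single set of updates for this step.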
        all_updates = None
        for batch_update_res in results:
            all_updates = merge_updates(all_updates, batch_update_res)

        return all_updates

    def _take_step(
        self, state: SimulationState, controller: BasicController
    ) -> dict:
        site_ids = state.site_ids()
        updates = _step_batch(site_ids, state, controller)

        return updates
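
The parallel step above reduces to splitting the list of site IDs into per-worker batches and then merging the per-batch update dictionaries. The batching arithmetic itself can be illustrated with a small standalone sketch (independent of pylattica; the numbers are arbitrary):

import math

site_ids = list(range(10))  # stand-in for initial_state.site_ids()
workers = 4
chunk_size = math.ceil(len(site_ids) / workers)  # ceil(10 / 4) == 3
batches = [site_ids[i : i + chunk_size] for i in range(0, len(site_ids), chunk_size)]
print(batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]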