AsynchronousRunner

Bases: Runner

Class for orchestrating the running of the simulation. An automaton simulation is run by applying the update rule (as implemented by a Controller) to sites in the SimulationState repeatedly. This Runner implements the Asynchronous evolution strategy.

If a simulation is run asynchronously, one simulation step consists of choosing a single site at random and applying the update rule there. This is appropriate if the update rule impacts both the cell it is focused on and its neighbors.
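As a minimal sketch of what one asynchronous step amounts to, the snippet below picks one random site in a plain Python list and applies a rule there. It does not use pylattica: `flip_rule` and `asynchronous_step` are hypothetical names chosen for illustration, and the list stands in for a SimulationState.

```python
import random


def flip_rule(site, state):
    """Hypothetical update rule: toggle the chosen cell between 0 and 1."""
    return {site: 1 - state[site]}


def asynchronous_step(state, rule, rng):
    """Apply the rule at one randomly chosen site and return that site."""
    site = rng.randrange(len(state))
    for target, value in rule(site, state).items():
        state[target] = value
    return site


rng = random.Random(42)
state = [0] * 8
# Five asynchronous steps means five single-site rule applications
visited = [asynchronous_step(state, flip_rule, rng) for _ in range(5)]
```

Each call changes the state immediately, so the next application of the rule already sees the result of the previous one.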

For instance, if the update rule "moves" an entity from one cell to a neighboring cell, it must be applied asynchronously because otherwise its effects could interfere with the effects of neighboring applications. Specify that this mode should be used with the is_async initialization parameter.
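The interference described above can be illustrated with a toy example, assuming a three-cell ring where 1 marks an entity and 0 an empty cell. The helper names (`move_update`, `synchronous_step`, `asynchronous_step`) are hypothetical and are not part of pylattica. When every site is evaluated against the same snapshot, two entities can claim the same empty cell and one of them is lost; applying the rule one site at a time avoids this.

```python
def move_update(site, state):
    """Move the entity at `site` to its first empty neighbor, if there is one."""
    if state[site] != 1:
        return {}
    n = len(state)
    for neighbor in ((site + 1) % n, (site - 1) % n):
        if state[neighbor] == 0:
            return {site: 0, neighbor: 1}
    return {}


def synchronous_step(state):
    """Evaluate every site against the same snapshot, then merge all updates."""
    snapshot = list(state)
    new_state = list(state)
    for site in range(len(state)):
        for target, value in move_update(site, snapshot).items():
            new_state[target] = value
    return new_state


def asynchronous_step(state, site):
    """Evaluate a single site against the current state and apply it at once."""
    new_state = list(state)
    for target, value in move_update(site, state).items():
        new_state[target] = value
    return new_state


start = [1, 0, 1]

# Both entities target cell 1 from the same snapshot, so one is overwritten
sync_result = synchronous_step(start)

# Applied one site at a time, the second move sees the first move's effect
async_result = asynchronous_step(asynchronous_step(start, 0), 2)
```

In this sketch the synchronous result holds a single entity, while the asynchronous result still holds both.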

Source code in pylattica/core/runner/asynchronous_runner.py
class AsynchronousRunner(Runner):
    """Class for orchestrating the running of the simulation. An automaton simulation
    is run by applying the update rule (as implemented by a Controller) to sites
    in the SimulationState repeatedly. This Runner implements the Asynchronous evolution
    strategy.

    If a simulation is run asynchronously, one simulation step consists
    of choosing a single site at random and applying the update rule there. This
    is appropriate if the update rule impacts both the cell it is focused on and its
    neighbors.

    For instance, if the update rule "moves" an entity from one cell to
    a neighboring cell, it must be applied asynchronously because otherwise its
    effects could interfere with the effects of neighboring applications. Specify
    that this mode should be used with the is_async initialization parameter.
    """

    def _run(
        self,
        _: SimulationState,
        result: SimulationResult,
        live_state: SimulationState,
        controller: BasicController,
        num_steps: int,
        verbose: bool = False,
    ) -> SimulationResult:
        """Run the simulation for the prescribed number of steps. Recall that one
        asynchronous simulation step involves one application of the update rule,
        whereas one synchronous simulation step applies the update rule to every site.

        Parameters
        ----------
        _ : SimulationState
            The starting state for the simulation (unused by this runner).
        result : SimulationResult
            The result object to which each step's updates are added.
        live_state : SimulationState
            The working state, which is updated in place after every step.
        controller : BasicController
            The controller (a descendant of BasicController) which implements the update rule.
        num_steps : int
            The number of steps for which the simulation should run.
        verbose : bool, optional
            If True, a progress bar is shown during the run, by default False

        Returns
        -------
        SimulationResult
            The result of the simulation.
        """

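        # Queue of site ids waiting to be visited
        site_queue = deque()

        def _add_sites_to_queue():
            # The controller may return a single site id or a list of site ids
            next_site = controller.get_random_site(live_state)
            if isinstance(next_site, int):
                site_queue.append(next_site)
            elif isinstance(next_site, list):
                site_queue.extend(next_site)

        _add_sites_to_queue()

        # Without at least one site to visit, the simulation cannot start
        if len(site_queue) == 0:
            raise RuntimeError("Controller provided no sites to update, ABORTING")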

        for _ in tqdm(range(num_steps), disable=(not verbose)):
            site_id = site_queue.popleft()

            controller_response = controller.get_state_update(site_id, live_state)
            next_sites = []

            # See if controller is specifying which sites to visit next
            if isinstance(controller_response, tuple):
                state_updates, next_sites = controller_response
            else:
                state_updates = controller_response

            state_updates = merge_updates(state_updates, site_id=site_id)
            live_state.batch_update(state_updates)
            site_queue.extend(next_sites)
            result.add_step(state_updates)

            if len(site_queue) == 0:
                _add_sites_to_queue()

            if len(site_queue) == 0:
                break

        result.set_output(live_state)
        return result
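The queue handling in `_run` can be tried out in isolation with the sketch below. It mirrors the loop's structure but does not import pylattica: `ToyController` is a hypothetical stand-in rather than a real BasicController subclass, the state is a plain list, and updates are plain dicts. The controller returns either a dict of updates or a `(updates, next_sites)` tuple, which is the distinction the runner checks for with `isinstance(controller_response, tuple)`.

```python
from collections import deque
import random


class ToyController:
    """Hypothetical stand-in for a controller; not pylattica's BasicController."""

    def __init__(self, seed=0):
        self.rng = random.Random(seed)

    def get_random_site(self, state):
        # Return a single site id chosen uniformly at random
        return self.rng.randrange(len(state))

    def get_state_update(self, site_id, state):
        updates = {site_id: state[site_id] + 1}
        if state[site_id] == 0:
            # First visit: also ask the runner to visit the right-hand neighbor next
            return updates, [(site_id + 1) % len(state)]
        return updates


def run_async(state, controller, num_steps):
    """Queue-driven asynchronous loop; returns the list of visited site ids."""
    site_queue = deque([controller.get_random_site(state)])
    visited = []
    for _ in range(num_steps):
        site_id = site_queue.popleft()
        response = controller.get_state_update(site_id, state)
        next_sites = []
        # A tuple response carries the sites the controller wants visited next
        if isinstance(response, tuple):
            updates, next_sites = response
        else:
            updates = response
        for target, value in updates.items():
            state[target] = value
        site_queue.extend(next_sites)
        visited.append(site_id)
        # Fall back to a random site when the controller requested none
        if not site_queue:
            site_queue.append(controller.get_random_site(state))
    return visited


state = [0] * 5
visited = run_async(state, ToyController(seed=1), num_steps=10)
```

Because the toy controller requests the right-hand neighbor on a first visit, the second visited site is always the neighbor of the first; once a site has been visited before, the loop falls back to a random pick.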