Bases: Runner
Class for orchestrating the running of the simulation. An automaton simulation
is run by applying the update rule (as implemented by a Controller) to sites
in the SimulationState repeatedly. This class implements the synchronous update
strategy.
In the normal configuration, one simulation step involves applying
the update rule to every site in the SimulationState. The user should assume
these applications follow no specific order. This is appropriate if the
update rule only impacts the cell it is applied to, and no neighboring cells.
If normal mode is used, the automaton can be parallelized, meaning that the
update rule can be applied to many cells simultaneously. This can lead to
dramatic improvements in speed for the simulation. Specify this using
parallel = True
during initialization. You can further specify the number
of workers to use during parallel processing with the workers
parameter.
If left unspecified, one worker for each CPU will be created.
Source code in pylattica/core/runner/synchronous_runner.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 | class SynchronousRunner(Runner):
"""Class for orchestrating the running of the simulation. An automaton simulation
is run by applying the update rule (as implemented by a Controller) to sites
in the SimulationState repeatedly. This class implements the synchronous update
strategy.
In the normal configuration, one simulation step involves applying
the update rule to every site in the SimulationState. The user should assume
these applications follow no specific order. This is appropriate if the
update rule only impacts the cell it is applied to, and no neighboring cells.
If normal mode is used, the automaton can be parallelized, meaning that the
update rule can be applied to many cells simultaneously. This can lead to
dramatic improvements in speed for the simulation. Specify this using
`parallel = True` during initialization. You can further specify the number
of workers to use during parallel processing with the `workers` parameter.
If left unspecified, one worker for each CPU will be created.
"""
def __init__(self, parallel: bool = False, workers: int = None) -> None:
self.parallel = parallel
self.workers = workers
def _run(
self,
initial_state: SimulationState,
result: SimulationResult,
live_state: SimulationState,
controller: BasicController,
num_steps: int,
verbose: bool = False,
):
if self.parallel:
global mp_globals # pylint: disable=global-variable-not-assigned
mp_globals["controller"] = controller
mp_globals["initial_state"] = initial_state
if self.workers is None:
PROCESSES = mp.cpu_count()
else:
PROCESSES = self.workers # pragma: no cover
printif(verbose, f"Running in parallel using {PROCESSES} workers")
num_sites = initial_state.size
chunk_size = math.ceil(num_sites / PROCESSES)
printif(
verbose,
f"Distributing {num_sites} update tasks to {PROCESSES} workers in chunks of {chunk_size}",
)
with mp.get_context("fork").Pool(PROCESSES) as pool:
updates = {}
for _ in tqdm(range(num_steps)):
updates = self._take_step_parallel(
updates, pool, chunk_size=chunk_size
)
result.add_step(updates)
else:
printif(verbose, "Running in series.")
for _ in tqdm(range(num_steps)):
updates = self._take_step(live_state, controller)
live_state.batch_update(updates)
result.add_step(updates)
result.set_output(live_state)
return result
def _take_step_parallel(self, updates: dict, pool, chunk_size) -> SimulationState:
params = []
site_ids = mp_globals["initial_state"].site_ids()
num_sites = len(site_ids)
site_batches = [
site_ids[i : i + chunk_size] for i in range(0, num_sites, chunk_size)
]
for batch in site_batches:
params.append([batch, updates])
results = pool.starmap(_step_batch_parallel, params)
all_updates = None
for batch_update_res in results:
all_updates = merge_updates(all_updates, batch_update_res)
return all_updates
def _take_step(
self, state: SimulationState, controller: BasicController
) -> SimulationState:
site_ids = state.site_ids()
updates = _step_batch(site_ids, state, controller)
return updates
|