Helios Engine 0.1.0
A modular ECS based data-oriented C++23 game engine
 
Loading...
Searching...
No Matches
scheduler.cpp
Go to the documentation of this file.
2
16
17#include <algorithm>
18#include <cstddef>
19#include <span>
20#include <string>
21#include <unordered_map>
22#include <utility>
23#include <vector>
24
25namespace helios::app::details {
26
27void ScheduleExecutor::Execute(ecs::World& world, async::Executor& executor, std::span<SystemStorage> system_storage) {
28 HELIOS_ASSERT(graph_built_, "Failed to execute: Execution graph must be built before executing schedule!");
29
30 if (IsMainStage()) {
31 // Main stage: Execute systems sequentially on the main thread
32 // Systems run in dependency order for immediate visibility
33 for (const size_t index : system_indices_) {
34 auto& storage = system_storage[index];
35 app::SystemContext ctx(world, storage.info, executor, storage.local_storage);
36 storage.system->Update(ctx);
37 ++storage.info.execution_count;
38
39 // Merge events after each system in main schedule for immediate visibility
40 if (!storage.local_storage.GetEventQueue().Empty()) {
41 world.MergeEventQueue(storage.local_storage.GetEventQueue());
42 storage.local_storage.GetEventQueue().Clear();
43 }
44 }
45 } else {
46 // All other stages: Execute the graph asynchronously via executor
47 executor.Run(execution_graph_).Wait();
48
49 // After async execution, merge all local event queues into world's main queue
50 for (const size_t index : system_indices_) {
51 auto& storage = system_storage[index];
52 if (!storage.local_storage.GetEventQueue().Empty()) {
53 world.MergeEventQueue(storage.local_storage.GetEventQueue());
54 storage.local_storage.GetEventQueue().Clear();
55 }
56 }
57 }
58}
59
61 schedule_order_.clear();
62
63 const auto all_schedules = CollectAllScheduleIds();
64 if (all_schedules.empty()) {
65 return;
66 }
67
68 auto [adjacency, in_degree] = BuildScheduleDependencyGraph(all_schedules);
69 schedule_order_ = TopologicalSort(all_schedules, adjacency, in_degree);
70
71 // Build execution graphs for all schedules
72 for (auto& [_, schedule] : schedules_) {
73 schedule.BuildExecutionGraph(world, system_storage_, system_sets_);
74 }
75}
76
77auto Scheduler::CollectAllScheduleIds() const -> std::vector<ScheduleId> {
78 std::vector<ScheduleId> all_schedules;
79 all_schedules.reserve(schedules_.size());
80 for (const auto& [id, schedule] : schedules_) {
81 all_schedules.push_back(id);
82 }
83 return all_schedules;
84}
85
86auto Scheduler::BuildScheduleDependencyGraph(const std::vector<ScheduleId>& all_schedules) const
87 -> std::pair<std::unordered_map<ScheduleId, std::vector<ScheduleId>>, std::unordered_map<ScheduleId, size_t>> {
88 std::unordered_map<ScheduleId, std::vector<ScheduleId>> adjacency;
89 std::unordered_map<ScheduleId, size_t> in_degree;
90
91 // Initialize
92 for (const ScheduleId id : all_schedules) {
93 adjacency[id] = {};
94 in_degree[id] = 0;
95 }
96
97 // Build the dependency graph from Before/After constraints
98 for (const auto& [schedule_id, ordering] : schedule_constraints_) {
99 // "Before" constraints: this schedule must run before others
100 for (const ScheduleId before_id : ordering.before) {
101 if (schedules_.contains(before_id)) {
102 adjacency[schedule_id].push_back(before_id);
103 ++in_degree[before_id];
104 }
105 }
106
107 // "After" constraints: this schedule must run after others
108 for (const ScheduleId after_id : ordering.after) {
109 if (schedules_.contains(after_id)) {
110 adjacency[after_id].push_back(schedule_id);
111 ++in_degree[schedule_id];
112 }
113 }
114 }
115
116 return {adjacency, in_degree};
117}
118
119auto Scheduler::TopologicalSort(const std::vector<ScheduleId>& all_schedules,
120 const std::unordered_map<ScheduleId, std::vector<ScheduleId>>& adjacency,
121 std::unordered_map<ScheduleId, size_t>& in_degree) -> std::vector<ScheduleId> {
122 std::vector<ScheduleId> result;
123 result.reserve(all_schedules.size());
124
125 // Kahn's algorithm: collect nodes with no incoming edges
126 std::vector<ScheduleId> queue;
127 for (const ScheduleId id : all_schedules) {
128 if (in_degree[id] == 0) {
129 queue.push_back(id);
130 }
131 }
132
133 // Process nodes in topological order
134 while (!queue.empty()) {
135 const ScheduleId current = queue.back();
136 queue.pop_back();
137 result.push_back(current);
138
139 // Reduce in-degree for all dependent schedules
140 for (const ScheduleId dependent : adjacency.at(current)) {
141 --in_degree[dependent];
142 if (in_degree[dependent] == 0) {
143 queue.push_back(dependent);
144 }
145 }
146 }
147
148 // Check for cycles
149 if (result.size() != all_schedules.size()) {
150 HELIOS_ERROR("Schedule dependency cycle detected! Some schedules will not execute.");
151 // Add remaining schedules anyway to avoid crashes
152 for (const ScheduleId id : all_schedules) {
153 if (std::ranges::find(result, id) == result.end()) {
154 result.push_back(id);
155 }
156 }
157 }
158
159 return result;
160}
161
162void ScheduleExecutor::BuildExecutionGraph(ecs::World& world, std::span<SystemStorage> system_storage,
163 const std::unordered_map<SystemSetId, SystemSetInfo>& system_sets) {
164 if (system_indices_.empty()) {
165 graph_built_ = true;
166 return;
167 }
168
169 execution_graph_.Clear();
170
171 auto [tasks, system_id_to_task_index] = CreateSystemTasks(system_storage, world);
172 ApplyExplicitOrdering(tasks, system_id_to_task_index, system_storage);
173 ApplySetOrdering(tasks, system_id_to_task_index, system_sets);
174 ApplyAccessPolicyOrdering(tasks, system_storage);
175
176 graph_built_ = true;
177}
178
179auto ScheduleExecutor::CreateSystemTasks(std::span<SystemStorage> system_storage, ecs::World& world)
180 -> std::pair<std::vector<async::Task>, std::unordered_map<ecs::SystemTypeId, size_t>> {
181 std::vector<async::Task> tasks;
182 tasks.reserve(system_indices_.size());
183
184 std::unordered_map<ecs::SystemTypeId, size_t> system_id_to_task_index;
185 system_id_to_task_index.reserve(system_indices_.size());
186
187 for (size_t i = 0; i < system_indices_.size(); ++i) {
188 const size_t storage_index = system_indices_[i];
189
190 // Capture pointer to SystemStorage instead of reference to span to avoid dangling reference
191 auto* storage_ptr = &system_storage[storage_index];
192 auto task = execution_graph_.EmplaceTask([storage_ptr, &world](async::SubTaskGraph& sub_graph) {
193 auto& storage = *storage_ptr;
194 app::SystemContext ctx(world, storage.info, sub_graph, storage.local_storage);
195 storage.system->Update(ctx);
196 ++storage.info.execution_count;
197 });
198
199 task.Name(storage_ptr->info.name);
200 tasks.push_back(task);
201 system_id_to_task_index[storage_ptr->info.type_id] = i;
202 }
203
204 return {tasks, system_id_to_task_index};
205}
206
207void ScheduleExecutor::ApplyExplicitOrdering(
208 std::vector<async::Task>& tasks, const std::unordered_map<ecs::SystemTypeId, size_t>& system_id_to_task_index,
209 std::span<SystemStorage> system_storage) {
210 for (const auto& [system_id, ordering] : system_orderings_) {
211 const auto system_it = system_id_to_task_index.find(system_id);
212 if (system_it == system_id_to_task_index.end()) {
213 HELIOS_WARN("Scheduling: System with type ID '{}' has ordering constraints but is not in this schedule!",
214 system_id);
215 continue;
216 }
217
218 const size_t system_idx = system_it->second;
219 const size_t storage_index = system_indices_[system_idx];
220 const auto& storage = system_storage[storage_index];
221
222 // Handle "before" constraints
223 for (const auto& before_id : ordering.before) {
224 const auto before_it = system_id_to_task_index.find(before_id);
225 if (before_it != system_id_to_task_index.end()) {
226 const size_t before_idx = before_it->second;
227 [[maybe_unused]] const size_t before_storage_index = system_indices_[before_idx];
228 tasks[system_idx].Precede(tasks[before_idx]);
229
230 HELIOS_TRACE("Scheduling: System '{}' explicitly ordered before '{}'", storage.info.name,
231 system_storage[before_storage_index].info.name);
232 } else {
233 HELIOS_WARN("Scheduling: System '{}' has before constraint on non-existent system with type ID '{}'!",
234 storage.info.name, before_id);
235 }
236 }
237
238 // Handle "after" constraints
239 for (const auto& after_id : ordering.after) {
240 const auto after_it = system_id_to_task_index.find(after_id);
241 if (after_it != system_id_to_task_index.end()) {
242 const size_t after_idx = after_it->second;
243 [[maybe_unused]] const size_t after_storage_index = system_indices_[after_idx];
244 tasks[after_idx].Precede(tasks[system_idx]);
245
246 HELIOS_TRACE("Scheduling: System '{}' explicitly ordered after '{}'", storage.info.name,
247 system_storage[after_storage_index].info.name);
248 } else {
249 HELIOS_WARN("Scheduling: System '{}' has after constraint on non-existent system with type ID '{}'!",
250 storage.info.name, after_id);
251 }
252 }
253 }
254}
255
256void ScheduleExecutor::ApplySetOrdering(std::vector<async::Task>& tasks,
257 const std::unordered_map<ecs::SystemTypeId, size_t>& system_id_to_task_index,
258 const std::unordered_map<SystemSetId, SystemSetInfo>& system_sets) {
259 if (system_sets.empty()) [[unlikely]] {
260 return;
261 }
262
263 for (const auto& [set_id, set_info] : system_sets) {
264 const auto& members = set_info.member_systems;
265 if (members.empty()) {
266 continue;
267 }
268
269 for (const SystemSetId after_set_id : set_info.before_sets) {
270 const auto after_it = system_sets.find(after_set_id);
271 if (after_it == system_sets.end()) {
272 continue;
273 }
274
275 const auto& after_members = after_it->second.member_systems;
276 if (after_members.empty()) {
277 continue;
278 }
279
280 // For every pair (a, b) where a ∈ set, b ∈ after_set and both are in this schedule: a -> b
281 for (const ecs::SystemTypeId before_system_id : members) {
282 const auto before_idx_it = system_id_to_task_index.find(before_system_id);
283 if (before_idx_it == system_id_to_task_index.end()) {
284 continue;
285 }
286
287 for (const ecs::SystemTypeId after_system_id : after_members) {
288 const auto after_idx_it = system_id_to_task_index.find(after_system_id);
289 if (after_idx_it == system_id_to_task_index.end()) {
290 continue;
291 }
292
293 const size_t before_local_idx = before_idx_it->second;
294 const size_t after_local_idx = after_idx_it->second;
295
296 // Update explicit ordering map so set-level constraints become explicit edges
297 auto& ordering = system_orderings_[before_system_id];
298 if (std::ranges::find(ordering.before, after_system_id) == ordering.before.end()) {
299 ordering.before.push_back(after_system_id);
300 }
301
302 tasks[before_local_idx].Precede(tasks[after_local_idx]);
303 }
304 }
305 }
306 }
307}
308
309void ScheduleExecutor::ApplyAccessPolicyOrdering(std::vector<async::Task>& tasks,
310 std::span<SystemStorage> system_storage) {
311 // Build dependencies based on access policy conflicts
312 for (size_t i = 0; i < system_indices_.size(); ++i) {
313 for (size_t j = i + 1; j < system_indices_.size(); ++j) {
314 const size_t storage_index_i = system_indices_[i];
315 const size_t storage_index_j = system_indices_[j];
316 const auto& storage_i = system_storage[storage_index_i];
317 const auto& storage_j = system_storage[storage_index_j];
318
319 const bool query_conflict = storage_i.info.access_policy.HasQueryConflict(storage_j.info.access_policy);
320 const bool resource_conflict = storage_i.info.access_policy.HasResourceConflict(storage_j.info.access_policy);
321
322 if (query_conflict || resource_conflict) {
323 tasks[i].Precede(tasks[j]);
324
325#ifdef HELIOS_DEBUG_MODE
326 if (query_conflict) {
327 const auto component_conflicts =
328 SystemDiagnostics::AnalyzeComponentConflicts(storage_i.info.access_policy, storage_j.info.access_policy);
329 const std::string conflict_details = SystemDiagnostics::FormatComponentConflicts(
330 storage_i.info.name, storage_j.info.name, component_conflicts);
331 HELIOS_TRACE("Scheduling: System '{}' must run before '{}' due to component conflicts:\n{}",
332 storage_i.info.name, storage_j.info.name, conflict_details);
333 }
334
335 if (resource_conflict) {
336 const auto resource_conflicts =
337 SystemDiagnostics::AnalyzeResourceConflicts(storage_i.info.access_policy, storage_j.info.access_policy);
338 const std::string conflict_details =
339 SystemDiagnostics::FormatResourceConflicts(storage_i.info.name, storage_j.info.name, resource_conflicts);
340 HELIOS_TRACE("Scheduling: System '{}' must run before '{}' due to resource conflicts:\n{}",
341 storage_i.info.name, storage_j.info.name, conflict_details);
342 }
343#else
344 HELIOS_TRACE("Scheduling: System '{}' must run before '{}' due to data conflict", storage_i.info.name,
345 storage_j.info.name);
346#endif
347 }
348 }
349 }
350
351 [[maybe_unused]] const size_t system_count = system_indices_.size();
352 HELIOS_DEBUG("Built execution graph with {} {}", system_count, system_count == 1 ? "system" : "systems");
353}
354
355} // namespace helios::app::details
#define HELIOS_ASSERT(condition,...)
Assertion macro that aborts execution in debug builds.
Definition assert.hpp:140
Per-system execution context with validated access.
bool IsMainStage() const noexcept
Checks if this schedule is the Main stage.
void BuildExecutionGraph(ecs::World &world, std::span< SystemStorage > system_storage, const std::unordered_map< SystemSetId, SystemSetInfo > &system_sets)
Builds the execution graph based on system access policies.
void Execute(ecs::World &world, async::Executor &executor, std::span< SystemStorage > system_storage)
Executes all systems in this schedule.
Definition scheduler.cpp:27
void BuildAllGraphs(ecs::World &world)
Builds execution graphs for all schedules.
Definition scheduler.cpp:60
static auto AnalyzeResourceConflicts(const AccessPolicy &policy_a, const AccessPolicy &policy_b) -> std::vector< ResourceConflict >
Analyzes resource conflicts between two access policies.
static std::string FormatResourceConflicts(std::string_view system_a_name, std::string_view system_b_name, std::span< const ResourceConflict > conflicts)
Formats resource conflict information into human-readable string.
static auto AnalyzeComponentConflicts(const AccessPolicy &policy_a, const AccessPolicy &policy_b) -> std::vector< ComponentConflict >
Analyzes conflicts between two access policies.
static std::string FormatComponentConflicts(std::string_view system_a_name, std::string_view system_b_name, std::span< const ComponentConflict > conflicts)
Formats component conflict information into human-readable string.
Manages worker threads and executes task graphs using work-stealing scheduling.
Definition executor.hpp:33
auto Run(TaskGraph &graph) -> Future< void >
Runs a task graph once.
Definition executor.hpp:57
Dynamic task graph that can be created within the execution of a task.
The World class manages entities with their components and systems.
Definition world.hpp:53
void MergeEventQueue(details::EventQueue &other)
Merges events from another event queue into the main queue.
Definition world.hpp:95
#define HELIOS_DEBUG(...)
Definition logger.hpp:667
#define HELIOS_ERROR(...)
Definition logger.hpp:689
#define HELIOS_TRACE(...)
Definition logger.hpp:680
#define HELIOS_WARN(...)
Definition logger.hpp:687
size_t ScheduleId
Type alias for schedule type IDs.
Definition schedule.hpp:20
size_t SystemSetId
Type alias for system set type IDs.
size_t SystemTypeId
Type ID for systems.
Definition system.hpp:93
STL namespace.