Helios Engine 0.1.0
A modular ECS based data-oriented C++23 game engine
 
Loading...
Searching...
No Matches
sub_task_graph.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <helios/core_pch.hpp>
4
12
13#include <taskflow/algorithm/for_each.hpp>
14#include <taskflow/algorithm/reduce.hpp>
15#include <taskflow/algorithm/sort.hpp>
16#include <taskflow/algorithm/transform.hpp>
17#include <taskflow/core/async_task.hpp>
18#include <taskflow/taskflow.hpp>
19
20#include <array>
21#include <concepts>
22#include <cstddef>
23#include <functional>
24#include <future>
25#include <ranges>
26#include <string>
27#include <type_traits>
28#include <utility>
29#include <vector>
30
31namespace helios::async {
32
33/**
34 * @brief Dynamic task graph that can be created within the execution of a task.
35 * @details Wraps tf::Subflow and provides methods to create tasks dynamically at runtime.
36 * SubTaskGraphs are spawned from the execution of a SubTask and allow for runtime-dependent task creation and
37 * dependency management.
38 * @note Partially thread-safe.
39 * @warning Only the worker thread that spawned the subflow should modify it.
40 */
42public:
43 SubTaskGraph(const SubTaskGraph&) = delete;
45 ~SubTaskGraph() = default;
46
49
50 /**
51 * @brief Joins the subflow with its parent task.
52 * @details Called automatically when the SubTaskGraph goes out of scope, unless Join() has already been called.
53 * @warning Must be called by the same worker thread that created this subflow.
54 */
55 void Join() { subflow_.join(); }
56
57 /**
58 * @brief Specifies whether to keep the sub task graph after it is joined.
59 * @details By default, a sub task graph is destroyed after being joined. Retaining it allows it to
60 * remain valid after being joined.
61 * @warning Must be called by the same worker thread that created this subflow.
62 * @param flag True to retain, false otherwise
63 */
64 void Retain(bool flag) noexcept { subflow_.retain(flag); }
65
66 /**
67 * @brief Creates a static task with the given callable.
68 * @note Not thread-safe
69 * @tparam C Callable type
70 * @param callable Function to execute when the task runs
71 * @return Task handle for the created task
72 */
73 template <StaticTask C>
74 Task EmplaceTask(C&& callable) {
75 return Task(subflow_.emplace(std::forward<C>(callable)));
76 }
77
78 /**
79 * @brief Creates a dynamic task (nested subflow) with the given callable.
80 * @note Not thread-safe
81 * @tparam C Callable type that accepts a SubTaskGraph reference
82 * @param callable Function to execute when the task runs
83 * @return Task handle for the created task
84 */
85 template <SubTask C>
86 Task EmplaceTask(C&& callable);
87
88 /**
89 * @brief Creates multiple tasks from a list of callables.
90 * @note Not thread-safe
91 * @tparam Cs Callable types
92 * @param callables Functions to execute when the tasks run
93 * @return Array of task handles for the created tasks
94 */
95 template <AnyTask... Cs>
96 requires(sizeof...(Cs) > 1)
97 auto EmplaceTasks(Cs&&... callables) -> std::array<Task, sizeof...(Cs)> {
98 return {EmplaceTask(std::forward<Cs>(callables))...};
99 }
100
101 /**
102 * @brief Creates a placeholder task with no assigned work.
103 * @note Not thread-safe
104 * @return Task handle that can later be assigned work
105 */
106 Task CreatePlaceholder() { return Task(subflow_.placeholder()); }
107
108 /**
109 * @brief Creates linear dependencies between tasks in the given range.
110 * @note Not thread-safe
111 * @tparam R Range type containing Task objects
112 * @param tasks Range of tasks to linearize (first->second->third->...)
113 */
114 template <std::ranges::range R>
115 requires std::same_as<std::ranges::range_value_t<R>, Task>
116 void Linearize(const R& tasks);
117
118 /**
119 * @brief Creates a parallel for-each task over the given range.
120 * @note Not thread-safe
121 * @tparam R Range type
122 * @tparam C Callable type
123 * @param range Input range to iterate over
124 * @param callable Function to apply to each element
125 * @return Task handle for the parallel operation
126 */
127 template <std::ranges::range R, std::invocable<std::ranges::range_reference_t<R>> C>
128 Task ForEach(const R& range, C&& callable) {
129 return Task(subflow_.for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<C>(callable)));
130 }
131
132 /**
133 * @brief Creates a parallel for-each task over an index range.
134 * @note Not thread-safe
135 * @tparam I Integral type
136 * @tparam C Callable type
137 * @param start Starting index (inclusive)
138 * @param end Ending index (exclusive)
139 * @param step Step size
140 * @param callable Function to apply to each index
141 * @return Task handle for the parallel operation
142 */
143 template <std::integral I, std::invocable<I> C>
144 Task ForEachIndex(I start, I end, I step, C&& callable) {
145 return Task(subflow_.for_each_index(start, end, step, std::forward<C>(callable)));
146 }
147
148 /**
149 * @brief Creates a parallel transform task that applies a function to each element.
150 * @note Not thread-safe
151 * @tparam InputRange Input range type
152 * @tparam OutputRange Output range type
153 * @tparam TransformFunc Transform function type
154 * @param input_range Range of input elements
155 * @param output_range Range to store transformed results
156 * @param transform_func Function to apply to each input element
157 * @return Task handle for the parallel operation
158 */
159 template <std::ranges::range InputRange, std::ranges::range OutputRange,
160 std::invocable<std::ranges::range_reference_t<InputRange>> TransformFunc>
161 Task Transform(const InputRange& input_range, OutputRange& output_range, TransformFunc&& transform_func) {
162 return Task(subflow_.transform(std::ranges::begin(input_range), std::ranges::end(input_range),
163 std::ranges::begin(output_range), std::forward<TransformFunc>(transform_func)));
164 }
165
166 /**
167 * @brief Creates a parallel reduction task that combines elements using a binary operation.
168 * @note Not thread-safe
169 * @tparam R Range type
170 * @tparam T Result type
171 * @tparam BinaryOp Binary operation type
172 * @param range Range of elements to reduce
173 * @param init Initial value and storage for the result
174 * @param binary_op Binary function to combine elements
175 * @return Task handle for the parallel operation
176 */
177 template <std::ranges::range R, typename T, typename BinaryOp>
178 requires std::invocable<BinaryOp, T, std::ranges::range_reference_t<R>>
179 Task Reduce(const R& range, T& init, BinaryOp&& binary_op) {
180 return Task(
181 subflow_.reduce(std::ranges::begin(range), std::ranges::end(range), init, std::forward<BinaryOp>(binary_op)));
182 }
183
184 /**
185 * @brief Creates a parallel sort task for the given range.
186 * @note Not thread-safe
187 * @tparam R Random access range type
188 * @tparam Compare Comparator type
189 * @param range Range of elements to sort
190 * @param comparator Comparison function (default: std::less<>)
191 * @return Task handle for the parallel operation
192 */
193 template <std::ranges::random_access_range R, typename Compare = std::less<>>
194 requires std::predicate<Compare, std::ranges::range_reference_t<R>, std::ranges::range_reference_t<R>>
195 Task Sort(R& range, Compare&& comparator = Compare());
196
197 /**
198 * @brief Removes a task from this subflow.
199 * @note Not thread-safe
200 * @param task Task to remove
201 */
202 void RemoveTask(const Task& task) { subflow_.erase(task.UnderlyingTask()); }
203
204 /**
205 * @brief Creates a module task that encapsulates another task graph.
206 * @note Not thread-safe
207 * @param other_graph Task graph to compose into this subflow
208 * @return Task handle representing the composed graph
209 */
210 template <typename T>
211 Task ComposedOf(T& other_graph) {
212 return Task(subflow_.composed_of(other_graph.UnderlyingTaskflow()));
213 }
214
215 /**
216 * @brief Checks if this subflow can be joined.
217 * @details A subflow is joinable if it has not yet been joined with its parent task.
218 * @return True if the subflow is joinable, false otherwise
219 */
220 [[nodiscard]] bool Joinable() const noexcept { return subflow_.joinable(); }
221
222 /**
223 * @brief Checks if this subflow will be retained.
224 * @details A retained subflow remains valid after being joined.
225 * @return True if the subflow will be retained, false otherwise
226 */
227 [[nodiscard]] bool WillBeRetained() const noexcept { return subflow_.retain(); }
228
229 // Executor related methods
230
231 /**
232 * @brief Runs a task graph once.
233 * @details Task graph is not owned - ensure it remains alive during execution.
234 * @note Thread-safe.
235 * @param graph Task graph to execute
236 * @return Future that completes when execution finishes
237 */
238 auto Run(TaskGraph& graph) -> Future<void> {
239 return Future<void>(subflow_.executor().run(graph.UnderlyingTaskflow()));
240 }
241
242 /**
243 * @brief Runs a task graph once.
244 * @note Thread-safe.
245 * @param graph Task graph to execute (moved)
246 * @return Future that completes when execution finishes
247 */
248 auto Run(TaskGraph&& graph) -> Future<void> {
249 return Future<void>(subflow_.executor().run(std::move(std::move(graph).UnderlyingTaskflow())));
250 }
251
252 /**
253 * @brief Runs a task graph once and invokes a callback upon completion.
254 * @details Task graph is not owned - ensure it remains alive during execution.
255 * @note Thread-safe.
256 * @tparam C Callable type
257 * @param graph Task graph to execute
258 * @param callable Callback to invoke after execution completes
259 * @return Future that completes when execution finishes
260 */
261 template <std::invocable C>
262 auto Run(TaskGraph& graph, C&& callable) -> Future<void> {
263 return Future<void>(subflow_.executor().run(graph.UnderlyingTaskflow(), std::forward<C>(callable)));
264 }
265
266 /**
267 * @brief Runs a moved task graph once and invokes a callback upon completion.
268 * @note Thread-safe.
269 * @tparam C Callable type
270 * @param graph Task graph to execute (moved)
271 * @param callable Callback to invoke after execution completes
272 * @return Future that completes when execution finishes
273 */
274 template <std::invocable C>
275 auto Run(TaskGraph&& graph, C&& callable) -> Future<void> {
276 return Future<void>(
277 subflow_.executor().run(std::move(std::move(graph).UnderlyingTaskflow()), std::forward<C>(callable)));
278 }
279
280 /**
281 * @brief Runs a task graph for the specified number of times.
282 * @details Task graph is not owned - ensure it remains alive during execution.
283 * @note Thread-safe.
284 * @param graph Task graph to execute
285 * @param count Number of times to run the graph
286 * @return Future that completes when all executions finish
287 */
288 auto RunN(TaskGraph& graph, size_t count) -> Future<void> {
289 return Future<void>(subflow_.executor().run_n(graph.UnderlyingTaskflow(), count));
290 }
291
292 /**
293 * @brief Runs a moved task graph for the specified number of times.
294 * @note Thread-safe.
295 * @param graph Task graph to execute (moved)
296 * @param count Number of times to run the graph
297 * @return Future that completes when all executions finish
298 */
299 auto RunN(TaskGraph&& graph, size_t count) -> Future<void> {
300 return Future<void>(subflow_.executor().run_n(std::move(std::move(graph).UnderlyingTaskflow()), count));
301 }
302
303 /**
304 * @brief Runs a task graph for the specified number of times and invokes a callback.
305 * @details Task graph is not owned - ensure it remains alive during execution.
306 * @note Thread-safe.
307 * @tparam C Callable type
308 * @param graph Task graph to execute
309 * @param count Number of times to run the graph
310 * @param callable Callback to invoke after all executions complete
311 * @return Future that completes when all executions finish
312 */
313 template <std::invocable C>
314 auto RunN(TaskGraph& graph, size_t count, C&& callable) -> Future<void> {
315 return Future<void>(subflow_.executor().run_n(graph.UnderlyingTaskflow(), count, std::forward<C>(callable)));
316 }
317
318 /**
319 * @brief Runs a moved task graph for the specified number of times and invokes a callback.
320 * @note Thread-safe.
321 * @tparam C Callable type
322 * @param graph Task graph to execute (moved)
323 * @param count Number of times to run the graph
324 * @param callable Callback to invoke after all executions complete
325 * @return Future that completes when all executions finish
326 */
327 template <std::invocable C>
328 auto RunN(TaskGraph&& graph, size_t count, C&& callable) -> Future<void> {
329 return Future<void>(
330 subflow_.executor().run_n(std::move(std::move(graph).UnderlyingTaskflow()), count, std::forward<C>(callable)));
331 }
332
333 /**
334 * @brief Runs a task graph repeatedly until the predicate returns true.
335 * @details Task graph is not owned - ensure it remains alive during execution.
336 * @note Thread-safe.
337 * @tparam Predicate Predicate type
338 * @param graph Task graph to execute
339 * @param predicate Boolean predicate to determine when to stop
340 * @return Future that completes when predicate returns true
341 */
342 template <std::predicate Predicate>
343 auto RunUntil(TaskGraph& graph, Predicate&& predicate) -> Future<void> {
344 return Future<void>(subflow_.executor().run_until(graph.UnderlyingTaskflow(), std::forward<Predicate>(predicate)));
345 }
346
347 /**
348 * @brief Runs a moved task graph repeatedly until the predicate returns true.
349 * @note Thread-safe.
350 * @tparam Predicate Predicate type
351 * @param graph Task graph to execute (moved)
352 * @param predicate Boolean predicate to determine when to stop
353 * @return Future that completes when predicate returns true
354 */
355 template <std::predicate Predicate>
356 auto RunUntil(TaskGraph&& graph, Predicate&& predicate) -> Future<void> {
357 return Future<void>(subflow_.executor().run_until(std::move(std::move(graph).UnderlyingTaskflow()),
358 std::forward<Predicate>(predicate)));
359 }
360
361 /**
362 * @brief Runs a task graph repeatedly until the predicate returns true, then invokes a callback.
363 * @details Task graph is not owned - ensure it remains alive during execution.
364 * @note Thread-safe.
365 * @tparam Predicate Predicate type
366 * @tparam C Callable type
367 * @param graph Task graph to execute
368 * @param predicate Boolean predicate to determine when to stop
369 * @param callable Callback to invoke after execution completes
370 * @return Future that completes when predicate returns true and callback finishes
371 */
372 template <std::predicate Predicate, std::invocable C>
373 auto RunUntil(TaskGraph& graph, Predicate&& predicate, C&& callable) -> Future<void> {
374 return Future<void>(subflow_.executor().run_until(graph.UnderlyingTaskflow(), std::forward<Predicate>(predicate),
375 std::forward<C>(callable)));
376 }
377
378 /**
379 * @brief Runs a moved task graph repeatedly until the predicate returns true, then invokes a callback.
380 * @note Thread-safe.
381 * @tparam Predicate Predicate type
382 * @tparam C Callable type
383 * @param graph Task graph to execute (moved)
384 * @param predicate Boolean predicate to determine when to stop
385 * @param callable Callback to invoke after execution completes
386 * @return Future that completes when predicate returns true and callback finishes
387 */
388 template <std::predicate Predicate, std::invocable C>
389 auto RunUntil(TaskGraph&& graph, Predicate&& predicate, C&& callable) -> Future<void> {
390 return Future<void>(subflow_.executor().run_until(std::move(std::move(graph).UnderlyingTaskflow()),
391 std::forward<Predicate>(predicate), std::forward<C>(callable)));
392 }
393
394 /**
395 * @brief Creates an asynchronous task that runs the given callable.
396 * @details The task is scheduled immediately and runs independently.
397 * @note Thread-safe.
398 * @tparam C Callable type
399 * @param callable Function to execute asynchronously
400 * @return Future that will hold the result of the execution
401 */
402 template <std::invocable C>
403 auto Async(C&& callable) -> std::future<std::invoke_result_t<C>> {
404 return subflow_.executor().async(std::forward<C>(callable));
405 }
406
407 /**
408 * @brief Creates a named asynchronous task that runs the given callable.
409 * @details The task is scheduled immediately and runs independently.
410 * @note Thread-safe.
411 * @tparam C Callable type
412 * @param name Name for the task (useful for debugging/profiling)
413 * @param callable Function to execute asynchronously
414 * @return Future that will hold the result of the execution
415 */
416 template <std::invocable C>
417 auto Async(std::string name, C&& callable) -> std::future<std::invoke_result_t<C>> {
418 return subflow_.executor().async(std::move(name), std::forward<C>(callable));
419 }
420
421 /**
422 * @brief Creates an asynchronous task without returning a future.
423 * @details More efficient than Async when you don't need the result.
424 * @note Thread-safe.
425 * @tparam C Callable type
426 * @param callable Function to execute asynchronously
427 */
428 template <std::invocable C>
429 void SilentAsync(C&& callable) {
430 subflow_.executor().silent_async(std::forward<C>(callable));
431 }
432
433 /**
434 * @brief Creates a named asynchronous task without returning a future.
435 * @details More efficient than Async when you don't need the result.
436 * @note Thread-safe.
437 * @tparam C Callable type
438 * @param name Name for the task (useful for debugging/profiling)
439 * @param callable Function to execute asynchronously
440 */
441 template <std::invocable C>
442 void SilentAsync(std::string name, C&& callable) {
443 subflow_.executor().silent_async(std::move(name), std::forward<C>(callable));
444 }
445
446 /**
447 * @brief Creates an asynchronous task that runs after specified dependencies complete.
448 * @details The task will only execute after all dependencies finish.
449 * @note Thread-safe.
450 * @tparam C Callable type
451 * @tparam Dependencies Range type containing AsyncTask dependencies
452 * @param callable Function to execute asynchronously
453 * @param dependencies Tasks that must complete before this task runs
454 * @return Pair containing AsyncTask handle and Future for the result
455 */
456 template <std::invocable C, std::ranges::range Dependencies>
457 requires std::same_as<std::ranges::range_value_t<Dependencies>, AsyncTask>
458 auto DependentAsync(C&& callable, const Dependencies& dependencies)
459 -> std::pair<AsyncTask, std::future<std::invoke_result_t<C>>>;
460
461 /**
462 * @brief Creates an asynchronous task that runs after dependencies complete, without returning a future.
463 * @details More efficient than DependentAsync when you don't need the result.
464 * @note Thread-safe.
465 * @tparam C Callable type
466 * @tparam Dependencies Range type containing AsyncTask dependencies
467 * @param callable Function to execute asynchronously
468 * @param dependencies Tasks that must complete before this task runs
469 * @return AsyncTask handle
470 */
471 template <std::invocable C, std::ranges::range Dependencies>
472 requires std::same_as<std::ranges::range_value_t<Dependencies>, AsyncTask>
473 AsyncTask SilentDependentAsync(C&& callable, const Dependencies& dependencies);
474
475 /**
476 * @brief Blocks until all submitted tasks complete.
477 * @details Waits for all taskflows and async tasks to finish.
478 * @note Thread-safe.
479 */
480 void WaitForAll() { subflow_.executor().wait_for_all(); }
481
482 /**
483 * @brief Runs a task graph cooperatively and waits until it completes using the current worker thread.
484 * @warning Must be called from within a worker thread of this executor.
485 * Triggers assertion if called from a non-worker thread.
486 * @param graph Task graph to execute
487 */
488 void CoRun(TaskGraph& graph);
489
490 /**
491 * @brief Keeps the current worker thread running until the predicate returns true.
492 * @warning Must be called from within a worker thread of this executor.
493 * Triggers assertion if called from a non-worker thread.
494 * @tparam Predicate Predicate type
495 * @param predicate Boolean predicate to determine when to stop
496 */
497 template <std::predicate Predicate>
498 void CoRunUntil(Predicate&& predicate);
499
500 /**
501 * @brief Checks if the current thread is a worker thread of this executor.
502 * @note Thread safe.
503 * @return True if current thread is a worker, false otherwise
504 */
505 [[nodiscard]] bool IsWorkerThread() const { return CurrentWorkerId() != -1; }
506
507 /**
508 * @brief Gets the ID of the current worker thread.
509 * @note Thread-safe.
510 * @return Worker ID (0 to N-1) or -1 if not a worker thread
511 */
512 [[nodiscard]] int CurrentWorkerId() const { return subflow_.executor().this_worker_id(); }
513
514 /**
515 * @brief Gets the total number of worker threads.
516 * @note Thread safe.
517 * @return Count of workers.
518 */
519 [[nodiscard]] size_t WorkerCount() const noexcept { return subflow_.executor().num_workers(); }
520
521 /**
522 * @brief Gets the number of worker threads currently waiting for work.
523 * @note Thread safe.
524 * @return Count of idle workers.
525 */
526 [[nodiscard]] size_t IdleWorkerCount() const noexcept { return subflow_.executor().num_waiters(); }
527
528 /**
529 * @brief Gets the number of task queues in the work-stealing scheduler.
530 * @note Thread safe.
531 * @return Count of queues.
532 */
533 [[nodiscard]] size_t QueueCount() const noexcept { return subflow_.executor().num_queues(); }
534
535 /**
536 * @brief Gets the number of task graphs currently being executed.
537 * @note Thread safe.
538 * @return Count of running topologies.
539 */
540 [[nodiscard]] size_t RunningTopologyCount() const { return subflow_.executor().num_topologies(); }
541
542private:
543 explicit SubTaskGraph(tf::Subflow& subflow) : subflow_(subflow) {}
544
545 [[nodiscard]] tf::Subflow& UnderlyingSubflow() noexcept { return subflow_; }
546 [[nodiscard]] const tf::Subflow& UnderlyingSubflow() const noexcept { return subflow_; }
547
548 tf::Subflow& subflow_;
549
550 friend class TaskGraph;
551 friend class Executor;
552};
553
554template <SubTask C>
555inline Task SubTaskGraph::EmplaceTask(C&& callable) {
556 return Task(subflow_.emplace([callable = std::forward<C>(callable)](tf::Subflow& sf) mutable {
557 SubTaskGraph sub_graph(sf);
558 callable(sub_graph);
559 }));
560}
561
562template <std::ranges::range R>
563 requires std::same_as<std::ranges::range_value_t<R>, Task>
564inline void SubTaskGraph::Linearize(const R& tasks) {
565 std::vector<tf::Task> tf_tasks;
566 tf_tasks.reserve(std::ranges::size(tasks));
567
568 for (const auto& task : tasks) {
569 tf_tasks.push_back(task.UnderlyingTask());
570 }
571
572 subflow_.linearize(tf_tasks);
573}
574
575template <std::ranges::random_access_range R, typename Compare>
576 requires std::predicate<Compare, std::ranges::range_reference_t<R>, std::ranges::range_reference_t<R>>
577inline Task SubTaskGraph::Sort(R& range, Compare&& comparator) {
578 if constexpr (std::same_as<std::remove_cvref_t<Compare>, std::less<>>) {
579 return Task(subflow_.sort(std::ranges::begin(range), std::ranges::end(range)));
580 } else {
581 return Task(subflow_.sort(std::ranges::begin(range), std::ranges::end(range), std::forward<Compare>(comparator)));
582 }
583}
584
585template <std::invocable C, std::ranges::range Dependencies>
586 requires std::same_as<std::ranges::range_value_t<Dependencies>, AsyncTask>
587inline auto SubTaskGraph::DependentAsync(C&& callable, const Dependencies& dependencies)
588 -> std::pair<AsyncTask, std::future<std::invoke_result_t<C>>> {
589 std::vector<tf::AsyncTask> tf_deps;
590 if constexpr (std::ranges::sized_range<Dependencies>) {
591 tf_deps.reserve(std::ranges::size(dependencies));
592 }
593
594 for (const auto& dep : dependencies) {
595 tf_deps.push_back(dep.UnderlyingTask());
596 }
597
598 auto [task, future] = subflow_.executor().dependent_async(std::forward<C>(callable), tf_deps.begin(), tf_deps.end());
599 return std::make_pair(AsyncTask(std::move(task)), std::move(future));
600}
601
602template <std::invocable C, std::ranges::range Dependencies>
603 requires std::same_as<std::ranges::range_value_t<Dependencies>, AsyncTask>
604inline AsyncTask SubTaskGraph::SilentDependentAsync(C&& callable, const Dependencies& dependencies) {
605 std::vector<tf::AsyncTask> tf_deps;
606 if constexpr (std::ranges::sized_range<Dependencies>) {
607 tf_deps.reserve(std::ranges::size(dependencies));
608 }
609
610 for (const auto& dep : dependencies) {
611 tf_deps.push_back(dep.UnderlyingTask());
612 }
613
614 return AsyncTask(
615 subflow_.executor().silent_dependent_async(std::forward<C>(callable), tf_deps.begin(), tf_deps.end()));
616}
617
618template <SubTask C>
619inline Task TaskGraph::EmplaceTask(C&& callable) {
620 return Task(taskflow_.emplace([callable = std::forward<C>(callable)](tf::Subflow& subflow) {
621 SubTaskGraph sub_graph(subflow);
622 std::invoke(callable, sub_graph);
623 }));
624}
625
626inline void SubTaskGraph::CoRun(TaskGraph& graph) {
627 HELIOS_ASSERT(IsWorkerThread(), "Failed to co-run: Must be called from a worker thread");
628 subflow_.executor().corun(graph.UnderlyingTaskflow());
629}
630
631template <std::predicate Predicate>
632inline void SubTaskGraph::CoRunUntil(Predicate&& predicate) {
633 HELIOS_ASSERT(IsWorkerThread(), "Failed to co-run until: Must be called from a worker thread");
634 subflow_.executor().corun_until(std::forward<Predicate>(predicate));
635}
636
637} // namespace helios::async
#define HELIOS_ASSERT(condition,...)
Assertion macro that aborts execution in debug builds.
Definition assert.hpp:140
Handle to an asynchronous task managed by the Executor.
Manages worker threads and executes task graphs using work-stealing scheduling.
Definition executor.hpp:33
Wrapper around tf::Future for handling asynchronous task results.
Definition future.hpp:21
Dynamic task graph that can be created within the execution of a task.
auto Run(TaskGraph &&graph, C &&callable) -> Future< void >
Runs a moved task graph once and invokes a callback upon completion.
bool IsWorkerThread() const
Checks if the current thread is a worker thread of this executor.
Task EmplaceTask(C &&callable)
Creates a static task with the given callable.
Task ComposedOf(T &other_graph)
Creates a module task that encapsulates another task graph.
Task ForEach(const R &range, C &&callable)
Creates a parallel for-each task over the given range.
size_t WorkerCount() const noexcept
Gets the total number of worker threads.
size_t IdleWorkerCount() const noexcept
Gets the number of worker threads currently waiting for work.
size_t QueueCount() const noexcept
Gets the number of task queues in the work-stealing scheduler.
SubTaskGraph(SubTaskGraph &&)=default
void RemoveTask(const Task &task)
Removes a task from this subflow.
auto Async(C &&callable) -> std::future< std::invoke_result_t< C > >
Creates an asynchronous task that runs the given callable.
void Retain(bool flag) noexcept
Specifies whether to keep the sub task graph after it is joined.
auto RunUntil(TaskGraph &graph, Predicate &&predicate, C &&callable) -> Future< void >
Runs a task graph repeatedly until the predicate returns true, then invokes a callback.
bool Joinable() const noexcept
Checks if this subflow can be joined.
SubTaskGraph & operator=(SubTaskGraph &&)=delete
auto Run(TaskGraph &graph) -> Future< void >
Runs a task graph once.
int CurrentWorkerId() const
Gets the ID of the current worker thread.
auto RunN(TaskGraph &graph, size_t count, C &&callable) -> Future< void >
Runs a task graph for the specified number of times and invokes a callback.
void Join()
Joins the subflow with its parent task.
void CoRun(TaskGraph &graph)
Runs a task graph cooperatively and waits until it completes using the current worker thread.
auto RunN(TaskGraph &graph, size_t count) -> Future< void >
Runs a task graph for the specified number of times.
auto RunUntil(TaskGraph &&graph, Predicate &&predicate) -> Future< void >
Runs a moved task graph repeatedly until the predicate returns true.
auto EmplaceTasks(Cs &&... callables) -> std::array< Task, sizeof...(Cs)>
Creates multiple tasks from a list of callables.
SubTaskGraph & operator=(const SubTaskGraph &)=delete
SubTaskGraph(const SubTaskGraph &)=delete
auto Run(TaskGraph &&graph) -> Future< void >
Runs a task graph once.
auto Async(std::string name, C &&callable) -> std::future< std::invoke_result_t< C > >
Creates a named asynchronous task that runs the given callable.
Task Transform(const InputRange &input_range, OutputRange &output_range, TransformFunc &&transform_func)
Creates a parallel transform task that applies a function to each element.
void SilentAsync(C &&callable)
Creates an asynchronous task without returning a future.
void Linearize(const R &tasks)
Creates linear dependencies between tasks in the given range.
auto RunN(TaskGraph &&graph, size_t count) -> Future< void >
Runs a moved task graph for the specified number of times.
Task Sort(R &range, Compare &&comparator=Compare())
Creates a parallel sort task for the given range.
auto RunN(TaskGraph &&graph, size_t count, C &&callable) -> Future< void >
Runs a moved task graph for the specified number of times and invokes a callback.
void WaitForAll()
Blocks until all submitted tasks complete.
Task Reduce(const R &range, T &init, BinaryOp &&binary_op)
Creates a parallel reduction task that combines elements using a binary operation.
Task CreatePlaceholder()
Creates a placeholder task with no assigned work.
size_t RunningTopologyCount() const
Gets the number of task graphs currently being executed.
void SilentAsync(std::string name, C &&callable)
Creates a named asynchronous task without returning a future.
auto Run(TaskGraph &graph, C &&callable) -> Future< void >
Runs a task graph once and invokes a callback upon completion.
auto DependentAsync(C &&callable, const Dependencies &dependencies) -> std::pair< AsyncTask, std::future< std::invoke_result_t< C > > >
Creates an asynchronous task that runs after specified dependencies complete.
Task ForEachIndex(I start, I end, I step, C &&callable)
Creates a parallel for-each task over an index range.
AsyncTask SilentDependentAsync(C &&callable, const Dependencies &dependencies)
Creates an asynchronous task that runs after dependencies complete, without returning a future.
auto RunUntil(TaskGraph &&graph, Predicate &&predicate, C &&callable) -> Future< void >
Runs a moved task graph repeatedly until the predicate returns true, then invokes a callback.
auto RunUntil(TaskGraph &graph, Predicate &&predicate) -> Future< void >
Runs a task graph repeatedly until the predicate returns true.
void CoRunUntil(Predicate &&predicate)
Keeps the current worker thread running until the predicate returns true.
bool WillBeRetained() const noexcept
Checks if this subflow will be retained.
Represents a task dependency graph that can be executed by an Executor.
Task EmplaceTask(C &&callable)
Creates a static task with the given callable.
Represents a single task within a task graph.
Definition task.hpp:28
Concept for any valid task callable.
Definition common.hpp:86
STL namespace.