3#include "../observer/tfprof.hpp"
5#include "async_task.hpp"
64 friend class FlowBuilder;
67 friend class NonpreemptiveRuntime;
68 friend class Algorithm;
69 friend class TaskGroup;
92 size_t N = std::thread::hardware_concurrency(),
93 std::shared_ptr<WorkerInterface> wif =
nullptr
379 template<
typename P,
typename C>
410 template<
typename P,
typename C>
453 template <
typename T>
484 template <
typename P>
600 template <typename Observer, typename... ArgsT>
608 template <typename Observer>
645 template <typename P, typename F>
671 template <typename F>
697 template <typename P, typename F>
722 template <typename F>
756 template <typename F, typename... Tasks>
757requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
792 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
827 template <typename F, typename I>
828requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
866 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
910 template <typename F, typename... Tasks>
911requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
956 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
999 template <typename F, typename I>
1000requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
1048 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
1109 std::vector<Worker> _workers;
1110 std::vector<Buffer> _buffers;
1115 alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};
1117 std::unordered_map<std::thread::id, Worker*> _t2w;
1118 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
1121 void _observer_prologue(Worker&, Node*);
1122 void _observer_epilogue(Worker&, Node*);
1123 void _spawn(
size_t, std::shared_ptr<WorkerInterface>);
1124 void _exploit_task(Worker&, Node*&);
1125 bool _explore_task(Worker&, Node*&);
1126 void _schedule(Worker&, Node*);
1127 void _schedule(Node*);
1128 void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);
1130 void _set_up_topology(Worker*, Topology*);
1131 void _tear_down_topology(Worker&, Topology*, Node*&);
1132 void _tear_down_async(Worker&, Node*, Node*&);
1133 void _tear_down_dependent_async(Worker&, Node*, Node*&);
1134 void _tear_down_nonasync(Worker&, Node*, Node*&);
1135 void _tear_down_invoke(Worker&, Node*, Node*&);
1136 void _increment_topology();
1137 void _decrement_topology();
1138 void _invoke(Worker&, Node*);
1139 void _invoke_static_task(Worker&, Node*);
1140 void _invoke_nonpreemptive_runtime_task(Worker&, Node*);
1141 void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
1142 void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
1143 void _process_dependent_async(Node*,
tf::AsyncTask&,
size_t&);
1144 void _process_exception(Worker&, Node*);
1145 void _update_cache(Worker&, Node*&, Node*);
1146 void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);
1148 bool _wait_for_task(Worker&, Node*&);
1149 bool _invoke_subflow_task(Worker&, Node*);
1150 bool _invoke_module_task(Worker&, Node*);
1151 bool _invoke_adopted_module_task(Worker&, Node*);
1152 bool _invoke_module_task_impl(Worker&, Node*, Graph&);
1153 bool _invoke_async_task(Worker&, Node*);
1154 bool _invoke_dependent_async_task(Worker&, Node*);
1155 bool _invoke_runtime_task(Worker&, Node*);
1156 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<
void(Runtime&)>&);
1157 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<
void(Runtime&,
bool)>&);
1159 size_t _set_up_graph(Graph&, Topology*, NodeBase*);
1161 template <
typename P>
1162 void _corun_until(Worker&, P&&);
1164 template <
typename I>
1165 void _bulk_schedule(Worker&, I,
size_t);
1167 template <
typename I>
1168 void _bulk_schedule(I,
size_t);
1170 template <
typename I>
1171 void _bulk_spill(I,
size_t);
1173 template <
typename I>
1174 void _bulk_spill_round_robin(I,
size_t);
1177 void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&,
size_t&);
1179 template <
typename P,
typename F>
1180 auto _async(P&&, F&&, Topology*, NodeBase*);
1182 template <
typename P,
typename F>
1183 void _silent_async(P&&, F&&, Topology*, NodeBase*);
1185 template <TaskParamsLike P,
typename F,
typename I>
1186 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1187 auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1189 template <TaskParamsLike P,
typename F,
typename I>
1190 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1191 auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1193 template <
typename... ArgsT>
1194 void _schedule_async_task(ArgsT&&...);
1196 template <
typename I,
typename... ArgsT>
1197 AsyncTask _schedule_dependent_async_task(I, I,
size_t, ArgsT&&...);
1200#ifndef DOXYGEN_GENERATING_OUTPUT
1205 _buffers (std::bit_width(N)),
1209 TF_THROW(
"executor must define at least one worker");
1214#ifndef TF_DISABLE_EXCEPTION_HANDLING
1217 _spawn(N, std::move(wif));
1218#ifndef TF_DISABLE_EXCEPTION_HANDLING
1222 std::rethrow_exception(std::current_exception());
1227 if(
has_env(TF_ENABLE_PROFILER)) {
1238inline void Executor::_shutdown() {
1244 for(
size_t i=0; i<_workers.size(); ++i) {
1245 _workers[i]._done.test_and_set(std::memory_order_relaxed);
1248 _notifier.notify_all();
1252 for(
auto& w : _workers) {
1253 if(w._thread.joinable()) {
1261 return _workers.size();
1266 return _notifier.num_waiters();
1271 return _workers.size() + _buffers.size();
1276 return _num_topologies.load(std::memory_order_relaxed);
1281 auto itr = _t2w.find(std::this_thread::get_id());
1282 return itr == _t2w.end() ? nullptr : itr->second;
1287 auto i = _t2w.find(std::this_thread::get_id());
1288 return i == _t2w.end() ? -1 :
static_cast<int>(i->second->_id);
1292inline void Executor::_spawn(
size_t N, std::shared_ptr<WorkerInterface> wif) {
1294 for(
size_t id=0;
id<N; ++id) {
1295 _workers[id]._thread = std::thread([&,
id, wif] () {
1297 auto& worker = _workers[id];
1300 worker._sticky_victim = id;
1301 worker._rdgen.seed(
static_cast<uint32_t
>(std::hash<std::thread::id>()(std::this_thread::get_id())));
1305 wif->scheduler_prologue(worker);
1309 std::exception_ptr ptr =
nullptr;
1314#ifndef TF_DISABLE_EXCEPTION_HANDLING
1321 _exploit_task(worker, t);
1324 if(_wait_for_task(worker, t) ==
false) {
1329#ifndef TF_DISABLE_EXCEPTION_HANDLING
1332 ptr = std::current_exception();
1338 wif->scheduler_epilogue(worker, ptr);
1363 _t2w.emplace(_workers[
id]._thread.get_id(), &_workers[
id]);
1368inline bool Executor::_explore_task(
Worker& w, Node*& t) {
1376 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1381 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1385 size_t num_steals = 0;
1386 size_t vtm = w._sticky_victim;
1390 t = (vtm < _workers.size())
1391 ? _workers[vtm]._wsq.steal()
1392 : _buffers[vtm - _workers.size()].queue.steal();
1395 w._sticky_victim = vtm;
1401 vtm = w._rdgen() % (MAX_VICTIM - 1);
1402 if(vtm >= w._id) vtm++;
1404 if(++num_steals > MAX_STEALS) {
1405 std::this_thread::yield();
1406 if(num_steals > 150 + MAX_STEALS) {
1411 if(w._done.test(std::memory_order_relaxed)) {
1475inline void Executor::_exploit_task(
Worker& w, Node*& t) {
1483inline bool Executor::_wait_for_task(
Worker& w, Node*& t) {
1487 if(_explore_task(w, t) ==
false) {
1497 _notifier.prepare_wait(w._id);
1504 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1506 if(w._done.test(std::memory_order_relaxed)) {
1507 _notifier.cancel_wait(w._id);
1510 _notifier.commit_wait(w._id);
1515 for(
size_t b=0; b<_buffers.size(); ++b) {
1516 if(!_buffers[b].queue.empty()) {
1517 _notifier.cancel_wait(w._id);
1518 w._sticky_victim = b + _workers.size();
1528 for(
size_t k=0; k<_workers.size()-1; ++k) {
1529 if(
size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {
1530 _notifier.cancel_wait(w._id);
1531 w._sticky_victim = vtm;
1537 if(w._done.test(std::memory_order_relaxed)) {
1538 _notifier.cancel_wait(w._id);
1543 _notifier.commit_wait(w._id);
1548template<
typename Observer,
typename... ArgsT>
1552 std::is_base_of_v<ObserverInterface, Observer>,
1553 "Observer must be derived from ObserverInterface"
1557 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1559 ptr->set_up(_workers.size());
1561 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1567template <
typename Observer>
1571 std::is_base_of_v<ObserverInterface, Observer>,
1572 "Observer must be derived from ObserverInterface"
1575 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1580 return _observers.size();
1584inline void Executor::_spill(Node* item) {
1587 auto b = (
reinterpret_cast<uintptr_t
>(item) >> 16) % _buffers.size();
1588 std::scoped_lock lock(_buffers[b].mutex);
1589 _buffers[b].queue.push(item);
1596template <
typename I>
1597void Executor::_bulk_spill(I first,
size_t N) {
1599 auto b = ((
reinterpret_cast<uintptr_t
>(*first) * 2654435761ULL) >> 32) % _buffers.size();
1600 std::scoped_lock lock(_buffers[b].mutex);
1601 _buffers[b].queue.bulk_push(first, N);
1609template <
typename I>
1610void Executor::_bulk_spill_round_robin(I first,
size_t N) {
1613 const size_t B = _buffers.size();
1614 const size_t start = ((
reinterpret_cast<uintptr_t
>(*first) * 2654435761ULL) >> 32) % B;
1615 const size_t per_buf = (N + B - 1) / B;
1616 size_t remaining = N;
1617 for(
size_t i = 0; i < B && remaining > 0; ++i) {
1618 size_t b = (start + i) % B;
1619 size_t chunk = std::min(per_buf, remaining);
1621 std::scoped_lock lock(_buffers[b].mutex);
1622 _buffers[b].queue.bulk_push(first, chunk);
1630inline void Executor::_schedule(
Worker& worker, Node* node) {
1633 if(worker._wsq.try_push(node) ==
false) {
1636 _notifier.notify_one();
1640inline void Executor::_schedule(Node* node) {
1642 _notifier.notify_one();
1646template <
typename I>
1647void Executor::_bulk_schedule(
Worker& worker, I first,
size_t num_nodes) {
1649 if(num_nodes == 0) {
1658 if(
auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
1659 _bulk_spill(first, num_nodes - n);
1661 _notifier.notify_n(num_nodes);
1671template <
typename I>
1672inline void Executor::_bulk_schedule(I first,
size_t num_nodes) {
1674 if(num_nodes == 0) {
1683 _bulk_spill(first, num_nodes);
1684 _notifier.notify_n(num_nodes);
1688TF_FORCE_INLINE
void Executor::_update_cache(
Worker& worker, Node*& cache, Node* node) {
1690 _schedule(worker, cache);
1697TF_FORCE_INLINE
void Executor::_bulk_update_cache(
1698 Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array,
size_t& n
1704 _bulk_schedule(worker, array, n);
1712inline void Executor::_invoke(
Worker& worker, Node* node) {
1714 #define TF_INVOKE_CONTINUATION() \
1717 goto begin_invoke; \
1722 Node* cache {
nullptr};
1725 if(node->_nstate & NSTATE::PREEMPTED) {
1732 if(node->_is_parent_cancelled()) {
1733 _tear_down_invoke(worker, node, cache);
1734 TF_INVOKE_CONTINUATION();
1739 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1740 SmallVector<Node*> waiters;
1741 if(!node->_acquire_all(waiters)) {
1742 _bulk_schedule(worker, waiters.begin(), waiters.size());
1749 SmallVector<int> conds;
1752 switch(node->_handle.index()) {
1755 _invoke_static_task(worker, node);
1760 case Node::RUNTIME:{
1761 if(_invoke_runtime_task(worker, node)) {
1768 case Node::NONPREEMPTIVE_RUNTIME:{
1769 _invoke_nonpreemptive_runtime_task(worker, node);
1774 case Node::SUBFLOW: {
1775 if(_invoke_subflow_task(worker, node)) {
1782 case Node::CONDITION: {
1783 _invoke_condition_task(worker, node, conds);
1788 case Node::MULTI_CONDITION: {
1789 _invoke_multi_condition_task(worker, node, conds);
1794 case Node::MODULE: {
1795 if(_invoke_module_task(worker, node)) {
1802 case Node::ADOPTED_MODULE: {
1803 if(_invoke_adopted_module_task(worker, node)) {
1811 if(_invoke_async_task(worker, node)) {
1814 _tear_down_async(worker, node, cache);
1815 TF_INVOKE_CONTINUATION();
1821 case Node::DEPENDENT_ASYNC: {
1822 if(_invoke_dependent_async_task(worker, node)) {
1825 _tear_down_dependent_async(worker, node, cache);
1826 TF_INVOKE_CONTINUATION();
1837 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1838 SmallVector<Node*> waiters;
1839 node->_release_all(waiters);
1840 _bulk_schedule(worker, waiters.begin(), waiters.size());
1849 node->_join_counter.fetch_add(
1850 node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed
1854 switch(node->_handle.index()) {
1857 case Node::CONDITION:
1858 case Node::MULTI_CONDITION: {
1859 for(
auto cond : conds) {
1860 if(cond >= 0 &&
static_cast<size_t>(cond) < node->_num_successors) {
1861 auto s = node->_edges[cond];
1863 s->_join_counter.store(0, std::memory_order_relaxed);
1864 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1865 _update_cache(worker, cache, s);
1873 for(
size_t i=0; i<node->_num_successors; ++i) {
1874 if(
auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1875 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1876 _update_cache(worker, cache, s);
1884 _tear_down_nonasync(worker, node, cache);
1885 TF_INVOKE_CONTINUATION();
1889inline void Executor::_tear_down_nonasync(
Worker& worker, Node* node, Node*& cache) {
1893 if(
auto parent = node->_parent; parent == node->_topology) {
1894 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1895 _tear_down_topology(worker, node->_topology, cache);
1901 auto state = parent->_nstate;
1902 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1904 if(state & NSTATE::PREEMPTED) {
1905 _update_cache(worker, cache,
static_cast<Node*
>(parent));
1912inline void Executor::_tear_down_invoke(
Worker& worker, Node* node, Node*& cache) {
1913 switch(node->_handle.index()) {
1915 _tear_down_async(worker, node, cache);
1918 case Node::DEPENDENT_ASYNC:
1919 _tear_down_dependent_async(worker, node, cache);
1923 _tear_down_nonasync(worker, node, cache);
1929inline void Executor::_observer_prologue(
Worker& worker, Node* node) {
1930 for(
auto& observer : _observers) {
1931 observer->on_entry(WorkerView(worker), TaskView(*node));
1936inline void Executor::_observer_epilogue(
Worker& worker, Node* node) {
1937 for(
auto& observer : _observers) {
1938 observer->on_exit(WorkerView(worker), TaskView(*node));
1943inline void Executor::_process_exception(
Worker&, Node* node) {
1948 NodeBase* ea = node;
1949 NodeBase* ia =
nullptr;
1951 while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {
1952 ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);
1954 if(ia ==
nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {
1961 constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;
1966 if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1967 ea->_exception_ptr = std::current_exception();
1973 if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1974 ia->_exception_ptr = std::current_exception();
1982 node->_exception_ptr = std::current_exception();
1986inline void Executor::_invoke_static_task(
Worker& worker, Node* node) {
1987 _observer_prologue(worker, node);
1988 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1989 std::get_if<Node::Static>(&node->_handle)->work();
1991 _observer_epilogue(worker, node);
1995inline bool Executor::_invoke_subflow_task(
Worker& worker, Node* node) {
1997 auto& h = *std::get_if<Node::Subflow>(&node->_handle);
1998 auto& g = h.subgraph;
2000 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2003 Subflow sf(*
this, worker, node, g);
2006 _observer_prologue(worker, node);
2007 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2010 _observer_epilogue(worker, node);
2014 if(sf.joinable() && !g.empty()) {
2017 node->_nstate |= NSTATE::PREEMPTED;
2020 _schedule_graph(worker, g, node->_topology, node);
2025 node->_nstate &= ~NSTATE::PREEMPTED;
2032 if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {
2040inline void Executor::_invoke_condition_task(
2043 _observer_prologue(worker, node);
2044 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2045 auto& work = std::get_if<Node::Condition>(&node->_handle)->work;
2048 _observer_epilogue(worker, node);
2052inline void Executor::_invoke_multi_condition_task(
2055 _observer_prologue(worker, node);
2056 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2057 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
2059 _observer_epilogue(worker, node);
2063inline bool Executor::_invoke_module_task(
Worker& w, Node* node) {
2064 return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);
2068inline bool Executor::_invoke_adopted_module_task(
Worker& w, Node* node) {
2069 return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);
2073inline bool Executor::_invoke_module_task_impl(
Worker& w, Node* node,
Graph& graph) {
2081 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2083 node->_nstate |= NSTATE::PREEMPTED;
2084 _schedule_graph(w, graph, node->_topology, node);
2089 node->_nstate &= ~NSTATE::PREEMPTED;
2096inline bool Executor::_invoke_async_task(
Worker& worker, Node* node) {
2097 auto& work = std::get_if<Node::Async>(&node->_handle)->work;
2098 switch(work.index()) {
2101 _observer_prologue(worker, node);
2102 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2103 std::get_if<0>(&work)->operator()();
2105 _observer_epilogue(worker, node);
2110 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2117 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2127inline bool Executor::_invoke_dependent_async_task(
Worker& worker, Node* node) {
2128 auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
2129 switch(work.index()) {
2132 _observer_prologue(worker, node);
2133 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2134 std::get_if<0>(&work)->operator()();
2136 _observer_epilogue(worker, node);
2141 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2148 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2158 return run_n(f, 1, [](){});
2163 return run_n(std::move(f), 1, [](){});
2167template <
typename C>
2169 return run_n(f, 1, std::forward<C>(c));
2173template <
typename C>
2175 return run_n(std::move(f), 1, std::forward<C>(c));
2180 return run_n(f, repeat, [](){});
2185 return run_n(std::move(f), repeat, [](){});
2189template <
typename C>
2192 f, [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c)
2197template <
typename C>
2200 std::move(f), [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c)
2207 return run_until(f, std::forward<P>(pred), [](){});
2213 return run_until(std::move(f), std::forward<P>(pred), [](){});
2217template <
typename P,
typename C>
2221 if(f.empty() || p()) {
2223 std::promise<void> promise;
2224 promise.set_value();
2225 return tf::Future<void>(promise.get_future());
2228 _increment_topology();
2231 auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
2235 tf::Future<void> future(t->_promise.get_future(), t);
2238 if(f._fetch_enqueue(t) == 0) {
2248template <
typename P>
2253 TF_THROW(
"corun_until must be called by a worker of the executor");
2256 _corun_until(*w, std::forward<P>(predicate));
2313template <
typename P>
2314void Executor::_corun_until(
Worker& w, P&& stop_predicate) {
2317 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2321 while(!stop_predicate()) {
2325 if(
auto t = w._wsq.pop(); t) {
2329 size_t num_steals = 0;
2330 size_t vtm = w._sticky_victim;
2334 t = (vtm < _workers.size())
2335 ? _workers[vtm]._wsq.steal()
2336 : _buffers[vtm-_workers.size()].queue.steal();
2340 w._sticky_victim = vtm;
2343 else if(!stop_predicate()) {
2344 if(++num_steals > MAX_STEALS) {
2345 std::this_thread::yield();
2347 vtm = w._rdgen() % MAX_VICTIM;
2358template <
typename T>
2363 TF_THROW(
"corun must be called by a worker of the executor");
2371inline void Executor::_corun_graph(
Worker& w,
Graph& g, Topology* tpg, NodeBase* p) {
2380 ExplicitAnchorGuard anchor(p);
2381 _schedule_graph(w, g, tpg, p);
2382 _corun_until(w, [p] () ->
bool {
2383 return p->_join_counter.load(std::memory_order_acquire) == 0; }
2388 p->_rethrow_exception();
2392inline void Executor::_increment_topology() {
2393 _num_topologies.fetch_add(1, std::memory_order_relaxed);
2397inline void Executor::_decrement_topology() {
2398 if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2399 _num_topologies.notify_all();
2405 size_t n = _num_topologies.load(std::memory_order_acquire);
2407 _num_topologies.wait(n, std::memory_order_acquire);
2408 n = _num_topologies.load(std::memory_order_acquire);
2413inline void Executor::_schedule_graph(
2414 Worker& worker,
Graph& graph, Topology* tpg, NodeBase* parent
2416 size_t num_srcs = _set_up_graph(graph, tpg, parent);
2417 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);
2418 _bulk_schedule(worker, graph.begin(), num_srcs);
2422inline void Executor::_set_up_topology(
Worker* w, Topology* tpg) {
2424 auto& g = tpg->_taskflow._graph;
2425 size_t num_srcs = _set_up_graph(g, tpg, tpg);
2426 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);
2427 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);
2431inline size_t Executor::_set_up_graph(
Graph& graph, Topology* tpg, NodeBase* parent) {
2433 auto first = graph.begin();
2434 auto last = graph.end();
2436 for(; first != last; ++first) {
2439 node->_topology = tpg;
2440 node->_parent = parent;
2441 node->_nstate = NSTATE::NONE;
2442 node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
2443 node->_set_up_join_counter();
2444 node->_exception_ptr =
nullptr;
2448 if(node->num_predecessors() == 0) {
2449 std::iter_swap(send++, first);
2452 return send - graph.begin();
2456inline void Executor::_tear_down_topology(
Worker& worker, Topology* tpg, Node*& cache) {
2458 auto &f = tpg->_taskflow;
2464 if(!tpg->cancelled() && !tpg->_predicate()) {
2467 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2476 if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {
2478 auto fetched_tpg {std::move(f._topologies.front())};
2481 f._topologies.pop();
2482 tpg = f._topologies.front().get();
2488 fetched_tpg->_carry_out_promise();
2491 _decrement_topology();
2493 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2498 auto fetched_tpg {std::move(f._topologies.front())};
2501 f._topologies.pop();
2507 fetched_tpg->_carry_out_promise();
2509 _decrement_topology();
2512 if(
auto parent = fetched_tpg->_parent; parent) {
2514 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2517 _update_cache(worker, cache,
static_cast<Node*
>(parent));
2532 TF_THROW(
"subflow already joined");
2535 _executor._corun_graph(_worker, _graph, _node->_topology, _node);
2538 _node->_nstate |= NSTATE::JOINED_SUBFLOW;
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
void silent_async(P &&params, F &&func)
similar to tf::Executor::async but does not return a future object
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
void corun_until(P &&predicate)
keeps running the work-stealing loop until the predicate returns true
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run(Taskflow &&taskflow)
runs a moved taskflow once
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
size_t num_waiters() const noexcept
queries the number of workers that are in the waiting loop
tf::Future< void > run(Taskflow &&taskflow, C &&callable)
runs a moved taskflow once and invokes a callback upon completion
~Executor()
destructs the executor
int this_worker_id() const
queries the id of the caller thread within this executor
size_t num_queues() const noexcept
queries the number of work-stealing queues used by the executor
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
size_t num_topologies() const
queries the number of running topologies at the time of this call
Executor(size_t N=std::thread::hardware_concurrency(), std::shared_ptr< WorkerInterface > wif=nullptr)
constructs the executor with N worker threads
TaskGroup task_group()
creates a task group that executes a collection of asynchronous tasks
Definition task_group.hpp:875
void corun(T &target)
runs a target graph and waits until it completes using an internal worker of this executor
size_t num_workers() const noexcept
queries the number of worker threads
tf::Future< void > run_until(Taskflow &&taskflow, P &&pred)
runs a moved taskflow and keeps running it until the predicate becomes true
void wait_for_all()
waits for all tasks to complete
tf::Future< void > run_n(Taskflow &taskflow, size_t N, C &&callable)
runs a taskflow for N times and then invokes a callback
tf::Future< void > run(Taskflow &taskflow, C &&callable)
runs a taskflow once and invokes a callback upon completion
tf::Future< void > run_n(Taskflow &&taskflow, size_t N)
runs a moved taskflow for N times
tf::Future< void > run_n(Taskflow &&taskflow, size_t N, C &&callable)
runs a moved taskflow for N times and then invokes a callback
tf::Future< void > run_until(Taskflow &taskflow, P &&pred, C &&callable)
runs a taskflow multiple times until the predicate becomes true and then invokes the callback
Worker * this_worker()
queries pointer to the calling worker if it belongs to this executor, otherwise returns nullptr
auto async(P &&params, F &&func)
creates a parameterized asynchronous task to run the given function
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
size_t num_observers() const noexcept
queries the number of observers
class to access the result of an execution
Definition taskflow.hpp:630
class to create a graph object
Definition graph.hpp:47
class to define a vector optimized for small array
Definition small_vector.hpp:931
void join()
enables the subflow to join its parent task
bool joinable() const noexcept
queries if the subflow is joinable
Definition flow_builder.hpp:1825
class to create a taskflow object
Definition taskflow.hpp:64
class to create a lock-free unbounded work-stealing queue
Definition wsq.hpp:105
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:202
taskflow namespace
Definition small_vector.hpp:20
NonblockingNotifier DefaultNotifier
the default notifier type used by Taskflow
Definition worker.hpp:38
Graph & retrieve_graph(T &target)
retrieves a reference to the underlying tf::Graph from an object
Definition graph.hpp:1067
bool has_env(const std::string &str)
checks whether an environment variable is defined
Definition os.hpp:284