100.00% Lines (139/139) 100.00% Functions (29/29)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/boostorg/capy 8   // Official repository: https://github.com/boostorg/capy
9   // 9   //
10   10  
11   #include <boost/capy/ex/thread_pool.hpp> 11   #include <boost/capy/ex/thread_pool.hpp>
12   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
13   #include <boost/capy/detail/thread_local_ptr.hpp> 13   #include <boost/capy/detail/thread_local_ptr.hpp>
14   #include <boost/capy/ex/frame_allocator.hpp> 14   #include <boost/capy/ex/frame_allocator.hpp>
15   #include <boost/capy/test/thread_name.hpp> 15   #include <boost/capy/test/thread_name.hpp>
16   #include <algorithm> 16   #include <algorithm>
17   #include <atomic> 17   #include <atomic>
18   #include <condition_variable> 18   #include <condition_variable>
19   #include <cstdio> 19   #include <cstdio>
20   #include <mutex> 20   #include <mutex>
21   #include <thread> 21   #include <thread>
22   #include <vector> 22   #include <vector>
23   23  
/*
    Thread pool implementation using a shared work queue.

    Work items are continuations linked via their intrusive next pointer,
    stored in a single queue protected by a mutex. No per-post heap
    allocation: the continuation is owned by the caller and linked
    directly. Worker threads wait on a condition_variable until work
    is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads), after
      which any continuations still queued have their frames destroyed.
*/
47   47  
48   namespace boost { 48   namespace boost {
49   namespace capy { 49   namespace capy {
50   50  
51   //------------------------------------------------------------------------------ 51   //------------------------------------------------------------------------------
52   52  
53   class thread_pool::impl 53   class thread_pool::impl
54   { 54   {
55   // Identifies the pool owning the current worker thread, or 55   // Identifies the pool owning the current worker thread, or
56   // nullptr if the calling thread is not a pool worker. Checked 56   // nullptr if the calling thread is not a pool worker. Checked
57   // by dispatch() to decide between symmetric transfer (inline 57   // by dispatch() to decide between symmetric transfer (inline
58   // resume) and post. 58   // resume) and post.
59   static inline detail::thread_local_ptr<impl const> current_; 59   static inline detail::thread_local_ptr<impl const> current_;
60   60  
61   // Intrusive queue of continuations via continuation::next. 61   // Intrusive queue of continuations via continuation::next.
62   // No per-post allocation: the continuation is owned by the caller. 62   // No per-post allocation: the continuation is owned by the caller.
63   continuation* head_ = nullptr; 63   continuation* head_ = nullptr;
64   continuation* tail_ = nullptr; 64   continuation* tail_ = nullptr;
65   65  
HITCBC 66   18998 void push(continuation* c) noexcept 66   19345 void push(continuation* c) noexcept
67   { 67   {
HITCBC 68   18998 c->next = nullptr; 68   19345 c->next = nullptr;
HITCBC 69   18998 if(tail_) 69   19345 if(tail_)
HITCBC 70   3750 tail_->next = c; 70   3421 tail_->next = c;
71   else 71   else
HITCBC 72   15248 head_ = c; 72   15924 head_ = c;
HITCBC 73   18998 tail_ = c; 73   19345 tail_ = c;
HITCBC 74   18998 } 74   19345 }
75   75  
HITCBC 76   19164 continuation* pop() noexcept 76   19511 continuation* pop() noexcept
77   { 77   {
HITCBC 78   19164 if(!head_) 78   19511 if(!head_)
HITCBC 79   166 return nullptr; 79   166 return nullptr;
HITCBC 80   18998 continuation* c = head_; 80   19345 continuation* c = head_;
HITCBC 81   18998 head_ = head_->next; 81   19345 head_ = head_->next;
HITCBC 82   18998 if(!head_) 82   19345 if(!head_)
HITCBC 83   15248 tail_ = nullptr; 83   15924 tail_ = nullptr;
HITCBC 84   18998 return c; 84   19345 return c;
85   } 85   }
86   86  
HITCBC 87   36687 bool empty() const noexcept 87   36956 bool empty() const noexcept
88   { 88   {
HITCBC 89   36687 return head_ == nullptr; 89   36956 return head_ == nullptr;
90   } 90   }
91   91  
92   std::mutex mutex_; 92   std::mutex mutex_;
93   std::condition_variable work_cv_; 93   std::condition_variable work_cv_;
94   std::condition_variable done_cv_; 94   std::condition_variable done_cv_;
95   std::vector<std::thread> threads_; 95   std::vector<std::thread> threads_;
96   std::atomic<std::size_t> outstanding_work_{0}; 96   std::atomic<std::size_t> outstanding_work_{0};
97   bool stop_{false}; 97   bool stop_{false};
98   bool joined_{false}; 98   bool joined_{false};
99   std::size_t num_threads_; 99   std::size_t num_threads_;
100   char thread_name_prefix_[13]{}; // 12 chars max + null terminator 100   char thread_name_prefix_[13]{}; // 12 chars max + null terminator
101   std::once_flag start_flag_; 101   std::once_flag start_flag_;
102   102  
103   public: 103   public:
HITCBC 104   166 ~impl() = default; 104   166 ~impl() = default;
105   105  
106   bool 106   bool
HITCBC 107   355 running_in_this_thread() const noexcept 107   355 running_in_this_thread() const noexcept
108   { 108   {
HITCBC 109   355 return current_.get() == this; 109   355 return current_.get() == this;
110   } 110   }
111   111  
112   // Destroy abandoned coroutine frames. Must be called 112   // Destroy abandoned coroutine frames. Must be called
113   // before execution_context::shutdown()/destroy() so 113   // before execution_context::shutdown()/destroy() so
114   // that suspended-frame destructors (e.g. delay_awaitable 114   // that suspended-frame destructors (e.g. delay_awaitable
115   // calling timer_service::cancel()) run while services 115   // calling timer_service::cancel()) run while services
116   // are still valid. 116   // are still valid.
117   void 117   void
HITCBC 118   166 drain_abandoned() noexcept 118   166 drain_abandoned() noexcept
119   { 119   {
HITCBC 120   366 while(auto* c = pop()) 120   363 while(auto* c = pop())
121   { 121   {
HITCBC 122   200 auto h = c->h; 122   197 auto h = c->h;
HITCBC 123   200 if(h && h != std::noop_coroutine()) 123   197 if(h && h != std::noop_coroutine())
HITCBC 124   149 h.destroy(); 124   146 h.destroy();
HITCBC 125   200 } 125   197 }
HITCBC 126   166 } 126   166 }
127   127  
HITCBC 128   166 impl(std::size_t num_threads, std::string_view thread_name_prefix) 128   166 impl(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 129   166 : num_threads_(num_threads) 129   166 : num_threads_(num_threads)
130   { 130   {
HITCBC 131   166 if(num_threads_ == 0) 131   166 if(num_threads_ == 0)
HITCBC 132   4 num_threads_ = std::max( 132   4 num_threads_ = std::max(
HITCBC 133   2 std::thread::hardware_concurrency(), 1u); 133   2 std::thread::hardware_concurrency(), 1u);
134   134  
135   // Truncate prefix to 12 chars, leaving room for up to 3-digit index. 135   // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
HITCBC 136   166 auto n = thread_name_prefix.copy(thread_name_prefix_, 12); 136   166 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
HITCBC 137   166 thread_name_prefix_[n] = '\0'; 137   166 thread_name_prefix_[n] = '\0';
HITCBC 138   166 } 138   166 }
139   139  
140   void 140   void
HITCBC 141   18998 post(continuation& c) 141   19345 post(continuation& c)
142   { 142   {
HITCBC 143   18998 ensure_started(); 143   19345 ensure_started();
144   { 144   {
HITCBC 145   18998 std::lock_guard<std::mutex> lock(mutex_); 145   19345 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 146   18998 push(&c); 146   19345 push(&c);
HITCBC 147   18998 } 147   19345 }
HITCBC 148   18998 work_cv_.notify_one(); 148   19345 work_cv_.notify_one();
HITCBC 149   18998 } 149   19345 }
150   150  
151   void 151   void
HITCBC 152   347 on_work_started() noexcept 152   347 on_work_started() noexcept
153   { 153   {
HITCBC 154   347 outstanding_work_.fetch_add(1, std::memory_order_acq_rel); 154   347 outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
HITCBC 155   347 } 155   347 }
156   156  
157   void 157   void
HITCBC 158   347 on_work_finished() noexcept 158   347 on_work_finished() noexcept
159   { 159   {
HITCBC 160   347 if(outstanding_work_.fetch_sub( 160   347 if(outstanding_work_.fetch_sub(
HITCBC 161   347 1, std::memory_order_acq_rel) == 1) 161   347 1, std::memory_order_acq_rel) == 1)
162   { 162   {
HITCBC 163   87 std::lock_guard<std::mutex> lock(mutex_); 163   88 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 164   87 if(joined_ && !stop_) 164   88 if(joined_ && !stop_)
HITCBC 165   4 stop_ = true; 165   4 stop_ = true;
HITCBC 166   87 done_cv_.notify_all(); 166   88 done_cv_.notify_all();
HITCBC 167   87 work_cv_.notify_all(); 167   88 work_cv_.notify_all();
HITCBC 168   87 } 168   88 }
HITCBC 169   347 } 169   347 }
170   170  
171   void 171   void
HITCBC 172   178 join() noexcept 172   178 join() noexcept
173   { 173   {
174   { 174   {
HITCBC 175   178 std::unique_lock<std::mutex> lock(mutex_); 175   178 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 176   178 if(joined_) 176   178 if(joined_)
HITCBC 177   12 return; 177   12 return;
HITCBC 178   166 joined_ = true; 178   166 joined_ = true;
179   179  
HITCBC 180   166 if(outstanding_work_.load( 180   166 if(outstanding_work_.load(
HITCBC 181   166 std::memory_order_acquire) == 0) 181   166 std::memory_order_acquire) == 0)
182   { 182   {
HITCBC 183   108 stop_ = true; 183   108 stop_ = true;
HITCBC 184   108 work_cv_.notify_all(); 184   108 work_cv_.notify_all();
185   } 185   }
186   else 186   else
187   { 187   {
HITCBC 188   58 done_cv_.wait(lock, [this]{ 188   58 done_cv_.wait(lock, [this]{
HITCBC 189   63 return stop_; 189   63 return stop_;
190   }); 190   });
191   } 191   }
HITCBC 192   178 } 192   178 }
193   193  
HITCBC 194   359 for(auto& t : threads_) 194   359 for(auto& t : threads_)
HITCBC 195   193 if(t.joinable()) 195   193 if(t.joinable())
HITCBC 196   193 t.join(); 196   193 t.join();
197   } 197   }
198   198  
199   void 199   void
HITCBC 200   168 stop() noexcept 200   168 stop() noexcept
201   { 201   {
202   { 202   {
HITCBC 203   168 std::lock_guard<std::mutex> lock(mutex_); 203   168 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 204   168 stop_ = true; 204   168 stop_ = true;
HITCBC 205   168 } 205   168 }
HITCBC 206   168 work_cv_.notify_all(); 206   168 work_cv_.notify_all();
HITCBC 207   168 done_cv_.notify_all(); 207   168 done_cv_.notify_all();
HITCBC 208   168 } 208   168 }
209   209  
210   private: 210   private:
211   void 211   void
HITCBC 212   18998 ensure_started() 212   19345 ensure_started()
213   { 213   {
HITCBC 214   18998 std::call_once(start_flag_, [this]{ 214   19345 std::call_once(start_flag_, [this]{
HITCBC 215   109 threads_.reserve(num_threads_); 215   109 threads_.reserve(num_threads_);
HITCBC 216   302 for(std::size_t i = 0; i < num_threads_; ++i) 216   302 for(std::size_t i = 0; i < num_threads_; ++i)
HITCBC 217   386 threads_.emplace_back([this, i]{ run(i); }); 217   386 threads_.emplace_back([this, i]{ run(i); });
HITCBC 218   109 }); 218   109 });
HITCBC 219   18998 } 219   19345 }
220   220  
221   void 221   void
HITCBC 222   193 run(std::size_t index) 222   193 run(std::size_t index)
223   { 223   {
224   // Build name; set_current_thread_name truncates to platform limits. 224   // Build name; set_current_thread_name truncates to platform limits.
225   char name[16]; 225   char name[16];
HITCBC 226   193 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index); 226   193 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
HITCBC 227   193 set_current_thread_name(name); 227   193 set_current_thread_name(name);
228   228  
229   // Mark this thread as a worker of this pool so dispatch() 229   // Mark this thread as a worker of this pool so dispatch()
230   // can symmetric-transfer when called from within pool work. 230   // can symmetric-transfer when called from within pool work.
231   struct scoped_pool 231   struct scoped_pool
232   { 232   {
HITCBC 233   193 scoped_pool(impl const* p) noexcept { current_.set(p); } 233   193 scoped_pool(impl const* p) noexcept { current_.set(p); }
HITCBC 234   193 ~scoped_pool() noexcept { current_.set(nullptr); } 234   193 ~scoped_pool() noexcept { current_.set(nullptr); }
HITCBC 235   193 } guard(this); 235   193 } guard(this);
236   236  
237   for(;;) 237   for(;;)
238   { 238   {
HITCBC 239   18991 continuation* c = nullptr; 239   19341 continuation* c = nullptr;
240   { 240   {
HITCBC 241   18991 std::unique_lock<std::mutex> lock(mutex_); 241   19341 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 242   18991 work_cv_.wait(lock, [this]{ 242   19341 work_cv_.wait(lock, [this]{
HITCBC 243   54473 return !empty() || 243   54663 return !empty() ||
HITCBC 244   54473 stop_; 244   54663 stop_;
245   }); 245   });
HITCBC 246   18991 if(stop_) 246   19341 if(stop_)
HITCBC 247   386 return; 247   386 return;
HITCBC 248   18798 c = pop(); 248   19148 c = pop();
HITCBC 249   18991 } 249   19341 }
HITCBC 250   18798 if(c) 250   19148 if(c)
HITCBC 251   18798 safe_resume(c->h); 251   19148 safe_resume(c->h);
HITCBC 252   18798 } 252   19148 }
HITCBC 253   193 } 253   193 }
254   }; 254   };
255   255  
256   //------------------------------------------------------------------------------ 256   //------------------------------------------------------------------------------
257   257  
HITCBC 258   166 thread_pool:: 258   166 thread_pool::
259   ~thread_pool() 259   ~thread_pool()
260   { 260   {
HITCBC 261   166 impl_->stop(); 261   166 impl_->stop();
HITCBC 262   166 impl_->join(); 262   166 impl_->join();
HITCBC 263   166 impl_->drain_abandoned(); 263   166 impl_->drain_abandoned();
HITCBC 264   166 shutdown(); 264   166 shutdown();
HITCBC 265   166 destroy(); 265   166 destroy();
HITCBC 266   166 delete impl_; 266   166 delete impl_;
HITCBC 267   166 } 267   166 }
268   268  
HITCBC 269   166 thread_pool:: 269   166 thread_pool::
HITCBC 270   166 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix) 270   166 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 271   166 : impl_(new impl(num_threads, thread_name_prefix)) 271   166 : impl_(new impl(num_threads, thread_name_prefix))
272   { 272   {
HITCBC 273   166 this->set_frame_allocator(std::allocator<void>{}); 273   166 this->set_frame_allocator(std::allocator<void>{});
HITCBC 274   166 } 274   166 }
275   275  
276   void 276   void
HITCBC 277   12 thread_pool:: 277   12 thread_pool::
278   join() noexcept 278   join() noexcept
279   { 279   {
HITCBC 280   12 impl_->join(); 280   12 impl_->join();
HITCBC 281   12 } 281   12 }
282   282  
283   void 283   void
HITCBC 284   2 thread_pool:: 284   2 thread_pool::
285   stop() noexcept 285   stop() noexcept
286   { 286   {
HITCBC 287   2 impl_->stop(); 287   2 impl_->stop();
HITCBC 288   2 } 288   2 }
289   289  
290   //------------------------------------------------------------------------------ 290   //------------------------------------------------------------------------------
291   291  
292   thread_pool::executor_type 292   thread_pool::executor_type
HITCBC 293   11580 thread_pool:: 293   11580 thread_pool::
294   get_executor() const noexcept 294   get_executor() const noexcept
295   { 295   {
HITCBC 296   11580 return executor_type( 296   11580 return executor_type(
HITCBC 297   11580 const_cast<thread_pool&>(*this)); 297   11580 const_cast<thread_pool&>(*this));
298   } 298   }
299   299  
300   void 300   void
HITCBC 301   347 thread_pool::executor_type:: 301   347 thread_pool::executor_type::
302   on_work_started() const noexcept 302   on_work_started() const noexcept
303   { 303   {
HITCBC 304   347 pool_->impl_->on_work_started(); 304   347 pool_->impl_->on_work_started();
HITCBC 305   347 } 305   347 }
306   306  
307   void 307   void
HITCBC 308   347 thread_pool::executor_type:: 308   347 thread_pool::executor_type::
309   on_work_finished() const noexcept 309   on_work_finished() const noexcept
310   { 310   {
HITCBC 311   347 pool_->impl_->on_work_finished(); 311   347 pool_->impl_->on_work_finished();
HITCBC 312   347 } 312   347 }
313   313  
314   void 314   void
HITCBC 315   18656 thread_pool::executor_type:: 315   19003 thread_pool::executor_type::
316   post(continuation& c) const 316   post(continuation& c) const
317   { 317   {
HITCBC 318   18656 pool_->impl_->post(c); 318   19003 pool_->impl_->post(c);
HITCBC 319   18656 } 319   19003 }
320   320  
321   std::coroutine_handle<> 321   std::coroutine_handle<>
HITCBC 322   355 thread_pool::executor_type:: 322   355 thread_pool::executor_type::
323   dispatch(continuation& c) const 323   dispatch(continuation& c) const
324   { 324   {
HITCBC 325   355 if(pool_->impl_->running_in_this_thread()) 325   355 if(pool_->impl_->running_in_this_thread())
HITCBC 326   13 return c.h; 326   13 return c.h;
HITCBC 327   342 pool_->impl_->post(c); 327   342 pool_->impl_->post(c);
HITCBC 328   342 return std::noop_coroutine(); 328   342 return std::noop_coroutine();
329   } 329   }
330   330  
331   } // capy 331   } // capy
332   } // boost 332   } // boost