LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 89.7 % 78 70
Test Date: 2026-01-30 23:43:15 Functions: 87.5 % 16 14

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : // Copyright (c) 2026 Michael Vandeberg
       4              : //
       5              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7              : //
       8              : // Official repository: https://github.com/boostorg/capy
       9              : //
      10              : 
      11              : #include <boost/capy/ex/thread_pool.hpp>
      12              : #include <boost/capy/detail/intrusive.hpp>
      13              : #include <boost/capy/detail/thread_name.hpp>
      14              : #include <atomic>
      15              : #include <condition_variable>
      16              : #include <cstdio>
      17              : #include <mutex>
      18              : #include <thread>
      19              : #include <vector>
      20              : 
      21              : /*
      22              :     Thread pool implementation using a shared work queue.
      23              : 
      24              :     Work items are coroutine handles wrapped in intrusive list nodes, stored
      25              :     in a single queue protected by a mutex. Worker threads wait on a
      26              :     condition_variable until work is available or stop is requested.
      27              : 
      28              :     Threads are started lazily on first post() via std::call_once to avoid
      29              :     spawning threads for pools that are constructed but never used. Each
      30              :     thread is named with a configurable prefix plus index for debugger
      31              :     visibility.
      32              : 
      33              :     Shutdown sequence: stop() sets the stop flag and notifies all threads;
      34              :     workers drain the queue and exit, then the destructor joins them and
      35              :     destroys any work still queued at that point without executing it.
      36              : */
      37              : 
      38              : namespace boost {
      39              : namespace capy {
      40              : 
      41              : //------------------------------------------------------------------------------
      42              : 
      43              : class thread_pool::impl  // Pool state: a mutex-guarded work queue plus lazily started worker threads.
      44              : {
      45              :     struct work : detail::intrusive_queue<work>::node  // one heap-allocated queue entry
      46              :     {
      47              :         coro h_;  // handle resumed by run()
      48              :         // Wraps h in a queue node; nothing is resumed until run() is called.
      49          121 :         explicit work(coro h) noexcept
      50          121 :             : h_(h)
      51              :         {
      52          121 :         }
      53              :         // Frees this node first, then resumes the handle through a copy taken before the delete.
      54          121 :         void run()
      55              :         {
      56          121 :             auto h = h_;
      57          121 :             delete this;
      58          121 :             h.resume();
      59          121 :         }
      60              :         // Frees the node without resuming h_. NOTE(review): h_ itself is not destroyed here - confirm that is intended.
      61            0 :         void destroy()
      62              :         {
      63            0 :             delete this;
      64            0 :         }
      65              :     };
      66              : 
      67              :     std::mutex mutex_;  // held around q_ push/pop in post() and run()
      68              :     std::condition_variable cv_;  // notified by post() and stop(); workers wait on it
      69              :     detail::intrusive_queue<work> q_;  // pending work, pushed by post() and popped by workers
      70              :     std::vector<std::thread> threads_;  // filled once by ensure_started()
      71              :     std::atomic<bool> stop_{false};  // set by stop(); read by workers
      72              :     std::size_t num_threads_;  // worker count, fixed in the constructor
      73              :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      74              :     std::once_flag start_flag_;  // lets ensure_started() spawn the threads only once
      75              : 
      76              : public:
      77           58 :     ~impl()  // requests stop, joins the workers, then frees any leftover nodes
      78              :     {
      79           58 :         stop();
      80           94 :         for(auto& t : threads_)
      81           36 :             if(t.joinable())
      82           36 :                 t.join();
      83              :         // Every worker has been joined, so q_ is drained here without taking mutex_.
      84           58 :         while(auto* w = q_.pop())
      85            0 :             w->destroy();
      86           58 :     }
      87              :     // num_threads == 0 selects hardware_concurrency(), falling back to 1 if that also reports 0.
      88           58 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      89           58 :         : num_threads_(num_threads)
      90              :     {
      91           58 :         if(num_threads_ == 0)
      92            2 :             num_threads_ = std::thread::hardware_concurrency();
      93           58 :         if(num_threads_ == 0)
      94            0 :             num_threads_ = 1;
      95              : 
      96              :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
      97           58 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
      98           58 :         thread_name_prefix_[n] = '\0';
      99           58 :     }
     100              :     // Starts the workers on first use, enqueues a heap-allocated node for h, then wakes one worker.
     101              :     void
     102          121 :     post(coro h)
     103              :     {
     104          121 :         ensure_started();
     105          121 :         auto* w = new work(h);
     106              :         {
     107          121 :             std::lock_guard<std::mutex> lock(mutex_);
     108          121 :             q_.push(w);
     109          121 :         }
     110          121 :         cv_.notify_one();
     111          121 :     }
     112              :     // Sets the stop flag and wakes all workers. NOTE(review): the store is made without mutex_, so a worker between its predicate check and blocking could miss this notify - confirm.
     113              :     void
     114           58 :     stop() noexcept
     115              :     {
     116           58 :         stop_.store(true, std::memory_order_release);
     117           58 :         cv_.notify_all();
     118           58 :     }
     119              : 
     120              : private:
     121              :     void
     122          121 :     ensure_started()  // spawns num_threads_ workers, at most once, via std::call_once
     123              :     {
     124          121 :         std::call_once(start_flag_, [this]{
     125           20 :             threads_.reserve(num_threads_);
     126           56 :             for(std::size_t i = 0; i < num_threads_; ++i)
     127           72 :                 threads_.emplace_back([this, i]{ run(i); });
     128           20 :         });
     129          121 :     }
     130              :     // Worker loop: names the thread, then runs queued work until stop is set and the queue is empty.
     131              :     void
     132           36 :     run(std::size_t index)
     133              :     {
     134              :         // Build name; set_current_thread_name truncates to platform limits.
     135              :         char name[16];
     136           36 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     137           36 :         detail::set_current_thread_name(name);
     138              : 
     139              :         for(;;)
     140              :         {
     141          157 :             work* w = nullptr;
     142              :             {
     143          157 :                 std::unique_lock<std::mutex> lock(mutex_);
     144          157 :                 cv_.wait(lock, [this]{
     145          289 :                     return !q_.empty() ||
     146          289 :                         stop_.load(std::memory_order_acquire);
     147              :                 });
     148          157 :                 if(stop_.load(std::memory_order_acquire) && q_.empty())  // exit only once the queue is drained
     149           72 :                     return;
     150          121 :                 w = q_.pop();
     151          157 :             }
     152          121 :             if(w)
     153          121 :                 w->run();  // executed after the lock scope above has closed
     154          121 :         }
     155              :     }
     156              : };
     157              : 
     158              : //------------------------------------------------------------------------------
     159              : 
     160           58 : thread_pool::
     161              : ~thread_pool()  // calls shutdown() and destroy() (declared outside this file) before freeing the impl
     162              : {
     163           58 :     shutdown();
     164           58 :     destroy();
     165           58 :     delete impl_;  // runs impl::~impl(), which sets the stop flag and joins the workers
     166           58 : }
     167              : 
     168           58 : thread_pool::
     169           58 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)  // allocates the impl; no threads are started here
     170           58 :     : impl_(new impl(num_threads, thread_name_prefix))
     171              : {
     172           58 :     this->set_frame_allocator(std::allocator<void>{});  // passes a std::allocator<void> to set_frame_allocator (declared elsewhere)
     173           58 : }
     174              : 
     175              : void
     176            0 : thread_pool::
     177              : stop() noexcept  // forwards to impl::stop(), which sets the stop flag and notifies all workers
     178              : {
     179            0 :     impl_->stop();
     180            0 : }
     181              : 
     182              : //------------------------------------------------------------------------------
     183              : 
     184              : void
     185          121 : thread_pool::executor_type::
     186              : post(coro h) const  // forwards h to impl::post() of the pool this executor refers to
     187              : {
     188          121 :     pool_->impl_->post(h);
     189          121 : }
     190              : 
     191              : } // capy
     192              : } // boost
        

Generated by: LCOV version 2.3