29 #ifndef PIRANHA_THREAD_POOL_HPP 30 #define PIRANHA_THREAD_POOL_HPP 34 #include <boost/lexical_cast.hpp> 35 #include <condition_variable> 48 #include <type_traits> 49 #include <unordered_set> 53 #include <piranha/config.hpp> 54 #include <piranha/detail/atomic_lock_guard.hpp> 55 #include <piranha/detail/mpfr.hpp> 57 #include <piranha/mp_integer.hpp> 58 #include <piranha/runtime_info.hpp> 59 #include <piranha/thread_management.hpp> 71 task_queue(
unsigned n,
bool bind) : m_stop(false)
73 auto runner = [
this, n, bind]() {
84 std::unique_lock<std::mutex> lock(this->m_mutex);
85 while (!this->m_stop && this->m_tasks.empty()) {
89 this->m_cond.wait(lock);
91 if (this->m_stop && this->m_tasks.empty()) {
97 std::function<void()> task(std::move(this->m_tasks.front()));
115 m_thread = std::thread(std::move(runner));
128 template <
typename T>
132 template <
typename T>
133 struct unwrap_ref<
std::reference_wrapper<T>> {
136 template <
typename T>
137 using unwrap_ref_t =
typename unwrap_ref<T>::type;
144 template <
typename F,
typename... Args>
145 using f_ret_type = decltype(std::declval<decay_t<F> &>()(std::declval<unwrap_ref_t<uncvref_t<Args>> &>()...));
152 template <
typename F,
typename... Args>
154 = enable_if_t<conjunction<std::is_constructible<decay_t<F>, F>, std::is_constructible<uncvref_t<Args>, Args>...,
155 disjunction<std::is_copy_constructible<decay_t<F>>,
156 std::is_move_constructible<decay_t<F>>>,
157 conjunction<disjunction<std::is_copy_constructible<uncvref_t<Args>>,
158 std::is_move_constructible<uncvref_t<Args>>>>...,
159 is_returnable<f_ret_type<F, Args...>>>::value,
162 template <
typename F,
typename... Args, enabler<F &&, Args &&...> = 0>
163 std::future<f_ret_type<F &&, Args &&...>> enqueue(F &&f, Args &&... args)
165 using ret_type = f_ret_type<F &&, Args &&...>;
166 using p_task_type = std::packaged_task<ret_type()>;
171 auto task = std::make_shared<p_task_type>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
172 std::future<ret_type> res = task->get_future();
174 std::unique_lock<std::mutex> lock(m_mutex);
175 if (unlikely(m_stop)) {
177 piranha_throw(std::runtime_error,
"cannot enqueue task while the task queue is stopping");
179 m_tasks.push([task]() { (*task)(); });
190 std::unique_lock<std::mutex> lock(m_mutex);
204 std::condition_variable m_cond;
206 std::queue<std::function<void()>> m_tasks;
207 std::thread m_thread;
211 using thread_queues_t = std::pair<std::vector<std::unique_ptr<task_queue>>, std::unordered_set<std::thread::id>>;
213 inline thread_queues_t get_initial_thread_queues()
222 thread_queues_t retval;
225 retval.first.reserve(
static_cast<decltype(retval.first.size())
>(hc));
226 for (
unsigned i = 0u; i < hc; ++i) {
228 retval.first.emplace_back(::
new task_queue(i,
false));
231 for (
const auto &ptr : retval.first) {
232 auto p = retval.second.insert(ptr->m_thread.get_id());
234 piranha_assert(p.second);
239 template <
typename =
void>
240 struct thread_pool_base {
241 static thread_queues_t s_queues;
243 static std::atomic_flag s_atf;
246 template <
typename T>
247 thread_queues_t thread_pool_base<T>::s_queues = get_initial_thread_queues();
249 template <
typename T>
250 std::atomic_flag thread_pool_base<T>::s_atf = ATOMIC_FLAG_INIT;
252 template <
typename T>
253 bool thread_pool_base<T>::s_bind =
false;
256 void thread_pool_shutdown();
278 template <
typename T =
void>
281 friend void piranha::impl::thread_pool_shutdown<T>();
282 using base = thread_pool_base<>;
284 template <
typename Int>
285 using use_threads_enabler
286 = enable_if_t<disjunction<std::is_same<Int, integer>,
287 conjunction<std::is_integral<Int>, std::is_unsigned<Int>>>::value,
290 template <
typename F,
typename... Args>
291 using enqueue_t = decltype(std::declval<task_queue &>().
enqueue(std::declval<F>(), std::declval<Args>()...));
321 template <
typename F,
typename... Args>
322 static enqueue_t<F &&, Args &&...>
enqueue(
unsigned n, F &&f, Args &&... args)
324 detail::atomic_lock_guard lock(s_atf);
325 if (unlikely(n >= s_queues.first.size())) {
326 piranha_throw(std::invalid_argument,
"the thread index " + std::to_string(n)
327 +
" is out of range, the thread pool contains only " 328 + std::to_string(s_queues.first.size()) +
" threads");
330 return base::s_queues.first[
static_cast<decltype(base::s_queues.first.
size())
>(n)]->
enqueue(
331 std::forward<F>(f), std::forward<Args>(args)...);
339 detail::atomic_lock_guard lock(s_atf);
340 return static_cast<unsigned>(base::s_queues.first.size());
345 static thread_queues_t create_new_queues(
unsigned new_size,
bool bind)
347 thread_queues_t new_queues;
349 new_queues.first.reserve(
static_cast<decltype(new_queues.first.
size())
>(new_size));
350 for (
auto i = 0u; i < new_size; ++i) {
351 new_queues.first.emplace_back(::
new task_queue(i, bind));
354 for (
const auto &ptr : new_queues.first) {
355 auto p = new_queues.second.insert(ptr->m_thread.get_id());
357 piranha_assert(p.second);
362 static void shutdown()
364 thread_queues_t new_queues;
365 detail::atomic_lock_guard lock(s_atf);
366 new_queues.swap(base::s_queues);
385 if (unlikely(new_size == 0u)) {
386 piranha_throw(std::invalid_argument,
"cannot resize the thread pool to zero");
389 detail::atomic_lock_guard lock(s_atf);
390 auto new_queues = create_new_queues(new_size, base::s_bind);
398 new_queues.swap(base::s_queues);
418 detail::atomic_lock_guard lock(s_atf);
419 if (flag == base::s_bind) {
423 auto new_queues = create_new_queues(static_cast<unsigned>(base::s_queues.first.size()), flag);
424 new_queues.swap(base::s_queues);
436 detail::atomic_lock_guard lock(s_atf);
459 template <
typename Int, use_threads_enabler<Int> = 0>
460 static unsigned use_threads(
const Int &work_size,
const Int &min_work_per_thread)
463 if (unlikely(work_size <= Int(0))) {
464 piranha_throw(std::invalid_argument,
"invalid value of " + boost::lexical_cast<std::string>(work_size)
465 +
" for work size (it must be strictly positive)");
467 if (unlikely(min_work_per_thread <= Int(0))) {
469 + boost::lexical_cast<std::string>(min_work_per_thread)
470 +
" for minimum work per thread (it must be strictly positive)");
472 detail::atomic_lock_guard lock(s_atf);
474 if (base::s_queues.second.find(std::this_thread::get_id()) != base::s_queues.second.end()) {
477 const auto n_threads =
static_cast<unsigned>(base::s_queues.first.size());
478 piranha_assert(n_threads);
479 if (work_size / n_threads >= min_work_per_thread) {
485 return static_cast<unsigned>(std::max(Int(1), static_cast<Int>(work_size / min_work_per_thread)));
495 inline namespace impl
499 inline void thread_pool_shutdown()
501 thread_pool::shutdown();
512 template <
typename T>
516 static void wait_or_abort(
const std::future<T> &fut)
518 piranha_assert(fut.valid());
565 m_list.emplace_back();
576 m_list.back() = std::move(f);
584 for (
auto &f : m_list) {
599 for (
auto &f : m_list) {
608 std::list<std::future<T>> m_list;
void wait_all()
Wait on all the futures.
void bind_to_proc(unsigned n)
Bind thread to specific processor.
static unsigned use_threads(const Int &work_size, const Int &min_work_per_thread)
Compute number of threads to use.
void get_all()
Get all the futures.
static enqueue_t< F &&, Args &&... > enqueue(unsigned n, F &&f, Args &&... args)
Enqueue task.
#define piranha_throw(exception_type,...)
Exception-throwing macro.
future_list()=default
Defaulted default constructor.
Class to store a list of futures.
static unsigned size()
Size.
~future_list()
Destructor.
static unsigned get_hardware_concurrency()
Hardware concurrency.
static void set_binding(bool flag)
Set the thread binding policy.
static void resize(unsigned new_size)
Change the number of threads.
static bool get_binding()
Get the thread binding policy.
void push_back(std::future< T > &&f)
Move-insert a future.