root/core/private_/thread_pool_queue_model.hpp

#ifndef libjmmcg_core_thread_pool_queue_model_hpp
#define libjmmcg_core_thread_pool_queue_model_hpp

/******************************************************************************
** $Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/private_/thread_pool_queue_model.hpp 2251 2018-02-20 23:30:57Z jmmcg $
**
** Copyright (c) 2010 by J.M.McGuiness, coder@hussar.me.uk
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public
** License as published by the Free Software Foundation; either
** version 2.1 of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#include "thread_pool_base.hpp"

namespace jmmcg { namespace ppd { namespace private_ {

/// This is the batch that the main thread will process.
template<
	unsigned long GSSkSz,
	class PTT,
	class Pt,
	class QM
>
class GSSk_batching {
public:
	typedef PTT pool_traits_type;
	typedef typename pool_traits_type::os_traits os_traits;
	typedef Pt pool_type;
	typedef typename os_traits::thread_traits thread_traits;
	using signalled_work_queue_type=typename pool_traits_type::template signalled_work_queue_type<QM>;
	/// Return a container of GSSkSz items from the front of the queue to implement the GSS(k) or bakers' scheduling algorithm.
	using batch_details_type=batch_details<pool_traits_type::GSSk, signalled_work_queue_type, typename remove_shared_ptr<typename pool_type::value_type, api_lock_traits<platform_api, sequential_mode>>::value_type::statistics_type>;
	typedef typename batch_details_type::statistics_type statistics_type;
	static constexpr unsigned long GSSk=batch_details_type::GSSk;

	explicit GSSk_batching(const typename thread_traits::api_params_type::tid_type mtid) noexcept(true) FORCE_INLINE
	: main_tid(mtid) {
	}

	/// Process a closure_base-derived closure item from the batch of a pool_thread.
	/**
		\param	pool	The thread pool.
		\param	tid	The thread_id of the pool_thread to query the batch for more work.
		\return	true if there is more closure_base-derived closure to process() in the pool_thread's batch, otherwise false.

		\see batch_details::process_a_batch_item()
	*/
	bool __fastcall process_a_batch_item(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename os_traits::thread_exception const &exception_thrown_in_thread) noexcept(false) FORCE_INLINE {
		if (tid==main_tid) {
			return batch.process_a_batch_item();
		} else {
			const typename pool_type::container_type::iterator thread=pool.find(tid);
			if (thread!=pool.end()) {
				assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
				return thread->second->process_a_batch_item(exception_thrown_in_thread);
			} else {
				// We might have a horizontal thread spawned by a horizontal thread, so the ancestor_thread_id will no longer be that of a pool_thread in the thread_pool. This feature is used to flush the batch of a pool_thread of any remaining work, but horizontal threads only have one item in their batch, the active closure_base-derived closure, i.e. no backed-up work, so just return that all work has been done.
				return false;
			}
		}
	}

	/// Put the closure_base-derived closure in the batch, if it is empty.
	/**
		Note that this function runs with no locks, as it presumes that the caller is the same pool_thread that consumes the work from the batch.

		\param pool	The thread_pool_base-derived thread pool to which the wk will be transferred.
		\param tid	The thread_id of the pool_thread to which the closure_base-derived closure should be added, if possible.
		\param wk	The closure_base-derived closure to attempt to add.
		\return true if the closure_base-derived closure was added, false otherwise.

		\see batch_details::add_work_to_batch()
	*/
	bool __fastcall add_work_to_batch(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		if (tid==main_tid) {
			return batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
		} else {
			const auto thread=pool.find(tid);
			if (thread!=pool.end()) {
				assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
				return thread->second->add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
			}
		}
		// We might have a horizontal thread spawned by a horizontal thread, so the ancestor_thread_id will no longer be that of a pool_thread in the thread_pool. But horizontal threads only have one item in their batch, the active closure_base-derived closure, i.e. no backed-up work, so just return that the work couldn't be added.
		return false;
	}

	statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
		return batch.statistics();
	}
	statistics_type &__fastcall statistics() noexcept(true) FORCE_INLINE {
		return batch.statistics();
	}

private:
	batch_details_type batch;
	const typename thread_traits::api_params_type::tid_type main_tid;
};
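
/*
	A minimal, standalone sketch (not library code; the names below are illustrative
	only) of the GSS(k)-style batching that GSSk_batching and batch_details rely
	upon: rather than popping one closure at a time from the shared
	signalled_work_queue, a pool_thread transfers up to GSSk closures into its
	private batch under a single lock, amortising the locking & signalling cost
	over GSSk items.

		std::vector<work_t> batch;	// stands in for batch_details
		{
			std::lock_guard<std::mutex> lck(queue_mutex);	// one lock per batch, not per item
			while (batch.size()<GSSk && !queue.empty()) {
				batch.push_back(std::move(queue.front()));
				queue.pop_front();
			}
		}
		for (auto &wk : batch) {
			wk.process();	// subsequently processed with no further locking
		}
*/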

/// Specifies how the queue(s) within the thread_pool or the pool_threads are implemented.
/**
	\see thread_pool_base
*/
template<
	class DM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model;
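
/*
	The primary template above is declared but never defined: the queue-model tag
	passed as the first template parameter selects one of the two partial
	specialisations that follow. A minimal sketch of the pattern, with hypothetical
	names standing in for the real tag types:

		struct pool_owns_queue {};	// one queue, shared by the whole pool
		template<class StealingMode> struct thread_owns_queue {};	// one queue per pool_thread

		template<class QM> class model;	// primary template: declaration only
		template<> class model<pool_owns_queue> { ... };	// work is costly to run relative to enqueuing it
		template<class SM> class model<thread_owns_queue<SM>> { ... };	// work is cheap to run, so queue contention matters

	Choosing between them is a cost trade-off: with a single shared queue every
	transfer pays the queue's locking & signalling cost once, which is negligible
	when each closure runs for much longer than that cost (e.g. ~100ns of locking
	against ~100us of work is roughly 0.1% overhead); per-thread queues with work
	stealing avoid that shared point of contention when the closures are comparably
	cheap.
*/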

/// Implements the case when the signalled_work_queue is contained within the thread_pool & shared by the pool_threads.
/**
	This implies that the cost of executing the input_work is larger than the locking & serialisation cost of having a single queue from which all of the pool_threads compete to take work.

	\see thread_pool_base, signalled_work_queue
*/
template<
	template<class> class QM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt> : public thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue> {
public:
	using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>;
	using base1_t=thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>;
	using pool_traits_type=typename base1_t::pool_traits_type;
	using os_traits=typename base1_t::os_traits;
	using pool_type=typename base1_t::pool_type;
	using queue_size_type=typename base1_t::queue_size_type;
	using pool_size_type=typename base1_t::pool_size_type;
	using pool_thread_type=typename base1_t::pool_thread_type;
	using exception_type=typename base1_t::exception_type;
	using thread_traits=typename base1_t::thread_traits;
	using api_params_type=typename base1_t::api_params_type;
	using priority_type=typename base1_t::priority_type;
	using work_distribution_mode=typename base1_t::work_distribution_mode;
	using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
	using queue_model=typename base_t::queue_model;

	/**
		To assist in allowing compile-time computation of the algorithmic order of the threading model.
	*/
	static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;

	using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
	/// The type of statistics collected related to the operation of the thread_pool.
	/**
		The general concept behind this type is that the cost of gathering the statistics should be as small as possible, even to the extent of the statistics being inaccurate under-estimates, to ensure the cost is minimised.

		\see no_statistics
		\see basic_statistics
	*/
	using statistics_type=typename base1_t::statistics_type;
	using cfg_type=typename base1_t::cfg_type;

	/// Returns true if there are no threads in the thread_pool.
	/**
		\return true if there are no threads in the thread_pool.
	*/
	bool __fastcall pool_empty() const noexcept(true) FORCE_INLINE {
		return pool.empty();
	}
	/// Returns the current number of threads in the thread_pool.
	/**
		\return The current number of threads in the thread_pool.
	*/
	const pool_size_type __fastcall pool_size() const noexcept(true) FORCE_INLINE final {
		return pool.size();
	}
	/**
		\return true if there is no input_work to process by the thread_pool.
	*/
	bool __fastcall queue_empty() const noexcept(true) FORCE_INLINE {
		return this->signalled_work_queue.empty();
	}
	/**
		\return The current amount of outstanding, unscheduled input_work items to be processed by the thread_pool.
	*/
	const queue_size_type __fastcall queue_size() const noexcept(true) FORCE_INLINE {
		return this->signalled_work_queue.size();
	}

	/// Return the theoretical minimum time in computations according to section 3.3 & Theorem 3.3 in [1] required to complete the current work with the current number of threads in the pool using a CREW-PRAM and according to section 1.3.2, Theorem 1.2 in [2] for an EREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.
		[2] Casanova, H., Legrand, A., Robert, Y., "Parallel Algorithms", CRC Press, 2008.

		\return The minimum number of computations

		\todo It would be nice if there were some result for returning this with respect to the memory access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_time(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_time(T const &) const noexcept(true);

	/// Return the theoretical minimum number of processors required to achieve the minimum computation time according to section 3.3 & Theorem 3.3 in [1] required to complete the current work using a CREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.

		\return The minimum number of processors

		\todo It would be nice if there were some result for returning this with respect to the memory access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_processors(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_processors(T const &) const noexcept(true);
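
	/*
		An illustrative reading of those PRAM bounds (a sketch only; the exact
		expressions used are in thread_pool_queue_model_impl.hpp): with n
		outstanding closures in the queue and p threads in the pool, no schedule
		can finish in fewer than ceil(n/p) parallel steps, so roughly

			min_time(CREW) ~ ceil(n/p)
			min_time(EREW) ~ ceil(n/p) + O(log2 p)	// an EREW-PRAM also needs ~log2(p) steps to fan the work out without concurrent reads

		and, by the same counting argument, the smallest number of processors that
		still achieves a given minimum time T is about ceil(n/T), which is the kind
		of quantity min_processors() reports.
	*/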

protected:
	pool_type pool;

// TODO BOOST_MPL_ASSERT((std::is_same<typename base_t::exit_requested_type, typename pool_type::have_work_type::atomic_t>));

	__stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
	: base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_, this->signalled_work_queue) {
	}

	queue_size_type __fastcall
	batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;

	signalled_work_queue_type & __fastcall queue() noexcept(true) FORCE_INLINE {
		return this->signalled_work_queue;
	}
	signalled_work_queue_type const & __fastcall queue() const noexcept(true) FORCE_INLINE final {
		return this->signalled_work_queue;
	}
	/**
		\param	wk	closure_base-derived closure to be process()ed by a pool_thread.
		\return True if the closure_base-derived closure was added to the internal batch_details of the specified pool_thread.
	*/
	virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		return false;
	}

	typename base_t::exit_requested_type &exit_requested() noexcept(true) FORCE_INLINE {
		return this->exit_requested_;
	}

private:
	template<class TPB> friend class joinable_t;
	template<class TPB> friend class nonjoinable_t;
	template<class TPB> friend class nonjoinable_buff_t;
	template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
	template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
	template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
	template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;
};

/// Implements the case when there is a signalled_work_queue contained within each pool_thread, and an algorithm is used to steal work from a pool_thread by the other pool_threads.
/**
	This implies that the cost of executing the input_work is similar to the locking & serialisation costs of that input_work.

	\see thread_pool_base, signalled_work_queue
*/
template<
	pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t SM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<SM>>, Ps, PTT, Pt> : public thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>> {
public:
	using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>;
	using base1_t=thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>;
	using pool_traits_type=typename base1_t::pool_traits_type;
	using os_traits=typename base1_t::os_traits;
	using pool_type=typename base1_t::pool_type;
	using queue_size_type=typename base1_t::queue_size_type;
	using pool_size_type=typename base1_t::pool_size_type;
	using pool_thread_type=typename base1_t::pool_thread_type;
	using exception_type=typename base1_t::exception_type;
	using thread_traits=typename base1_t::thread_traits;
	using api_params_type=typename base1_t::api_params_type;
	using priority_type=typename base1_t::priority_type;
	using work_distribution_mode=typename base1_t::work_distribution_mode;
	using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
	using queue_model=typename base_t::queue_model;

	/**
		To assist in allowing compile-time computation of the algorithmic order of the threading model.
	*/
	static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;

	/// GSS(k) batching is not supported.
	BOOST_MPL_ASSERT((std::is_same<std::integral_constant<unsigned long, pool_traits_type::GSSk>, std::integral_constant<unsigned long, 1UL>>));
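	// I.e. pool_traits_type::GSSk must be 1 when each pool_thread owns its queue. A plausible reading (an assumption, not stated in this header): transferring a batch of closures into one thread's private batch_details would hide them from the work-stealing algorithm, undermining the load balancing this model exists to provide.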

	using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
	/// The type of statistics collected related to the operation of the thread_pool.
	/**
		The general concept behind this type is that the cost of gathering the statistics should be as small as possible, even to the extent of the statistics being inaccurate under-estimates, to ensure the cost is minimised.

		\see no_statistics
		\see basic_statistics
	*/
	using statistics_type=typename base1_t::statistics_type;
	using cfg_type=typename base1_t::cfg_type;

	/// Returns true if there are no threads in the thread_pool.
	/**
		\return true if there are no threads in the thread_pool.
	*/
	bool __fastcall pool_empty() const noexcept(true) FORCE_INLINE {
		return pool.empty();
	}
	/// Returns the current number of threads in the thread_pool.
	/**
		\return The current number of threads in the thread_pool.
	*/
	const pool_size_type __fastcall pool_size() const noexcept(true) FORCE_INLINE {
		return pool.size();
	}

	/// Return the theoretical minimum time in computations according to section 3.3 & Theorem 3.3 in [1] required to complete the current work with the current number of threads in the pool using a CREW-PRAM and according to section 1.3.2, Theorem 1.2 in [2] for an EREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.
		[2] Casanova, H., Legrand, A., Robert, Y., "Parallel Algorithms", CRC Press, 2008.

		\return The minimum number of computations

		\todo It would be nice if there were some result for returning this with respect to the memory access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_time(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_time(T const &) const noexcept(true);

	/// Return the theoretical minimum number of processors required to achieve the minimum computation time according to section 3.3 & Theorem 3.3 in [1] required to complete the current work using a CREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.

		\return The minimum number of processors

		\todo It would be nice if there were some result for returning this with respect to the memory access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_processors(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_processors(T const &) const noexcept(true);

protected:
	pool_type pool;

	BOOST_MPL_ASSERT((std::is_same<typename base_t::exit_requested_type, typename pool_type::exit_requested_type>));

	__stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
	: base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_) {
	}

	queue_size_type __fastcall
	batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;

	/**
		\param	wk	closure_base-derived closure to be process()ed by a pool_thread.
		\return True if the closure_base-derived closure was added to the internal batch_details of the specified pool_thread.
	*/
	virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		return false;
	}

	typename base_t::exit_requested_type &exit_requested() noexcept(true) FORCE_INLINE {
		return this->exit_requested_;
	}

private:
	template<class TPB> friend class joinable_t;
	template<class TPB> friend class nonjoinable_t;
	template<class TPB> friend class nonjoinable_buff_t;
	template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
	template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
	template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
	template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;

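	// In this specialisation there is no pool-owned queue for the members below to return or inspect: each pool_thread owns its signalled_work_queue. The empty bodies appear deliberate; as far as can be told from this header these overrides exist only to satisfy the interface and are never called, so no value is ever returned from them.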
	signalled_work_queue_type & __fastcall queue() noexcept(true) FORCE_INLINE {}
	signalled_work_queue_type const & __fastcall queue() const noexcept(true) FORCE_INLINE {}
	/**
		\return true if there is no input_work to process by the pool_threads.
	*/
	bool __fastcall queue_empty() const noexcept(true) FORCE_INLINE {}
	/**
		\return The approximate amount of outstanding, unscheduled input_work items to be processed by the pool_threads.
	*/
	const queue_size_type __fastcall queue_size() const noexcept(true) FORCE_INLINE {}
};
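
/*
	A minimal, standalone sketch (a generic illustration only, not the library's
	stealing algorithm, which is selected by the SM stealing-mode parameter) of the
	idea behind the thread_owns_queue specialisation above: each pool_thread works
	from its own queue and only touches another thread's queue when its own runs
	dry, so the common case involves no shared point of contention. All names are
	hypothetical.

		void pool_thread_loop(std::size_t me, std::vector<per_thread_queue> &queues) {
			for (;;) {
				work_t wk;
				if (queues[me].try_pop(wk)) {	// fast path: private queue, little contention
					wk.process();
				} else {
					const std::size_t victim=pick_victim(me, queues.size());
					if (queues[victim].try_steal(wk)) {	// slow path: steal from another pool_thread
						wk.process();
					} else if (exit_requested()) {
						return;
					}
				}
			}
		}
*/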

} } }

#include "thread_pool_queue_model_impl.hpp"

#endif
