root/unix/pool_thread_impl.hpp

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes the following definitions.
  1. batch
  2. steal
  3. add_work_to_batch
  4. process_a_batch_item
  5. process
  6. statistics_
  7. slave
  8. process
  9. batch
  10. steal
  11. add_work_to_batch
  12. process_a_batch_item
  13. process
  14. work
  15. work
  16. steal
  17. push_front
  18. process_a_batch_item
  19. process
  20. statistics_
  21. slave
  22. process

   1 /******************************************************************************
   2 ** $Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/unix/pool_thread_impl.hpp 2251 2018-02-20 23:30:57Z jmmcg $
   3 **
   4 ** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
   5 **
   6 ** This library is free software; you can redistribute it and/or
   7 ** modify it under the terms of the GNU Lesser General Public
   8 ** License as published by the Free Software Foundation; either
   9 ** version 2.1 of the License, or (at your option) any later version.
  10 **
  11 ** This library is distributed in the hope that it will be useful,
  12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14 ** Lesser General Public License for more details.
  15 **
  16 ** You should have received a copy of the GNU Lesser General Public
  17 ** License along with this library; if not, write to the Free Software
  18 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  19 */
  20 
  21 namespace jmmcg { namespace ppd { namespace pool { namespace private_ { namespace thread_types {
  22 
  23 template<class PTT> inline
  24 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  25 steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
  26 : base_t(exit_r), batch(work_q) {
  27 }
  28 
  29 template<class PTT> inline
  30 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  31 ~steal() noexcept(false) {
  32         this->delete_thread();
  33         assert(batch.batch.batch_empty());
  34 }
  35 
  36 template<class PTT> inline
  37 bool
  38 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  39 add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
  40         return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
  41 }
  42 
  43 template<class PTT> inline
  44 bool
  45 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  46 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
  47         return batch.batch.process_a_batch_item();
  48 }
  49 
  50 template<class PTT> inline
  51 typename steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::thread_traits::api_params_type::states
  52 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  53 process() noexcept(false) {
  54         // Serialize access to the queue by all of the other threads in the pool, this means that only one job can be removed at a time from the work queue.
  55         for (;;) {
  56                 typename exit_requested_type::lock_result_type lkd;
  57                 {
  58                         const typename os_traits::thread_traits::cancellability set;
  59                         lkd=this->exit_requested_.lock();
  60                 }
  61                 if (lkd.second!=lock_traits::atom_set || lkd.first==exit_requested_type::states::exit_requested) {
  62                         // Ensure the rest of the threads in the pool exit.
  63                         this->exit_requested_.set(exit_requested_type::states::exit_requested);
  64                         break;
  65                 } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
  66                         // The counter in signalled_work_queue is one item too low, and will need to be re-added, but we'll do that closer to where we lock the queue, to reduce the chance of a horizontal thread slipping in and getting the work.
  67                         batch.batch.process_a_batch(batch.signalled_work_queue);
  68                         assert(batch.batch.batch_empty());
  69                 }
  70         }
  71         return thread_traits::api_params_type::no_kernel_thread;
  72 }
  73 
  74 template<class PTT> inline
  75 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
  76 slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
  77 : base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
  78 }
  79 
  80 template<class PTT> inline
  81 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
  82 ~slave() noexcept(true) {
  83         this->delete_thread();
  84 }
  85 
  86 template<class PTT> inline
  87 typename slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
  88 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
  89 process() noexcept(false) {
  90         some_work->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
  91         this->statistics_.processed_vertical_work();
  92         return thread_traits::api_params_type::no_kernel_thread;
  93 }
  94 
  95 template<class PTT> inline
  96 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
  97 steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
  98 : base_t(exit_r), batch(work_q) {
  99 }
 100 
 101 template<class PTT> inline
 102 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
 103 ~steal() noexcept(false) {
 104         this->delete_thread();
 105         assert(batch.batch.batch_empty());
 106 }
 107 
 108 template<class PTT> inline
 109 bool
 110 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
 111 add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
 112         return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
 113 }
 114 
 115 template<class PTT> inline
 116 bool
 117 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
 118 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
 119         return batch.batch.process_a_batch_item();
 120 }
 121 
 122 template<class PTT> inline
 123 typename steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::thread_traits::api_params_type::states
 124 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
 125 process() noexcept(false) {
 126         // Serialize access to the queue by all of the other threads in the pool, this means that only one job can be removed at a time from the work queue.
 127         for (;;) {
 128                 typename exit_requested_type::lock_result_type lkd;
 129                 {
 130                         const typename os_traits::thread_traits::cancellability set;
 131                         lkd=this->exit_requested_.lock();
 132                 }
 133                 assert(lkd.second==lock_traits::atom_set);
 134                 if (lkd.first==exit_requested_type::states::exit_requested) {
 135                         // Ensure the rest of the threads in the pool exit.
 136                         this->exit_requested_.set(exit_requested_type::states::exit_requested);
 137                         break;
 138                 } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
 139                         // The counter in signalled_work_queue is one item too low, and will need to be re-added, but we'll do that closer to where we lock the queue, to reduce the chance of a horizontal thread slipping in and getting the work.
 140                         batch.batch.process_a_batch(batch.signalled_work_queue);
 141                         assert(batch.batch.batch_empty());
 142                 }
 143         }
 144         return thread_traits::api_params_type::no_kernel_thread;
 145 }
 146 
 147 template<class PTT> inline
 148 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 149 steal(exit_requested_type &exit_r) noexcept(true)
 150 : base_t(exit_r), work() {
 151 }
 152 
 153 template<class PTT> inline
 154 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 155 steal(steal &&s) noexcept(true)
 156 : base_t(s.exit_requested_), work(std::move(s.work)) {
 157 }
 158 
 159 template<class PTT> inline
 160 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 161 ~steal() noexcept(false) {
 162         this->delete_thread();
 163 }
 164 
 165 template<class PTT> inline bool
 166 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 167 push_front(typename signalled_work_queue_type::value_type &&wk) {
 168         work.batch.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
 169         return true;
 170 }
 171 
 172 template<class PTT> inline bool
 173 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 174 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thread) {
 175 // TODO
 176 }
 177 
 178 template<class PTT> inline
 179 typename steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::thread_traits::api_params_type::states
 180 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
 181 process() noexcept(false) {
 182         using cfg_type=typename container_type::container_type::value_type::value_type::cfg_type;
 183 
 184         for (;;) {
 185                 // Busy wait for work or exit flag....
 186                 while (work.batch.empty()) {
 187                         const typename exit_requested_type::lock_result_type lkd=this->exit_requested_.try_lock();
 188                         assert(lkd.second==lock_traits::atom_set);
 189                         if (lkd.first==exit_requested_type::states::exit_requested) {
 190                                 // Ensure the rest of the threads in the pool exit.
 191                                 this->exit_requested_.set(exit_requested_type::states::exit_requested);
 192                                 return thread_traits::api_params_type::no_kernel_thread;
 193                         }
 194                 }
 195                 typename container_type::container_type::value_type current_work(work.batch.pop_front_1_nochk_nosig());
 196                 ppd::private_::eval_shared_deleter_t<typename container_type::container_type::value_type> wk(current_work);
 197                 wk.process_the_work(std::bind(&statistics_type::processed_vertical_work, &work.statistics_), cfg_type::vertical_edge_annotation);
 198         }
 199         return thread_traits::api_params_type::no_kernel_thread;
 200 }
 201 
 202 template<class PTT> inline
 203 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
 204 slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
 205 : base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
 206 }
 207 
 208 template<class PTT> inline
 209 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
 210 ~slave() noexcept(true) {
 211         this->delete_thread();
 212 }
 213 
 214 template<class PTT> inline
 215 typename slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
 216 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
 217 process() noexcept(false) {
 218         class set_work_complete {
 219         public:
 220                 explicit set_work_complete(typename signalled_work_queue_type::value_type::atomic_ptr_t &wc) noexcept(true)
 221                 : work_ptr(wc) {
 222                 }
 223                 ~set_work_complete() noexcept(true) {
 224                         work_ptr->work_complete()->set();
 225                 }
 226 
 227         private:
 228                 typename signalled_work_queue_type::value_type::atomic_ptr_t &work_ptr;
 229         };
 230 
 231         typename signalled_work_queue_type::value_type::atomic_ptr_t work_ptr(some_work.get());
 232         if (dynamic_cast<typename signalled_work_queue_type::value_type::no_ref_counting *>(work_ptr.get())) {
 233                 some_work=typename signalled_work_queue_type::value_type();
 234         }
 235         if (work_ptr->result_traits()==generic_traits::return_data::joinable) {
 236                 const set_work_complete setter(work_ptr);
 237                 work_ptr->process_joinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
 238         } else {
 239                 work_ptr->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
 240         }
 241         this->statistics_.processed_vertical_work();
 242         return thread_traits::api_params_type::no_kernel_thread;
 243 }
 244 
 245 } } } } }

/* [<][>][^][v][top][bottom][index][help] */