This source file includes the following definitions.
- batch
- steal
- add_work_to_batch
- process_a_batch_item
- process
- statistics_
- slave
- process
- batch
- steal
- add_work_to_batch
- process_a_batch_item
- process
- work
- work
- steal
- push_front
- process_a_batch_item
- process
- statistics_
- slave
- process
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 namespace jmmcg { namespace ppd { namespace pool { namespace private_ { namespace thread_types {
22
23 template<class PTT> inline
24 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
25 steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
26 : base_t(exit_r), batch(work_q) {
27 }
28
29 template<class PTT> inline
30 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
31 ~steal() noexcept(false) {
32 this->delete_thread();
33 assert(batch.batch.batch_empty());
34 }
35
36 template<class PTT> inline
37 bool
38 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
39 add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
40 return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
41 }
42
43 template<class PTT> inline
44 bool
45 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
46 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
47 return batch.batch.process_a_batch_item();
48 }
49
50 template<class PTT> inline
51 typename steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::thread_traits::api_params_type::states
52 steal<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
53 process() noexcept(false) {
54
55 for (;;) {
56 typename exit_requested_type::lock_result_type lkd;
57 {
58 const typename os_traits::thread_traits::cancellability set;
59 lkd=this->exit_requested_.lock();
60 }
61 if (lkd.second!=lock_traits::atom_set || lkd.first==exit_requested_type::states::exit_requested) {
62
63 this->exit_requested_.set(exit_requested_type::states::exit_requested);
64 break;
65 } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
66
67 batch.batch.process_a_batch(batch.signalled_work_queue);
68 assert(batch.batch.batch_empty());
69 }
70 }
71 return thread_traits::api_params_type::no_kernel_thread;
72 }
73
74 template<class PTT> inline
75 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
76 slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
77 : base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
78 }
79
80 template<class PTT> inline
81 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
82 ~slave() noexcept(true) {
83 this->delete_thread();
84 }
85
86 template<class PTT> inline
87 typename slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
88 slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
89 process() noexcept(false) {
90 some_work->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
91 this->statistics_.processed_vertical_work();
92 return thread_traits::api_params_type::no_kernel_thread;
93 }
94
95 template<class PTT> inline
96 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
97 steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
98 : base_t(exit_r), batch(work_q) {
99 }
100
101 template<class PTT> inline
102 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
103 ~steal() noexcept(false) {
104 this->delete_thread();
105 assert(batch.batch.batch_empty());
106 }
107
108 template<class PTT> inline
109 bool
110 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
111 add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
112 return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
113 }
114
115 template<class PTT> inline
116 bool
117 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
118 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
119 return batch.batch.process_a_batch_item();
120 }
121
122 template<class PTT> inline
123 typename steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::thread_traits::api_params_type::states
124 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::
125 process() noexcept(false) {
126
127 for (;;) {
128 typename exit_requested_type::lock_result_type lkd;
129 {
130 const typename os_traits::thread_traits::cancellability set;
131 lkd=this->exit_requested_.lock();
132 }
133 assert(lkd.second==lock_traits::atom_set);
134 if (lkd.first==exit_requested_type::states::exit_requested) {
135
136 this->exit_requested_.set(exit_requested_type::states::exit_requested);
137 break;
138 } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
139
140 batch.batch.process_a_batch(batch.signalled_work_queue);
141 assert(batch.batch.batch_empty());
142 }
143 }
144 return thread_traits::api_params_type::no_kernel_thread;
145 }
146
147 template<class PTT> inline
148 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
149 steal(exit_requested_type &exit_r) noexcept(true)
150 : base_t(exit_r), work() {
151 }
152
153 template<class PTT> inline
154 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
155 steal(steal &&s) noexcept(true)
156 : base_t(s.exit_requested_), work(std::move(s.work)) {
157 }
158
159 template<class PTT> inline
160 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
161 ~steal() noexcept(false) {
162 this->delete_thread();
163 }
164
165 template<class PTT> inline bool
166 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
167 push_front(typename signalled_work_queue_type::value_type &&wk) {
168 work.batch.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
169 return true;
170 }
171
172 template<class PTT> inline bool
173 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
174 process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thread) {
175
176 }
177
178 template<class PTT> inline
179 typename steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::thread_traits::api_params_type::states
180 steal<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT, pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::
181 process() noexcept(false) {
182 using cfg_type=typename container_type::container_type::value_type::value_type::cfg_type;
183
184 for (;;) {
185
186 while (work.batch.empty()) {
187 const typename exit_requested_type::lock_result_type lkd=this->exit_requested_.try_lock();
188 assert(lkd.second==lock_traits::atom_set);
189 if (lkd.first==exit_requested_type::states::exit_requested) {
190
191 this->exit_requested_.set(exit_requested_type::states::exit_requested);
192 return thread_traits::api_params_type::no_kernel_thread;
193 }
194 }
195 typename container_type::container_type::value_type current_work(work.batch.pop_front_1_nochk_nosig());
196 ppd::private_::eval_shared_deleter_t<typename container_type::container_type::value_type> wk(current_work);
197 wk.process_the_work(std::bind(&statistics_type::processed_vertical_work, &work.statistics_), cfg_type::vertical_edge_annotation);
198 }
199 return thread_traits::api_params_type::no_kernel_thread;
200 }
201
202 template<class PTT> inline
203 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
204 slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
205 : base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
206 }
207
208 template<class PTT> inline
209 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
210 ~slave() noexcept(true) {
211 this->delete_thread();
212 }
213
214 template<class PTT> inline
215 typename slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
216 slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::
217 process() noexcept(false) {
218 class set_work_complete {
219 public:
220 explicit set_work_complete(typename signalled_work_queue_type::value_type::atomic_ptr_t &wc) noexcept(true)
221 : work_ptr(wc) {
222 }
223 ~set_work_complete() noexcept(true) {
224 work_ptr->work_complete()->set();
225 }
226
227 private:
228 typename signalled_work_queue_type::value_type::atomic_ptr_t &work_ptr;
229 };
230
231 typename signalled_work_queue_type::value_type::atomic_ptr_t work_ptr(some_work.get());
232 if (dynamic_cast<typename signalled_work_queue_type::value_type::no_ref_counting *>(work_ptr.get())) {
233 some_work=typename signalled_work_queue_type::value_type();
234 }
235 if (work_ptr->result_traits()==generic_traits::return_data::joinable) {
236 const set_work_complete setter(work_ptr);
237 work_ptr->process_joinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
238 } else {
239 work_ptr->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
240 }
241 this->statistics_.processed_vertical_work();
242 return thread_traits::api_params_type::no_kernel_thread;
243 }
244
245 } } } } }