This source file includes the following definitions:
- process_a_batch_item
- add_work_to_batch
- pool_empty
- queue_empty
- pool
- add_work_to_batch
- pool_empty
- pool
- add_work_to_batch
- queue_empty
1 #ifndef libjmmcg_core_thread_pool_queue_model_hpp
2 #define libjmmcg_core_thread_pool_queue_model_hpp
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "thread_pool_base.hpp"

#include <exception>
25
26 namespace jmmcg { namespace ppd { namespace private_ {
27
28
29 template<
30 unsigned long GSSkSz,
31 class PTT,
32 class Pt,
33 class QM
34 >
35 class GSSk_batching {
36 public:
37 typedef PTT pool_traits_type;
38 typedef typename pool_traits_type::os_traits os_traits;
39 typedef Pt pool_type;
40 typedef typename os_traits::thread_traits thread_traits;
41 using signalled_work_queue_type=typename pool_traits_type::template signalled_work_queue_type<QM>;
42
43 using batch_details_type=batch_details<pool_traits_type::GSSk, signalled_work_queue_type, typename remove_shared_ptr<typename pool_type::value_type, api_lock_traits<platform_api, sequential_mode>>::value_type::statistics_type>;
44 typedef typename batch_details_type::statistics_type statistics_type;
45 static constexpr unsigned long GSSk=batch_details_type::GSSk;
46
47 explicit GSSk_batching(const typename thread_traits::api_params_type::tid_type mtid) noexcept(true) FORCE_INLINE
48 : main_tid(mtid) {
49 }
50
51
52
53
54
55
56
57
58
59 bool __fastcall process_a_batch_item(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename os_traits::thread_exception const &exception_thrown_in_thread) noexcept(false) FORCE_INLINE {
60 if (tid==main_tid) {
61 return batch.process_a_batch_item();
62 } else {
63 const typename pool_type::container_type::iterator thread=pool.find(tid);
64 if (thread!=pool.end()) {
65 assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
66 return thread->second->process_a_batch_item(exception_thrown_in_thread);
67 } else {
68
69 return false;
70 }
71 }
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85 bool __fastcall add_work_to_batch(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
86 if (tid==main_tid) {
87 return batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
88 } else {
89 const auto thread=pool.find(tid);
90 if (thread!=pool.end()) {
91 assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
92 return thread->second->add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
93 }
94 }
95
96 return false;
97 }
98
99 statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
100 return batch.statistics();
101 }
102 statistics_type &__fastcall statistics() noexcept(true) FORCE_INLINE {
103 return batch.statistics();
104 }
105
106 private:
107 batch_details_type batch;
108 const typename thread_traits::api_params_type::tid_type main_tid;
109 };
110
111
112
113
114
115 template<
116 class DM,
117 pool_traits::size_mode_t Ps,
118 typename PTT,
119 class Pt
120 >
121 class thread_pool_queue_model;
122
123
124
125
126
127
128
129 template<
130 template<class> class QM,
131 pool_traits::size_mode_t Ps,
132 typename PTT,
133 class Pt
134 >
135 class thread_pool_queue_model<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt> : public thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue> {
136 public:
137 using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>;
138 using base1_t=thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>;
139 using pool_traits_type=typename base1_t::pool_traits_type;
140 using os_traits=typename base1_t::os_traits;
141 using pool_type=typename base1_t::pool_type;
142 using queue_size_type=typename base1_t::queue_size_type;
143 using pool_size_type=typename base1_t::pool_size_type;
144 using pool_thread_type=typename base1_t::pool_thread_type;
145 using exception_type=typename base1_t::exception_type;
146 using thread_traits=typename base1_t::thread_traits;
147 using api_params_type=typename base1_t::api_params_type;
148 using priority_type=typename base1_t::priority_type;
149 using work_distribution_mode=typename base1_t::work_distribution_mode;
150 using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
151 using queue_model=typename base_t::queue_model;
152
153
154
155
156 static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;
157
158 using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
159
160
161
162
163
164
165
166 using statistics_type=typename base1_t::statistics_type;
167 using cfg_type=typename base1_t::cfg_type;
168
169
170
171
172
173 bool __fastcall pool_empty() const noexcept(true) FORCE_INLINE {
174 return pool.empty();
175 }
176
177
178
179
180 const pool_size_type __fastcall pool_size() const noexcept(true) FORCE_INLINE final {
181 return pool.size();
182 }
183
184
185
186 bool __fastcall queue_empty() const noexcept(true) FORCE_INLINE {
187 return this->signalled_work_queue.empty();
188 }
189
190
191
192 const queue_size_type __fastcall queue_size() const noexcept(true) FORCE_INLINE {
193 return this->signalled_work_queue.size();
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 unsigned long __fastcall
210 min_time(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
211 template<class T>
212 unsigned long __fastcall FORCE_INLINE
213 min_time(T const &) const noexcept(true);
214
215
216
217
218
219
220
221
222
223
224
225
226
227 unsigned long __fastcall
228 min_processors(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
229 template<class T>
230 unsigned long __fastcall FORCE_INLINE
231 min_processors(T const &) const noexcept(true);
232
233 protected:
234 pool_type pool;
235
236
237
238 __stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
239 : base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_, this->signalled_work_queue) {
240 }
241
242 queue_size_type __fastcall
243 batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;
244
245 signalled_work_queue_type & __fastcall queue() noexcept(true) FORCE_INLINE {
246 return this->signalled_work_queue;
247 }
248 signalled_work_queue_type const & __fastcall queue() const noexcept(true) FORCE_INLINE final {
249 return this->signalled_work_queue;
250 }
251
252
253
254
255 virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
256 return false;
257 }
258
259 typename base_t::exit_requested_type &exit_requested() noexcept(true) FORCE_INLINE {
260 return this->exit_requested_;
261 }
262
263 private:
264 template<class TPB> friend class joinable_t;
265 template<class TPB> friend class nonjoinable_t;
266 template<class TPB> friend class nonjoinable_buff_t;
267 template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
268 template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
269 template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
270 template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;
271 };
272
273
274
275
276
277
278
279 template<
280 pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t SM,
281 pool_traits::size_mode_t Ps,
282 typename PTT,
283 class Pt
284 >
285 class thread_pool_queue_model<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<SM>>, Ps, PTT, Pt> : public thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>> {
286 public:
287 using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>;
288 using base1_t=thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>;
289 using pool_traits_type=typename base1_t::pool_traits_type;
290 using os_traits=typename base1_t::os_traits;
291 using pool_type=typename base1_t::pool_type;
292 using queue_size_type=typename base1_t::queue_size_type;
293 using pool_size_type=typename base1_t::pool_size_type;
294 using pool_thread_type=typename base1_t::pool_thread_type;
295 using exception_type=typename base1_t::exception_type;
296 using thread_traits=typename base1_t::thread_traits;
297 using api_params_type=typename base1_t::api_params_type;
298 using priority_type=typename base1_t::priority_type;
299 using work_distribution_mode=typename base1_t::work_distribution_mode;
300 using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
301 using queue_model=typename base_t::queue_model;
302
303
304
305
306 static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;
307
308
309 BOOST_MPL_ASSERT((std::is_same<std::integral_constant<unsigned long, pool_traits_type::GSSk>, std::integral_constant<unsigned long, 1UL>>));
310
311 using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
312
313
314
315
316
317
318
319 using statistics_type=typename base1_t::statistics_type;
320 using cfg_type=typename base1_t::cfg_type;
321
322
323
324
325
326 bool __fastcall pool_empty() const noexcept(true) FORCE_INLINE {
327 return pool.empty();
328 }
329
330
331
332
333 const pool_size_type __fastcall pool_size() const noexcept(true) FORCE_INLINE {
334 return pool.size();
335 }
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350 unsigned long __fastcall
351 min_time(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
352 template<class T>
353 unsigned long __fastcall FORCE_INLINE
354 min_time(T const &) const noexcept(true);
355
356
357
358
359
360
361
362
363
364
365
366
367
368 unsigned long __fastcall
369 min_processors(generic_traits::memory_access_modes mode) const noexcept(true) FORCE_INLINE;
370 template<class T>
371 unsigned long __fastcall FORCE_INLINE
372 min_processors(T const &) const noexcept(true);
373
374 protected:
375 pool_type pool;
376
377 BOOST_MPL_ASSERT((std::is_same<typename base_t::exit_requested_type, typename pool_type::exit_requested_type>));
378
379 __stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
380 : base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_) {
381 }
382
383 queue_size_type __fastcall
384 batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;
385
386
387
388
389
390 virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
391 return false;
392 }
393
394 typename base_t::exit_requested_type &exit_requested() noexcept(true) FORCE_INLINE {
395 return this->exit_requested_;
396 }
397
398 private:
399 template<class TPB> friend class joinable_t;
400 template<class TPB> friend class nonjoinable_t;
401 template<class TPB> friend class nonjoinable_buff_t;
402 template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
403 template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
404 template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
405 template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;
406
407 signalled_work_queue_type & __fastcall queue() noexcept(true) FORCE_INLINE {}
408 signalled_work_queue_type const & __fastcall queue() const noexcept(true) FORCE_INLINE {}
409
410
411
412 bool __fastcall queue_empty() const noexcept(true) FORCE_INLINE {}
413
414
415
416 const queue_size_type __fastcall queue_size() const noexcept(true) FORCE_INLINE {}
417 };
418
419 } } }
420
421 #include "thread_pool_queue_model_impl.hpp"
422
423 #endif