 * Scilab ( http://www.scilab.org/ ) - This file is part of Scilab
 * Copyright (C) 2010 - DIGITEO - Bernard HUGUENEY
 * This file must be used under the terms of the CeCILL.
 * This source file is licensed as described in the file COPYING, which
 * you should have received as part of this distribution. The terms
 * are also available at
 * http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt
#ifndef PARALLEL_RUN_HXX
#define PARALLEL_RUN_HXX
#include "scilabmode.h"
# define MAP_ANONYMOUS MAP_ANON
#include <semaphore.h>
#include "forkWindows.h"
#include "mmapWindows.h"
#include "semWindows.h"
due to alignment issues, we have to use lhs different shared memory buffers
we avoid busy-waiting (bad for CPU time) or sleeping (bad for wall-clock time) thanks to semaphores in shared memory
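/*
 * Illustrative sketch (not part of the build) of the mechanism described above,
 * assuming a POSIX platform: an anonymous MAP_SHARED mapping is visible to
 * fork()ed children, and a process-shared semaphore (pshared = 1) placed in such
 * a mapping lets processes block instead of busy-waiting.
 *
 *   void* buf = mmap(NULL, 4096, PROT_READ | PROT_WRITE,
 *                    MAP_SHARED | MAP_ANONYMOUS, -1, 0);    // shared with children
 *   sem_t* sem = static_cast<sem_t*>(buf);
 *   sem_init(sem, 1, 0);                                    // 1 => shared between processes
 *   if (fork() == 0) { sem_post(sem); _exit(0); }           // child signals...
 *   sem_wait(sem);                                          // ...parent blocks until then
 *   munmap(buf, 4096);
 */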
 * allocates shared memory for s elements of type T (anonymous, not mapped to a file)
 * @param T type to allocate
 * @param s number of elements to allocate, defaults to 1.
template<typename T> T* allocSharedMem(std::size_t s=1)
    return static_cast<T*>(mmap(0, sizeof(T)*s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
void freeSharedMem(void* ptr, std::size_t s=1)
struct PortableSemaphore
    explicit PortableSemaphore(unsigned int init) : ptr(allocSharedMem<sem_t>())
        sem_init(ptr, 1, init);
        munmap(ptr, sizeof(sem_t));
struct PortableSignalInhibiter
    PortableSignalInhibiter()
        struct sigaction reapchildren;
        std::memset(&reapchildren, 0, sizeof reapchildren);
        reapchildren.sa_flags = SA_NOCLDWAIT;
        sigaction(SIGCHLD, &reapchildren, &backup_sigaction);
    ~PortableSignalInhibiter()
        sigaction(SIGCHLD, &backup_sigaction, 0); /* restore the previous SIGCHLD handling */
    struct sigaction backup_sigaction;
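/*
 * Illustrative usage (a minimal sketch, not part of the build): within the guard's
 * lifetime SIGCHLD is handled with SA_NOCLDWAIT, so terminated children are reaped
 * automatically and never become zombies; the destructor restores the previous handling.
 *
 *   {
 *       PortableSignalInhibiter guard;      // SIGCHLD now uses SA_NOCLDWAIT
 *       if (fork() == 0) { _exit(0); }      // child exits, no zombie is left behind
 *   }                                       // previous SIGCHLD disposition restored
 */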
 * Handles scheduling. Could be done in parallel_wrapper, but it would make for a very long applyWithProcesses() member function,
 * and breaking it up would involve adding many member variables to the wrapper, so I chose a utility class with friend access to the parallel_wrapper.
 * The trick is to exchange the res[] ptrs with ptrs to some shared memory so that callF() from each process fills the same shared result buffer.
 * When all is done, we copy the result to the original (not shared) result buffer. Apart from the result buffer, we also share a std::size_t *todo
 * pointing to the next index to compute.
 * We use two cross-process synchronization semaphores:
 * 1) todo_protect to protect access to the shared *todo value
 * 2) out_of_work to count how many workers have finished their work. The master process waits until nb_process workers are done.
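/*
 * Illustrative sketch of that protocol (not part of the build); out_of_work and
 * todo_protect refer to the scheduler<> members declared further down, and the
 * wait()/post() calls stand for the underlying sem_wait()/sem_post() operations:
 *
 *   each worker (including the master acting as worker 0):
 *       work on its precomputed initial share;
 *       repeat: todo_protect.wait(); claim [*todo, *todo + chunk); advance *todo;
 *               todo_protect.post(); compute the claimed slice;
 *       until the claimed share is empty, then out_of_work.post();
 *
 *   master, once its own share is done:
 *       wait nb_process times on out_of_work;   // all results now sit in shared memory
 *       copy the shared result buffers back into the original res[] storage.
 */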
#define __HAVE_FORK__ 1
template<typename ParallelWrapper> struct scheduler
    typedef std::pair<std::size_t, std::size_t> workshare_t;
     * constructs the scheduler, allocating the resources (shared memory) and semaphores
     * @param wrapper the parallel wrapper launching the computations
     * @param nb_proc number of processes to use (spawns nb_proc-1 processes)
     * @param dyn whether scheduling is dynamic or static
     * @param chunk_s chunk size. Only useful for dynamic scheduling, as static scheduling is always faster with the maximum chunk size per process.
     * /!\ changes w.res[] to point to shared memory buffers
    scheduler(ParallelWrapper& wrapper, std::size_t nb_proc, bool dyn, std::size_t chunk_s)
        : w(wrapper), nb_process(nb_proc), dynamic(dyn), chunk_size(chunk_s), todo(0)
        , backup_res(wrapper.lhs)
        for (std::size_t i(0); i != w.lhs; ++i)
            backup_res[i] = w.res[i];
            const_cast<char**>(w.res)[i] = allocSharedMem<char>(w.n * w.res_size[i]);
        todo = allocSharedMem<std::size_t>();
     * performs the concurrent calls from w (with w.f()) and copies the results to the original w.res[] locations,
     * but does not restore w.res[] (this is done in the destructor).
        PortableSignalInhibiter guard; /* ignore notifications of child death to avoid zombies */
        std::vector<workshare_t> init_ws(nb_process);
        for (std::size_t i(0); i != nb_process; ++i) { /* we precompute the shares so that we don't have to synchronize */
            init_ws[i] = initialWork(i);
        for (p = 1; p != nb_process; ++p) {
            if (!fork()) { /* child process goes to work at once */
                setScilabMode(SCILAB_NWNI);
            } /* parent process continues to spawn children */
        if (p == nb_process) {
        for (workshare_t ws(init_ws[p]); ws.first != ws.second; ws = getWork()) {
            for (std::size_t i(ws.first); i != ws.second; ++i) {
                w.callF(i); /* callF() is performed on our shared mem as res[] */
        exit(EXIT_SUCCESS); // does not call the destructor, which is good :)
        for (std::size_t i(0); i != nb_process; ++i) { /* wait for all workers to finish */
        for (std::size_t i(0); i != w.lhs; ++i) { /* copy results into the original res[] memory */
            std::memcpy(backup_res[i], w.res[i], w.res_size[i] * w.n);
    } /* guard destructor restores the signals */
    /* destroys/frees the semaphores and shared memory, and restores the original w.res[] values. */
        for (std::size_t j(0); j != w.lhs; ++j) {
            freeSharedMem(w.res[j], w.n * w.res_size[j]);
            const_cast<char**>(w.res)[j] = backup_res[j];
        freeSharedMem(todo, sizeof(std::size_t));
    /* computes the initial workshares; no need to synchronize because we have not fork()ed yet
     * @param p process id, from 0 (parent) to nb_process-1
    workshare_t initialWork(std::size_t p) const {
        std::size_t old_todo(*todo);
        // std::cerr<<"*todo="<<*todo<<" dynamic="<<dynamic<<" nb_process="<<nb_process<<" p="<<p<<std::endl;
        *todo = min(w.n, *todo + (dynamic ? chunk_size : (w.n - *todo) / (nb_process - p)));
        // std::cerr<<"AFTER : *todo="<<*todo<<" dynamic="<<dynamic<<" nb_process="<<nb_process<<" p="<<p<<std::endl;
        return workshare_t(old_todo, *todo);
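    /* Worked example (illustrative only): with w.n == 7, nb_process == 3 and static
     * scheduling, successive calls advance *todo as follows:
     *   p == 0 : *todo 0 -> 0 + (7-0)/(3-0) = 2, share [0,2)
     *   p == 1 : *todo 2 -> 2 + (7-2)/(3-1) = 4, share [2,4)
     *   p == 2 : *todo 4 -> 4 + (7-4)/(3-2) = 7, share [4,7)
     * so the n calls are split into nb_process contiguous, nearly equal shares.
     * With dynamic scheduling each call simply claims the next chunk_size indices.
     */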
    /* computes the next workshare, with protected access to the shared *todo value */
    workshare_t getWork() {
        res.second = min(w.n, *todo + chunk_size); // no need to handle static scheduling because the last initialWork already set *todo = w.n
    std::size_t nb_process;
    std::size_t chunk_size;
    PortableSemaphore out_of_work, todo_protect;
    std::vector<char*> backup_res;
template<typename ParallelWrapper> struct scheduler
    scheduler(ParallelWrapper& wrapper, std::size_t nb_proc, bool dyn, std::size_t chunk_s)
        for (std::size_t i(0); i != w.n; ++i)
template<typename F, typename G>
struct parallel_wrapper {
    parallel_wrapper(char const* const* const a, std::size_t const* a_s, std::size_t const* a_n, std::size_t const the_rhs, std::size_t nb_tasks, char* const* const r, std::size_t const* r_s, std::size_t const the_lhs, F the_f, G p, G e)
        : args(a), args_size(a_s), args_nb(a_n), rhs(the_rhs), n(nb_tasks), res(r), res_size(r_s), lhs(the_lhs), f(the_f), prologue(p), epilogue(e)
    /* we define a functor. Calling it launches the parallel processing of args, either with threads or with processes (the default).
       The number of workers (threads or processes) can also be specified (the default is implementation defined, usually the number of cores).
       TODO : enable specification of near / far / all (must, must not, can share L2 cache), at least for threads,
       so the first arg might not stay boolean (we can add an overload) */
    F operator()(bool with_threads=false, std::size_t nb_workers=0, bool dynamic_scheduling=true, int chunk_size=1)
            ? applyWithThreads(nb_workers, dynamic_scheduling, chunk_size)
            : applyWithProcesses(nb_workers, dynamic_scheduling, chunk_size);
    friend struct scheduler<parallel_wrapper>;
    /* Launches concurrent calls to f using OpenMP threads. OpenMP semantics for the parameters.
     * @param nb_threads number of threads to use
     * @param dynamic_scheduling whether scheduling is dynamic or static
     * @param chunk_size chunk size.
    F applyWithThreads(std::size_t nb_threads, bool dynamic_scheduling, int chunk_size)
        nb_threads = min(nb_threads, n);
        omp_set_num_threads(static_cast<int>(nb_threads));
        if (dynamic_scheduling)
#pragma omp parallel for private(i) schedule(dynamic, chunk_size)
#pragma omp parallel for private(i) schedule(static, chunk_size)
    /* Launches concurrent calls to f using fork()ed processes. See scheduler<> for details.
     * @param nb_processes number of processes to use
     * @param dynamic_scheduling whether scheduling is dynamic or static
     * @param chunk_size chunk size.
    F applyWithProcesses(std::size_t nb_process, bool dynamic_scheduling, std::size_t chunk_size)
        nb_process = min((nb_process ? nb_process : omp_get_num_procs()), n);
        scheduler<parallel_wrapper> s(*this, nb_process, dynamic_scheduling, chunk_size);
    /* Performs the i-th call to f, adjusting argument and result ptrs from args[] and res[]
     * @param i args and results index.
    void callF(std::size_t const i)
        std::vector<char const*> local_args(rhs);
        for (std::size_t j(0); j != rhs; ++j)
            local_args[j] = args[j] + (i % args_nb[j]) * args_size[j]; /* (i % args_nb[j]) because args are not all required to have the same number of elements */
        std::vector<char*> local_res(lhs);
        for (std::size_t j(0); j != lhs; ++j)
            local_res[j] = res[j] + i * res_size[j]; // all results are required to have n elements
        f(&local_args[0], &local_res[0]);
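    /* Worked example (illustrative only): with n == 4 calls, an rhs argument j with
     * args_nb[j] == 1 is broadcast (every call reads the same element, since i % 1 == 0),
     * while an argument with args_nb[j] == 4 is indexed element-wise (call i reads element i).
     * Results are always indexed element-wise: call i writes element i of each res[j].
     */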
    char const* const* const args; /* ptrs to the rhs args */
    std::size_t const* args_size; /* sizeof each rhs */
    std::size_t const* args_nb; /* nb of given values for each arg, in 1...n */
    std::size_t const rhs; /* nb of rhs args for f */
    std::size_t n; /* nb of calls to perform */
    char* const* const res; /* ptrs to the lhs results */
    std::size_t const* res_size; /* sizeof each lhs */
    std::size_t const lhs; /* nb of lhs results for f */
    /* no res_nb because we must have room for n values for each result */
    F f; /* function to call */
    G prologue, epilogue; /* functions to call when launching / terminating a worker process */
 * makes a parallel wrapper; it just calls the constructor, and is used mainly to avoid having to spell out the complete templated name of the wrapper type.
 * We can then directly call the parallel_wrapper's operator() to launch the parallel processing.
template<typename ArgsItIt, typename ArgsSizeIt, typename ArgsNbIt, typename Sz1, typename Sz2, typename ResItIt, typename ResSizeIt, typename Sz3, typename F, typename G> parallel_wrapper<F, G>
make_parallel_wrapper(ArgsItIt args, ArgsSizeIt args_size, ArgsNbIt args_nb, Sz1 rhs, Sz2 n, ResItIt res, ResSizeIt res_size, Sz3 lhs, F f, G prologue, G epilogue)
    return parallel_wrapper<F, G>(args, args_size, args_nb, rhs, n, res, res_size, lhs, f, prologue, epilogue);
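/*
 * Illustrative usage (a minimal sketch, not taken from the Scilab sources): the names
 * square_task and do_nothing, the double buffers and the exact call signatures below are
 * assumptions made for the example. f receives, for each call, one pointer per rhs
 * argument and one pointer per lhs result, as set up by callF(); prologue/epilogue are
 * assumed here to be nullary callables run per worker.
 *
 *   void square_task(char const* const* in, char* const* out)
 *   {
 *       double const x = *reinterpret_cast<double const*>(in[0]);
 *       *reinterpret_cast<double*>(out[0]) = x * x;
 *   }
 *   void do_nothing() {}
 *
 *   double in[4] = {1., 2., 3., 4.};
 *   double out[4];
 *   char const* args[] = { reinterpret_cast<char const*>(in) };
 *   char* res[] = { reinterpret_cast<char*>(out) };
 *   std::size_t arg_size[] = { sizeof(double) }, arg_nb[] = { 4 }, res_size[] = { sizeof(double) };
 *
 *   make_parallel_wrapper(args, arg_size, arg_nb, 1, 4, res, res_size, 1,
 *                         &square_task, &do_nothing, &do_nothing)(false, 2); // 2 worker processes
 */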