bdd3b5bcae4ac7bb408814a140876eb059815c94
[scilab.git] / scilab / modules / parallel / src / cpp / parallel_run.hxx
1 /*
2 * Scilab ( http://www.scilab.org/ ) - This file is part of Scilab
3 * Copyright (C) 2010 - DIGITEO - Bernard HUGUENEY
4 *
5 * This file must be used under the terms of the CeCILL.
6 * This source file is licensed as described in the file COPYING, which
7 * you should have received as part of this distribution.  The terms
8 * are also available at
9 * http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt
10 *
11 */
12
13 #ifndef PARALLEL_RUN_HXX
14 #define PARALLEL_RUN_HXX
15
16 #include <algorithm>
17 #include <vector>
18 #include <iterator>
19 #include <iostream>
20 #include <algorithm>
21 #include <cstdlib>
22 #include <cstring>
23 #include <omp.h>
24
25 extern "C"
26 {
27 #include "scilabmode.h"
28 }
29
30 #ifndef  _MSC_VER
31
32 #ifndef MAP_ANONYMOUS
33 # define MAP_ANONYMOUS MAP_ANON
34 #endif
35
36 #include <sys/mman.h>
37 #include <semaphore.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <algorithm>
41 #include <unistd.h>
42
43 using std::min;
44 using std::max;
45 #else
46 #include <windows.h>
47 extern "C"
48 {
49 #include "forkWindows.h"
50 #include "mmapWindows.h"
51 #include "semWindows.h"
52 };
53 #endif
54
55 /*
56 implementation notes:
57 due to alignment issues, we have to use lhs differents shared memory buffers
58 we avoif busywaiting (bad for cpu time) or sleeping (bad for wallclock time) thanks to semaphores in shared memopry
59 */
60
61
62 namespace
63 {
64     /*
65     * allocates shared memory for s elements of type T. (anonymous, not mapped to a file)
66     *
67     * @param  T type to alloc
68     * @param  s nb of elements to alloc mem, defaults to 1.
69     */
70     template<typename T> T* allocSharedMem(std::size_t s=1)
71     {
72         return static_cast<T*>(mmap(0, sizeof(T)*s, PROT_READ | PROT_WRITE,MAP_SHARED |  MAP_ANONYMOUS, -1, 0));
73     }
74     void freeSharedMem(void* ptr, std::size_t s=1)
75     {
76         munmap(ptr, s);
77     }
78     struct PortableSemaphore
79     {
80         explicit PortableSemaphore (unsigned int init):ptr(allocSharedMem<sem_t>())
81         {
82             sem_init(ptr,1, init);
83         }
84
85         void post()
86         {
87             sem_post(ptr);
88         }
89         void wait()
90         {
91             sem_wait(ptr);
92         }
93         ~PortableSemaphore()
94         {
95             munmap(ptr, sizeof(sem_t));
96         }
97
98         sem_t* ptr;
99     };
100
101
102     struct PortableSignalInhibiter
103     {
104         PortableSignalInhibiter()
105         {
106 #ifndef _MSC_VER
107             struct sigaction reapchildren;
108             std::memset( &reapchildren, 0, sizeof reapchildren );
109             reapchildren.sa_flags = SA_NOCLDWAIT;
110             sigaction( SIGCHLD, &reapchildren, &backup_sigaction );
111 #else
112 #endif
113         }
114         ~PortableSignalInhibiter()
115         {
116 #ifndef _MSC_VER
117             sigaction(SIGCHLD, &backup_sigaction, 0 );/* restore of sigaction */
118 #else
119 #endif
120         }
121 #ifndef _MSC_VER
122         struct sigaction backup_sigaction;
123 #else
124 #endif
125     };
126     /*
127     * Handles scheduling. Could be done in parallel_wrapper, but it would a a very long applyWithProcesses() member function
128     * breaking it would involve adding many member variables in the wrapper, so I chose an utility class with friend access to a parallel_wrapper
129     * taken by ref.
130     *
131     * The trick is to exchange the res[] ptr with ptrs to some shared memory so that callF() from each process fills the same shared result buffer.
132     * When all is done, we copy the result to the original (not shared) result buffer. Apart from the result buffer, we also share a std::size_t *todo
133     * poiting to the next index to compute.
134     *
135     * We use two cross process synch semaphores :
136     * 1°) todo_protect to protect access to the *todo shared value
137     * 2°) out_of_work to count how many workers have finished their work. Master process waits until nb_process have done their work.
138     *
139     */
140
141 #ifndef _MSC_VER
142  #define __HAVE_FORK__ 1
143 #endif
144
145 #ifdef __HAVE_FORK__
146     template< typename ParallelWrapper> struct scheduler
147     {
148         typedef std::pair<std::size_t, std::size_t> workshare_t;
149
150         /*
151         * constructs the scheduler, allocating the ressources (shared memory) and semaphores
152         * @param wrapper the parallel wrapper launching the computations
153         * @param nb_proc number of processes to use (spawns nb_proc-1 processes)
154         * @param dyn if scheduling is dynamic or static
155         * @param chunk_s chunk size. Only useful for dynamic sheduling as static is always faster with max chunk size for processes.
156         * /!\ changes w.res[] to point to shared memory buffer
157         */
158         scheduler( ParallelWrapper& wrapper,std::size_t nb_proc,  bool dyn, std::size_t chunk_s)
159             : w(wrapper), nb_process(nb_proc), dynamic(dyn), chunk_size(chunk_s), todo(0)
160             , out_of_work(0)
161             , todo_protect(1)
162             , backup_res(wrapper.lhs)
163         {
164             for(std::size_t i(0); i != w.lhs; ++i)
165             {
166                 backup_res[i]= w.res[i];
167                 const_cast<char**>(w.res)[i]=  allocSharedMem<char>(w.n * w.res_size[i]);
168             }
169             todo= allocSharedMem<std::size_t>();
170             *todo= 0;
171         }
172
173         /*
174         * performs concurrent calls from w. (with w.f()) and copy results to the original w.res[] locations
175         * but does not restore w.res[} (this is done in destructor.
176         */
177         void operator()() {
178             PortableSignalInhibiter guard; /* disable waiting notification from child death to avoid zombies */
179             std::vector<workshare_t> init_ws(nb_process);
180             for (std::size_t i(0); i != nb_process; ++i) {/* we precompute shares so that we don't have to synch */
181                 init_ws[i]= initialWork(i);
182             }
183             std::size_t p;
184             for (p= 1; p != nb_process; ++p) {
185                 if (!fork()) { /* child process goes to work at once */
186                     setScilabMode(SCILAB_NWNI);
187                     break;
188                 }/* parent process continues to spawn children */
189             }
190             if (p == nb_process) {
191                 p= 0;
192             }
193             w.prologue(p);
194             for(workshare_t ws(init_ws[p]); ws.first != ws.second; ws= getWork()) {
195                 for(std::size_t i(ws.first); i != ws.second; ++i) {
196                     w.callF(i);/* callF() is performed on our shared mem as res[] */
197                 }
198             }
199             out_of_work.post();
200             w.epilogue(p);
201             if (p) {
202                 exit(EXIT_SUCCESS); // does not call destructor which is good :)
203             }
204             for (std::size_t i(0); i != nb_process; ++i)  {/* wait for all workers to finish */
205                 out_of_work.wait();
206             }
207             for (std::size_t i(0); i != w.lhs; ++i) {/* copy results into the original res[] memory */
208                 std::memcpy(backup_res[i], w.res[i], w.res_size[i]*w.n);
209             }
210         }/* guard destructor restores the signals */
211         /* destroy/ free semaphores/shared memory, and restores original w.res[] values. */
212         ~scheduler() {
213             for(std::size_t j(0); j!= w.lhs; ++j) {
214                 freeSharedMem(w.res[j], w.n * w.res_size[j]);
215                 const_cast<char**>(w.res)[j]= backup_res[j];
216             }
217             freeSharedMem(todo, sizeof(std::size_t));
218         }
219     private:
220         /* compute initial workshares. no need to synch because we did not fork() yet
221         * @param p process id from 0(parent) -> nb_process-1
222         */
223         workshare_t initialWork( std::size_t p) const {
224             std::size_t old_todo(*todo);
225             //      std::cerr<<"*todo="<<*todo<<" dynamic="<<dynamic<<" nb_process="<<nb_process<<" p="<<p<<std::endl;
226             *todo= min(w.n, *todo + (dynamic ? chunk_size : (w.n-*todo)/(nb_process-p)));
227             //      std::cerr<<"AFTER : *todo="<<*todo<<" dynamic="<<dynamic<<" nb_process="<<nb_process<<" p="<<p<<std::endl;
228             return workshare_t(old_todo, *todo);
229         }
230         /* computes next workshare with protected access to shared *todo value  */
231         workshare_t getWork() {
232             workshare_t res;
233             todo_protect.wait();
234             res.first= *todo;
235             if (*todo != w.n) {
236                 res.second= min(w.n, *todo + chunk_size);// no need to handle static scheduling because the last initialWork then set todo=w.n
237                 *todo= res.second;
238             }
239             else {
240                 res.second= w.n;
241             }
242             todo_protect.post();
243             return res;
244         }
245         ParallelWrapper& w;
246         std::size_t nb_process;
247         bool dynamic;
248         std::size_t chunk_size;
249         std::size_t* todo;
250         PortableSemaphore out_of_work, todo_protect;
251         std::vector<char*> backup_res;
252     };
253 #else
254     template< typename ParallelWrapper> struct scheduler
255     {
256         scheduler( ParallelWrapper& wrapper,std::size_t nb_proc, bool dyn, std::size_t chunk_s)
257             : w(wrapper)
258         {
259         }
260         void operator()()
261         {
262             for(std::size_t i(0); i != w.n; ++i)
263             {
264                 w.callF(i);
265             }
266         }
267         ParallelWrapper& w;
268     };
269 #endif
270     template<typename F, typename G>
271     struct parallel_wrapper {
272         parallel_wrapper(char const* const* const a, std::size_t const* a_s, std::size_t const* a_n, std::size_t const the_rhs, std::size_t nb_tasks, char * const* const r, std::size_t const* r_s, std::size_t const the_lhs, F the_f, G p, G e)
273             :args(a), args_size(a_s), args_nb(a_n), rhs(the_rhs), n(nb_tasks),res(r), res_size(r_s), lhs(the_lhs), f(the_f), prologue(p), epilogue(e)
274         {
275         }
276
277         /* we define a functor. Calling it lanches the parallel processing of args, either with threads of processes(default).
278         the nb of wokers (threads or processes) can also be specified (default is implementation defined usually nb of cores).
279         TODO : enable specification of near / far / all (must, must not, can share L2 cache), at least for threads.
280         so first arge might not stay boolean (we can add an overload)*/
281         F operator()( bool with_threads=false, std::size_t nb_workers=0, bool dynamic_scheduling=true, int chunk_size=1)
282         {
283             return with_threads
284                 ? applyWithThreads(nb_workers, dynamic_scheduling, chunk_size)
285                 : applyWithProcesses(nb_workers, dynamic_scheduling, chunk_size);
286         }
287     private:
288         friend struct scheduler<parallel_wrapper>;
289
290         /* Launch concurrent calls to f using OpenMP threads. OpenMP semantics for the parameters.
291         * @param nb_threads number of threads to use
292         * @param dynamic_scheduling if scheduling is dynamic or static
293         * @param chunk_size chunk size.
294         */
295         F applyWithThreads(std::size_t nb_threads, bool dynamic_scheduling, int chunk_size)
296         {
297             signed int i;
298             nb_threads = min(nb_threads, n);
299             if (nb_threads)
300             {
301                 omp_set_num_threads(static_cast<int>(nb_threads));
302             }
303             if (dynamic_scheduling)
304             {
305 #pragma omp parallel for private(i) schedule(dynamic, chunk_size)
306                 for(i=0; (size_t)i < n; ++i)
307                 {
308                     callF(i);
309                 }
310             } else {
311 #pragma omp parallel for private(i)  schedule(static, chunk_size)
312                 for(i=0; (size_t)i < n; ++i)
313                 {
314                     callF(i);
315                 }
316
317             }
318             return f;
319         }
320
321         /* Launch concurrent calls to f using fork()ed processes. See scheduler<> fr details
322         * @param nb_processes number of processes to use
323         * @param dynamic_scheduling if scheduling is dynamic or static
324         * @param chunk_size chunk size.
325         */
326         F applyWithProcesses(std::size_t nb_process, bool dynamic_scheduling, std::size_t chunk_size)
327         {
328             nb_process = min( (nb_process ? nb_process : omp_get_num_procs()), n);
329             scheduler<parallel_wrapper> s(*this, nb_process, dynamic_scheduling, chunk_size );
330             s();
331             return f;
332         }
333         /* Perform i^th call to f, ajusting arguments and results ptrs from args[] and res[]
334         * @param i args and reults index.
335         */
336         void callF(std::size_t const i)
337         {
338
339             std::vector<char const *> local_args(rhs);
340             for (std::size_t j(0); j!= rhs; ++j)
341             {
342                 local_args[j]= args[j]+ (i%(*(args_nb+j))) *(*(args_size+j));/*(i%(*(args_nb+j))) because all args are not required to have same nb of elts*/
343             }
344             std::vector<char *> local_res(lhs);
345             for (std::size_t j(0); j!= lhs; ++j)
346             {
347                 local_res[j]= res[j]+ i*(*(res_size+j));// all res are required to have n elts
348             }
349             f(&local_args[0], &local_res[0]);
350
351         }
352         char const* const* const args; /* ptrs to the rhs args */
353         std::size_t const* args_size; /* sizeof each rhs */
354         std::size_t const* args_nb;/* nb of given values for each arg, in 1...n */
355         std::size_t const rhs;  /* nb of rhs for f */
356         std::size_t n; /* nb of calls to perform */
357         char* const* const res; /* ptr to the lhs results */
358         std::size_t const* res_size; /* sizeof each lhs */
359         std::size_t const lhs;/* nb of lhs for f */
360         /* no res_nb because we must have room for n vaules for each result */
361         F f;/* function to call */
362         G prologue, epilogue; /* function to call when launching/ terminating a worker process */
363     };
364
365 }
366
367
368 /*
369 * make a parallel wrapper, just calls the constructor : is used mainly not to have to type the complete templated name of the wrapper type.
370 * We can then directly call the parallel_wrapper operator() to lanch the parallel processing.
371 * @param
372 */
373 template<typename ArgsItIt, typename ArgsSizeIt, typename ArgsNbIt, typename Sz1, typename Sz2, typename ResItIt, typename ResSizeIt, typename Sz3, typename F, typename G>   parallel_wrapper<F, G>
374 make_parallel_wrapper(ArgsItIt args, ArgsSizeIt args_size, ArgsNbIt args_nb, Sz1 rhs, Sz2 n, ResItIt res, ResSizeIt res_size, Sz3 lhs, F f, G prologue, G epilogue)
375 {
376     return parallel_wrapper<F, G>(args, args_size, args_nb, rhs, n, res, res_size, lhs, f, prologue, epilogue);
377 }
378 #endif