StarPU Internal Handbook
workers.h
1 /* StarPU --- Runtime system for heterogeneous multicore architectures.
2  *
3  * Copyright (C) 2011-2017 Inria
4  * Copyright (C) 2008-2019 Université de Bordeaux
5  * Copyright (C) 2010-2019 CNRS
6  * Copyright (C) 2013 Thibaut Lambert
7  * Copyright (C) 2016 Uppsala University
8  *
9  * StarPU is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU Lesser General Public License as published by
11  * the Free Software Foundation; either version 2.1 of the License, or (at
12  * your option) any later version.
13  *
14  * StarPU is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
17  *
18  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
19  */
20 
21 #ifndef __WORKERS_H__
22 #define __WORKERS_H__
23 
25 /* @{ */
26 
27 #include <limits.h>
28 
29 #include <starpu.h>
30 #include <common/config.h>
31 #include <common/timing.h>
32 #include <common/fxt.h>
33 #include <common/thread.h>
34 #include <core/jobs.h>
35 #include <core/perfmodel/perfmodel.h>
36 #include <core/sched_policy.h>
37 #include <core/topology.h>
38 #include <core/errorcheck.h>
39 #include <core/sched_ctx.h>
40 #include <core/sched_ctx_list.h>
41 #include <core/simgrid.h>
42 #ifdef STARPU_HAVE_HWLOC
43 #include <hwloc.h>
44 #endif
45 
46 #include <core/drivers.h>
47 #include <drivers/cuda/driver_cuda.h>
48 #include <drivers/opencl/driver_opencl.h>
49 
50 #ifdef STARPU_USE_MIC
51 #include <drivers/mic/driver_mic_source.h>
52 #endif /* STARPU_USE_MIC */
53 
54 #ifdef STARPU_USE_MPI_MASTER_SLAVE
55 #include <drivers/mpi/driver_mpi_source.h>
56 #endif
57 
58 #include <drivers/cpu/driver_cpu.h>
59 
60 #include <datawizard/datawizard.h>
61 
62 #include <starpu_parameters.h>
63 
/* Upper bound used to size the per-worker current_tasks[] array. */
#define STARPU_MAX_PIPELINE 4

/* Three-state initialization marker; UNINITIALIZED is pinned to 0 so that
 * zero-initialized storage reads as "not initialized". */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING,
	INITIALIZED
};

struct _starpu_ctx_change_list;
69 
/* Per-worker descriptor, declared through the LIST_TYPE() macro.
 *
 * NOTE(review): this listing is not contiguous -- the embedded line numbers
 * skip (e.g. 81 -> 85, 103 -> 121, 131 -> 135), and members dereferenced by
 * the inline helpers later in this file (state_sched_op_pending,
 * state_changing_ctx_notice, state_changing_ctx_waiting,
 * state_block_in_parallel_req/ack, state_unblock_in_parallel_req/ack,
 * state_blocked_in_parallel, block_in_parallel_ref_count) are not visible
 * here. Check against the real header before relying on this layout.
 */
71 LIST_TYPE(_starpu_worker,
72  struct _starpu_machine_config *config;
73  starpu_pthread_mutex_t mutex;
74  enum starpu_worker_archtype arch;
75  uint32_t worker_mask;
76  struct starpu_perfmodel_arch perf_arch;
77  starpu_pthread_t worker_thread;
78  unsigned devid;
79  unsigned subworkerid;
80  int bindid;
/* Value handed back by _starpu_worker_get_id() for the thread bound to this worker. */
81  int workerid;
85  starpu_pthread_cond_t started_cond;
86  starpu_pthread_cond_t ready_cond;
87  unsigned memory_node;
88  unsigned numa_memory_node;
/* Condition/mutex pair used together by the blocking, relax and
 * changing-ctx helpers further down in this file. */
93  starpu_pthread_cond_t sched_cond;
94  starpu_pthread_mutex_t sched_mutex;
/* Non-zero means "relaxed" (see _starpu_worker_get_relax_state());
 * _starpu_worker_lock() waits until it becomes non-zero. */
95  unsigned state_relax_refcnt;
/* Call-site bookkeeping for the relax helpers, only built with
 * STARPU_SPINLOCK_CHECK. */
96 #ifdef STARPU_SPINLOCK_CHECK
97  const char *relax_on_file;
98  int relax_on_line;
99  const char *relax_on_func;
100  const char *relax_off_file;
101  int relax_off_line;
102  const char *relax_off_func;
103 #endif
/* Set to starpu_pthread_self() by _starpu_worker_enter_changing_ctx_op()
 * and reset to 0 by _starpu_worker_leave_changing_ctx_op(). */
121  starpu_pthread_t thread_changing_ctx;
129  struct _starpu_ctx_change_list ctx_change_list;
130  struct starpu_task_list local_tasks;
131  struct starpu_task **local_ordered_tasks;
135  struct starpu_task *current_task;
/* Sized by STARPU_MAX_PIPELINE. */
136  struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
137 #ifdef STARPU_SIMGRID
138  starpu_pthread_wait_t wait;
139 #endif
140 
141  struct timespec cl_start;
142  struct timespec cl_end;
143  unsigned char first_task;
144  unsigned char ntasks;
145  unsigned char pipeline_length;
146  unsigned char pipeline_stuck;
147  struct _starpu_worker_set *set;
148  unsigned worker_is_running;
149  unsigned worker_is_initialized;
/* Read/written through _starpu_worker_get_status() / _starpu_worker_set_status(). */
150  enum _starpu_worker_status status;
151  unsigned state_keep_awake;
152  char name[128];
153  char short_name[32];
154  unsigned run_by_starpu;
155  struct _starpu_driver_ops *driver_ops;
156 
157  struct _starpu_sched_ctx_list *sched_ctx_list;
158  int tmp_sched_ctx;
/* Returned (converted to int) by _starpu_worker_get_nsched_ctxs(). */
159  unsigned nsched_ctxs;
160  struct _starpu_barrier_counter tasks_barrier;
162  unsigned has_prev_init;
/* The three per-context arrays below have STARPU_NMAX_SCHED_CTXS+1 slots,
 * like the sched_ctxs[] table of the machine configuration. */
164  unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
165 
166  unsigned spinning_backoff ;
170  struct starpu_task *task_transferring;
176  unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
177 
178  unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
184  unsigned reverse_phase[2];
185 
186  unsigned pop_ctx_priority;
189  struct _starpu_sched_ctx *stream_ctx;
190 
/* CPU binding information; each form is only present when the matching
 * facility (glibc cpu_set_t, hwloc) is available at build time. */
191 #ifdef __GLIBC__
192  cpu_set_t cpu_set;
193 #endif /* __GLIBC__ */
194 #ifdef STARPU_HAVE_HWLOC
195  hwloc_bitmap_t hwloc_cpu_set;
196  hwloc_obj_t hwloc_obj;
197 #endif
198 );
199 
201 {
202  struct starpu_perfmodel_arch perf_arch;
203  uint32_t worker_mask;
204  int worker_size;
205  unsigned memory_node;
206  int combined_workerid[STARPU_NMAXWORKERS];
207 #ifdef STARPU_USE_MP
208  int count;
209  starpu_pthread_mutex_t count_mutex;
210 #endif
211 
212 #ifdef __GLIBC__
213  cpu_set_t cpu_set;
214 #endif /* __GLIBC__ */
215 #ifdef STARPU_HAVE_HWLOC
216  hwloc_bitmap_t hwloc_cpu_set;
217 #endif
218 };
219 
225 {
226  starpu_pthread_mutex_t mutex;
227  starpu_pthread_t worker_thread;
228  unsigned nworkers;
229  unsigned started;
230  void *retval;
231  struct _starpu_worker *workers;
232  starpu_pthread_cond_t ready_cond;
233  unsigned set_is_initialized;
234 };
235 
236 #ifdef STARPU_USE_MPI_MASTER_SLAVE
237 extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
238 #endif
239 
241 {
243  unsigned nworkers;
244 
247 
248  unsigned nsched_ctxs;
249 
250 #ifdef STARPU_HAVE_HWLOC
251 
252  hwloc_topology_t hwtopology;
253 #endif
254 
255  struct starpu_tree *tree;
256 
260  unsigned nhwcpus;
261 
265  unsigned nhwpus;
266 
270  unsigned nhwcudagpus;
271 
275  unsigned nhwopenclgpus;
276 
280  unsigned nhwmpi;
281 
283  unsigned ncpus;
284 
286  unsigned ncudagpus;
287  unsigned nworkerpercuda;
288  int cuda_th_per_stream;
289  int cuda_th_per_dev;
290 
292  unsigned nopenclgpus;
293 
295  unsigned nmpidevices;
296  unsigned nhwmpidevices;
297 
298  unsigned nhwmpicores[STARPU_MAXMPIDEVS];
299  unsigned nmpicores[STARPU_MAXMPIDEVS];
300 
303  unsigned nhwmicdevices;
304  unsigned nmicdevices;
305 
306  unsigned nhwmiccores[STARPU_MAXMICDEVS];
307  unsigned nmiccores[STARPU_MAXMICDEVS];
308 
316  unsigned workers_bindid[STARPU_NMAXWORKERS];
317 
324  unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS];
325 
332  unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS];
333 
334  /*** Indicates the successive MIC devices that should be used
335  * by the MIC driver. It is either filled according to the
336  * user's explicit parameters (from starpu_conf) or according
337  * to the STARPU_WORKERS_MICID env. variable. Otherwise, they
338  * are taken in ID order. */
342  unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS];
343 };
344 
346 {
347  struct _starpu_machine_topology topology;
348 
349 #ifdef STARPU_HAVE_HWLOC
350  int cpu_depth;
351  int pu_depth;
352 #endif
353 
356  char currently_bound[STARPU_NMAXWORKERS];
357  char currently_shared[STARPU_NMAXWORKERS];
358 
361 
364 
367 
370 
381 
384  struct _starpu_worker workers[STARPU_NMAXWORKERS];
385 
388  struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS];
389 
391  struct
392  {
393  int *workerids;
394  unsigned nworkers;
395  } *bindid_workers;
396  unsigned nbindid;
401  uint32_t worker_mask;
402 
404  struct starpu_conf conf;
405 
407  unsigned running;
408 
409  int disable_kernels;
410 
414 
416  struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
417 
419  unsigned submitting;
420 
421  int watchdog_ok;
422 
423  starpu_pthread_mutex_t submitted_mutex;
424 };
425 
426 extern int _starpu_worker_parallel_blocks;
427 
428 extern struct _starpu_machine_config _starpu_config STARPU_ATTRIBUTE_INTERNAL;
429 extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
430 extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
431 extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
432 
434 void _starpu_set_argc_argv(int *argc, char ***argv);
435 int *_starpu_get_argc();
436 char ***_starpu_get_argv();
437 
439 void _starpu_conf_check_environment(struct starpu_conf *conf);
440 
442 void _starpu_may_pause(void);
443 
445 static inline unsigned _starpu_machine_is_running(void)
446 {
447  unsigned ret;
448  /* running is just protected by a memory barrier */
449  STARPU_RMB();
450 
451  ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
452  ret = _starpu_config.running;
453  ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
454  return ret;
455 }
456 
457 
459 void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig);
460 
462 uint32_t _starpu_worker_exists(struct starpu_task *);
463 
465 uint32_t _starpu_can_submit_cuda_task(void);
466 
468 uint32_t _starpu_can_submit_cpu_task(void);
469 
471 uint32_t _starpu_can_submit_opencl_task(void);
472 
475 unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker);
476 
480 void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
481 
483 void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
485 void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
486 
487 static inline unsigned _starpu_worker_get_count(void)
488 {
489  return _starpu_config.topology.nworkers;
490 }
491 #define starpu_worker_get_count _starpu_worker_get_count
492 
496 static inline void _starpu_set_local_worker_key(struct _starpu_worker *worker)
497 {
498  STARPU_ASSERT(_starpu_keys_initialized);
499  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
500 }
501 
504 static inline struct _starpu_worker *_starpu_get_local_worker_key(void)
505 {
506  if (!_starpu_keys_initialized)
507  return NULL;
508  return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
509 }
510 
514 static inline void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
515 {
516  STARPU_ASSERT(_starpu_keys_initialized);
517  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
518 }
519 
523 {
524  if (!_starpu_keys_initialized)
525  return NULL;
526  return (struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
527 }
528 
531 static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
532 {
533  STARPU_ASSERT(id < starpu_worker_get_count());
534  return &_starpu_config.workers[id];
535 }
536 
539 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
540 {
541  return (id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
542 }
543 
544 struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id);
545 
549 {
550  return &_starpu_config;
551 }
552 
554 static inline int _starpu_get_disable_kernels(void)
555 {
556  return _starpu_config.disable_kernels;
557 }
558 
560 static inline enum _starpu_worker_status _starpu_worker_get_status(int workerid)
561 {
562  return _starpu_config.workers[workerid].status;
563 }
564 
567 static inline void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
568 {
569  _starpu_config.workers[workerid].status = status;
570 }
571 
573 static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
574 {
575  return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
576 }
577 
/* Worker-id enumeration by architecture type; defined elsewhere.
 * NOTE(review): presumably fills workerids[] with at most maxsize ids and
 * returns how many were written -- confirm against the definitions. */
int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type,
				   int *workerids,
				   int maxsize);

int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type,
					    int *workerids,
					    int maxsize);
583 
584 static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
585 {
586  struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
587  return &w->sched_mutex == mutex;
588 }
589 
590 static inline int _starpu_worker_get_nsched_ctxs(int workerid)
591 {
592  return _starpu_config.workers[workerid].nsched_ctxs;
593 }
594 
596 static inline unsigned _starpu_get_nsched_ctxs(void)
597 {
598  /* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
599  STARPU_RMB();
600  return _starpu_config.topology.nsched_ctxs;
601 }
602 
604 static inline int _starpu_worker_get_id(void)
605 {
606  struct _starpu_worker * worker;
607 
608  worker = _starpu_get_local_worker_key();
609  if (worker)
610  {
611  return worker->workerid;
612  }
613  else
614  {
615  /* there is no worker associated to that thread, perhaps it is
616  * a thread from the application or this is some SPU worker */
617  return -1;
618  }
619 }
620 #define starpu_worker_get_id _starpu_worker_get_id
621 
/* Same as starpu_worker_get_id(), but asserts that the caller is a worker.
 * `f` and `l` identify the call site (file, line) for the assertion message.
 * Returns the worker id converted to unsigned. */
static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
{
	/* keep both parameters "used" even if the assertion macro drops its arguments */
	(void) f;
	(void) l;

	const int id = starpu_worker_get_id();

	STARPU_ASSERT_MSG(id>=0, "%s:%d Cannot be called from outside a worker\n", f, l);
	return id;
}
#define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
633 
/* Defined elsewhere; maps a worker architecture type to a node kind. */
enum starpu_node_kind
_starpu_worker_get_node_kind(enum starpu_worker_archtype type);

/* Setter/getter pair around the per-worker stream context; defined elsewhere. */
void _starpu_worker_set_stream_ctx(unsigned workerid,
				   struct _starpu_sched_ctx *sched_ctx);

struct _starpu_sched_ctx*
_starpu_worker_get_ctx_stream(unsigned stream_workerid);
639 
645 static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
646 {
647  _starpu_worker_parallel_blocks = 1;
648  /* flush pending requests to start on a fresh transaction epoch */
649  while (worker->state_unblock_in_parallel_req)
650  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
651 
652  /* announce blocking intent */
653  STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
654  worker->block_in_parallel_ref_count++;
655 
656  if (worker->block_in_parallel_ref_count == 1)
657  {
658  /* only the transition from 0 to 1 triggers the block_in_parallel_req */
659 
660  STARPU_ASSERT(!worker->state_blocked_in_parallel);
661  STARPU_ASSERT(!worker->state_block_in_parallel_req);
662  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
663  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
664  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
665 
666  /* trigger the block_in_parallel_req */
667  worker->state_block_in_parallel_req = 1;
668  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
669 #ifdef STARPU_SIMGRID
670  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
671 #endif
672 
673  /* wait for block_in_parallel_req to be processed */
674  while (!worker->state_block_in_parallel_ack)
675  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
676 
677  STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
678  STARPU_ASSERT(worker->state_block_in_parallel_req);
679  STARPU_ASSERT(worker->state_blocked_in_parallel);
680 
681  /* reset block_in_parallel_req state flags */
682  worker->state_block_in_parallel_req = 0;
683  worker->state_block_in_parallel_ack = 0;
684 
685  /* broadcast block_in_parallel_req state flags reset */
686  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
687  }
688 }
689 
694 static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
695 {
696  /* flush pending requests to start on a fresh transaction epoch */
697  while (worker->state_block_in_parallel_req)
698  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
699 
700  /* unblocking may be requested unconditionnally
701  * thus, check is unblocking is really needed */
702  if (worker->state_blocked_in_parallel)
703  {
704  if (worker->block_in_parallel_ref_count == 1)
705  {
706  /* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
707 
708  STARPU_ASSERT(!worker->state_block_in_parallel_req);
709  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
710  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
711  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
712 
713  /* trigger the unblock_in_parallel_req */
714  worker->state_unblock_in_parallel_req = 1;
715  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
716 
717  /* wait for the unblock_in_parallel_req to be processed */
718  while (!worker->state_unblock_in_parallel_ack)
719  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
720 
721  STARPU_ASSERT(worker->state_unblock_in_parallel_req);
722  STARPU_ASSERT(!worker->state_blocked_in_parallel);
723 
724  /* reset unblock_in_parallel_req state flags */
725  worker->state_unblock_in_parallel_req = 0;
726  worker->state_unblock_in_parallel_ack = 0;
727 
728  /* broadcast unblock_in_parallel_req state flags reset */
729  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
730  }
731 
732  /* announce unblocking complete */
733  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
734  worker->block_in_parallel_ref_count--;
735  }
736 }
737 
743 static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
744 {
745  while (worker->state_block_in_parallel_req)
746  {
747  STARPU_ASSERT(!worker->state_blocked_in_parallel);
748  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
749  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
750  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
751  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
752 
753  /* enter effective blocked state */
754  worker->state_blocked_in_parallel = 1;
755 
756  /* notify block_in_parallel_req processing */
757  worker->state_block_in_parallel_ack = 1;
758  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
759 
760  /* block */
761  while (!worker->state_unblock_in_parallel_req)
762  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
763 
764  STARPU_ASSERT(worker->state_blocked_in_parallel);
765  STARPU_ASSERT(!worker->state_block_in_parallel_req);
766  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
767  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
768  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
769 
770  /* leave effective blocked state */
771  worker->state_blocked_in_parallel = 0;
772 
773  /* notify unblock_in_parallel_req processing */
774  worker->state_unblock_in_parallel_ack = 1;
775  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
776  }
777 }
778 
/* Mark `worker` as entering a scheduling-operation ("sched_op") region.
 *
 * Visible behavior: asserts that no sched_op is already pending, waits on
 * sched_cond while state_changing_ctx_notice is set, then sets
 * state_sched_op_pending to 1 and state_relax_refcnt to 0. With
 * STARPU_SPINLOCK_CHECK the function takes the call site as extra arguments
 * (supplied by the wrapper macro at the end) and records it in
 * relax_on_{file,line,func}.
 *
 * NOTE(review): the listing skips embedded lines 802, 805, 812 and 832. The
 * block opened at 803 is followed by an "else" with no visible "if", so the
 * condition (802) is missing and, judging from the surrounding comments, so
 * are the block-request processing calls (805, 812). Restore them from the
 * real header; the code below is left exactly as found.
 * NOTE(review): relax_on_* is recorded here while the refcount is reset to 0
 * (i.e. relax turned off) -- confirm this is intended.
 * NOTE(review): worker->sched_mutex is handed to the condition waits, so the
 * caller presumably holds it -- confirm.
 */
795 #ifdef STARPU_SPINLOCK_CHECK
796 static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
797 #else
798 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
799 #endif
800 {
801  STARPU_ASSERT(!worker->state_sched_op_pending);
803  {
804  /* process pending block requests before entering a sched_op region */
806  while (worker->state_changing_ctx_notice)
807  {
808  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
809 
810  /* new block requests may have been triggered during the wait,
811  * need to check again */
813  }
814  }
815  else
816  {
817  /* if someone observed the worker state since the last call, postpone block request
818  * processing for one sched_op turn more, because the observer will not have seen
819  * new block requests between its observation and now.
820  *
821  * however, the worker still has to wait for context change operations to complete
822  * before entering sched_op again*/
823  while (worker->state_changing_ctx_notice)
824  {
825  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
826  }
827  }
828 
829  /* no block request and no ctx change ahead,
830  * enter sched_op */
831  worker->state_sched_op_pending = 1;
833  worker->state_relax_refcnt = 0;
834 #ifdef STARPU_SPINLOCK_CHECK
835  worker->relax_on_file = file;
836  worker->relax_on_line = line;
837  worker->relax_on_func = func;
838 #endif
839 }
/* With STARPU_SPINLOCK_CHECK, the public name captures the call site. */
840 #ifdef STARPU_SPINLOCK_CHECK
841 #define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
842 #endif
843 
849 #ifdef STARPU_SPINLOCK_CHECK
850 static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
851 #else
852 static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
853 #endif
854 {
855  STARPU_ASSERT(worker->state_sched_op_pending);
856  worker->state_relax_refcnt = 1;
857 #ifdef STARPU_SPINLOCK_CHECK
858  worker->relax_off_file = file;
859  worker->relax_off_line = line;
860  worker->relax_off_func = func;
861 #endif
862  worker->state_sched_op_pending = 0;
863  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
865 }
866 #ifdef STARPU_SPINLOCK_CHECK
867 #define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
868 #endif
869 
870 static inline int _starpu_worker_sched_op_pending(void)
871 {
872  int workerid = starpu_worker_get_id();
873  if (workerid == -1)
874  return 0;
875  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
876  STARPU_ASSERT(worker != NULL);
877  return worker->state_sched_op_pending;
878 }
879 
889 static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
890 {
891  STARPU_ASSERT(!starpu_pthread_equal(worker->thread_changing_ctx, starpu_pthread_self()));
892  /* flush pending requests to start on a fresh transaction epoch */
893  while (worker->state_changing_ctx_notice)
894  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
895 
896  /* announce changing_ctx intent
897  *
898  * - an already started sched_op is allowed to complete
899  * - no new sched_op may be started
900  */
901  worker->state_changing_ctx_notice = 1;
902 
903  worker->thread_changing_ctx = starpu_pthread_self();
904 
905  /* allow for an already started sched_op to complete */
906  if (worker->state_sched_op_pending)
907  {
908  /* request sched_op to broadcast when way is cleared */
909  worker->state_changing_ctx_waiting = 1;
910 
911  /* wait for sched_op completion */
912  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
913 #ifdef STARPU_SIMGRID
914  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
915 #endif
916  do
917  {
918  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
919  }
920  while (worker->state_sched_op_pending);
921 
922  /* reset flag so other sched_ops wont have to broadcast state */
923  worker->state_changing_ctx_waiting = 0;
924  }
925 }
926 
931 static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
932 {
933  worker->thread_changing_ctx = (starpu_pthread_t)0;
934  worker->state_changing_ctx_notice = 0;
935  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
936 }
937 
940 #ifdef STARPU_SPINLOCK_CHECK
941 static inline void __starpu_worker_relax_on(const char*file, int line, const char* func)
942 #else
943 static inline void _starpu_worker_relax_on(void)
944 #endif
945 {
946  struct _starpu_worker *worker = _starpu_get_local_worker_key();
947  if (worker == NULL)
948  return;
949  if (!worker->state_sched_op_pending)
950  return;
951  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
952 #ifdef STARPU_SPINLOCK_CHECK
953  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
954 #else
955  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
956 #endif
957  worker->state_relax_refcnt++;
958 #ifdef STARPU_SPINLOCK_CHECK
959  worker->relax_on_file = file;
960  worker->relax_on_line = line;
961  worker->relax_on_func = func;
962 #endif
963  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
964  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
965 }
966 #ifdef STARPU_SPINLOCK_CHECK
967 #define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
968 #endif
969 #define starpu_worker_relax_on _starpu_worker_relax_on
970 
972 #ifdef STARPU_SPINLOCK_CHECK
973 static inline void __starpu_worker_relax_on_locked(struct _starpu_worker *worker, const char*file, int line, const char* func)
974 #else
975 static inline void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
976 #endif
977 {
978  if (!worker->state_sched_op_pending)
979  return;
980 #ifdef STARPU_SPINLOCK_CHECK
981  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
982 #else
983  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
984 #endif
985  worker->state_relax_refcnt++;
986 #ifdef STARPU_SPINLOCK_CHECK
987  worker->relax_on_file = file;
988  worker->relax_on_line = line;
989  worker->relax_on_func = func;
990 #endif
991  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
992 }
993 #ifdef STARPU_SPINLOCK_CHECK
994 #define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
995 #endif
996 
997 #ifdef STARPU_SPINLOCK_CHECK
998 static inline void __starpu_worker_relax_off(const char*file, int line, const char* func)
999 #else
1000 static inline void _starpu_worker_relax_off(void)
1001 #endif
1002 {
1003  int workerid = starpu_worker_get_id();
1004  if (workerid == -1)
1005  return;
1006  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1007  STARPU_ASSERT(worker != NULL);
1008  if (!worker->state_sched_op_pending)
1009  return;
1010  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1011 #ifdef STARPU_SPINLOCK_CHECK
1012  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1013 #else
1014  STARPU_ASSERT(worker->state_relax_refcnt>0);
1015 #endif
1016  worker->state_relax_refcnt--;
1017 #ifdef STARPU_SPINLOCK_CHECK
1018  worker->relax_off_file = file;
1019  worker->relax_off_line = line;
1020  worker->relax_off_func = func;
1021 #endif
1022  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1023 }
1024 #ifdef STARPU_SPINLOCK_CHECK
1025 #define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1026 #endif
1027 #define starpu_worker_relax_off _starpu_worker_relax_off
1028 
1029 #ifdef STARPU_SPINLOCK_CHECK
1030 static inline void __starpu_worker_relax_off_locked(const char*file, int line, const char* func)
1031 #else
1032 static inline void _starpu_worker_relax_off_locked(void)
1033 #endif
1034 {
1035  int workerid = starpu_worker_get_id();
1036  if (workerid == -1)
1037  return;
1038  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1039  STARPU_ASSERT(worker != NULL);
1040  if (!worker->state_sched_op_pending)
1041  return;
1042 #ifdef STARPU_SPINLOCK_CHECK
1043  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1044 #else
1045  STARPU_ASSERT(worker->state_relax_refcnt>0);
1046 #endif
1047  worker->state_relax_refcnt--;
1048 #ifdef STARPU_SPINLOCK_CHECK
1049  worker->relax_off_file = file;
1050  worker->relax_off_line = line;
1051  worker->relax_off_func = func;
1052 #endif
1053 }
1054 #ifdef STARPU_SPINLOCK_CHECK
1055 #define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1056 #endif
1057 
1058 static inline int _starpu_worker_get_relax_state(void)
1059 {
1060  int workerid = starpu_worker_get_id();
1061  if (workerid < 0)
1062  return 1;
1063  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1064  STARPU_ASSERT(worker != NULL);
1065  return worker->state_relax_refcnt != 0;
1066 }
1067 #define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1068 
1073 static inline void _starpu_worker_lock(int workerid)
1074 {
1075  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1076  STARPU_ASSERT(worker != NULL);
1077  int cur_workerid = starpu_worker_get_id();
1078  if (workerid != cur_workerid)
1079  {
1080  starpu_worker_relax_on();
1081 
1082  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1083  while (!worker->state_relax_refcnt)
1084  {
1085  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
1086  }
1087  }
1088  else
1089  {
1090  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1091  }
1092 }
1093 
1094 static inline int _starpu_worker_trylock(int workerid)
1095 {
1096  struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1097  int cur_workerid = cur_worker->workerid;
1098  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1099  STARPU_ASSERT(worker != NULL);
1100 
1101  /* Start with ourself */
1102  int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->sched_mutex);
1103  if (ret)
1104  return ret;
1105  if (workerid == cur_workerid)
1106  /* We only needed to lock ourself */
1107  return 0;
1108 
1109  /* Now try to lock the other worker */
1110  ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
1111  if (!ret)
1112  {
1113  /* Good, check that it is relaxed */
1114  ret = !worker->state_relax_refcnt;
1115  if (ret)
1116  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1117  }
1118  if (!ret)
1119  _starpu_worker_relax_on_locked(cur_worker);
1120  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
1121  return ret;
1122 }
1123 
1124 static inline void _starpu_worker_unlock(int workerid)
1125 {
1126  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1127  STARPU_ASSERT(worker != NULL);
1128  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1129  int cur_workerid = starpu_worker_get_id();
1130  if (workerid != cur_workerid)
1131  {
1132  starpu_worker_relax_off();
1133  }
1134 }
1135 
1136 static inline void _starpu_worker_lock_self(void)
1137 {
1138  int workerid = starpu_worker_get_id_check();
1139  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1140  STARPU_ASSERT(worker != NULL);
1141  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1142 }
1143 
1144 static inline void _starpu_worker_unlock_self(void)
1145 {
1146  int workerid = starpu_worker_get_id_check();
1147  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1148  STARPU_ASSERT(worker != NULL);
1149  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1150 }
1151 
/* Call starpu_wake_worker_locked() on `workerid` with that worker's
 * sched_mutex held through _starpu_worker_lock()/_starpu_worker_unlock(),
 * and return its result. */
static inline int _starpu_wake_worker_relax(int workerid)
{
	int woken;

	_starpu_worker_lock(workerid);
	woken = starpu_wake_worker_locked(workerid);
	_starpu_worker_unlock(workerid);

	return woken;
}
1159 
/* Defined elsewhere. */
int starpu_wake_worker_relax_light(int workerid);

/* Defined elsewhere.
 * NOTE(review): presumably hands `task` back after `worker` declined to run
 * it -- confirm against the definition. */
void _starpu_worker_refuse_task(struct _starpu_worker *worker,
				struct starpu_task *task);
1167 
1168 /* @}*/
1169 
1170 #endif // __WORKERS_H__
struct starpu_task ** local_ordered_tasks
Definition: workers.h:131
void _starpu_may_pause(void)
unsigned devid
Definition: workers.h:78
static unsigned __starpu_worker_get_id_check(const char *f, int l)
Definition: workers.h:624
static void _starpu_worker_relax_on(void)
Definition: workers.h:943
int pause_depth
Definition: workers.h:413
unsigned char pipeline_length
Definition: workers.h:145
unsigned ncudagpus
Definition: workers.h:286
static void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker *const worker)
Definition: workers.h:645
int workerid
Definition: workers.h:81
unsigned state_unblock_in_parallel_req
Definition: workers.h:111
unsigned current_ordered_task_order
Definition: workers.h:134
static unsigned _starpu_get_nsched_ctxs(void)
Definition: workers.h:596
unsigned subworkerid
Definition: workers.h:79
unsigned nhwcudagpus
Definition: workers.h:270
unsigned nb_buffers_totransfer
Definition: workers.h:169
int current_mpi_deviceid
Definition: workers.h:369
unsigned memory_node
Definition: workers.h:205
unsigned nhwopenclgpus
Definition: workers.h:275
void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task)
static void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker *const worker)
Definition: workers.h:931
unsigned nmpidevices
Definition: workers.h:295
int combined_workerid
Definition: workers.h:82
Definition: workers.h:345
struct starpu_perfmodel_arch perf_arch
Definition: workers.h:202
unsigned has_prev_init
Definition: workers.h:162
unsigned run_by_starpu
Definition: workers.h:154
unsigned nsched_ctxs
Definition: workers.h:159
starpu_pthread_t thread_changing_ctx
Definition: workers.h:121
static struct _starpu_machine_config * _starpu_get_machine_config(void)
Definition: workers.h:548
void _starpu_set_argc_argv(int *argc, char ***argv)
uint32_t _starpu_can_submit_opencl_task(void)
unsigned started
Definition: workers.h:229
static void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
Definition: workers.h:514
int cuda_nodeid
Definition: workers.h:374
uint32_t worker_mask
Definition: workers.h:75
static struct _starpu_sched_ctx * _starpu_get_initial_sched_ctx(void)
Definition: workers.h:573
unsigned current_ordered_task
Definition: workers.h:133
unsigned local_ordered_tasks_size
Definition: workers.h:132
static void _starpu_worker_lock(int workerid)
Definition: workers.h:1073
static void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker *const worker)
Definition: workers.h:694
unsigned state_relax_refcnt
Definition: workers.h:95
struct starpu_task * task_transferring
Definition: workers.h:170
Definition: workers.h:240
unsigned pop_ctx_priority
Definition: workers.h:186
static void _starpu_set_local_worker_key(struct _starpu_worker *worker)
Definition: workers.h:496
unsigned nhwmpi
Definition: workers.h:280
static struct _starpu_worker_set * _starpu_get_local_worker_set_key(void)
Definition: workers.h:522
unsigned block_in_parallel_ref_count
Definition: workers.h:120
unsigned submitting
Definition: workers.h:419
uint32_t _starpu_can_submit_cpu_task(void)
Definition: workers.h:200
uint32_t worker_mask
Definition: workers.h:203
void _starpu_conf_check_environment(struct starpu_conf *conf)
starpu_pthread_cond_t ready_cond
Definition: workers.h:86
unsigned nb_buffers_transferred
Definition: workers.h:168
unsigned char first_task
Definition: workers.h:143
unsigned memory_node
Definition: workers.h:87
unsigned nhwcpus
Definition: workers.h:260
unsigned nopenclgpus
Definition: workers.h:292
unsigned state_blocked_in_parallel
Definition: workers.h:107
struct starpu_conf conf
Definition: workers.h:404
static unsigned _starpu_machine_is_running(void)
Definition: workers.h:445
Definition: workers.h:71
static enum _starpu_worker_status _starpu_worker_get_status(int workerid)
Definition: workers.h:560
unsigned state_block_in_parallel_ack
Definition: workers.h:110
unsigned nhwmicdevices
Definition: workers.h:303
int bindid
Definition: workers.h:80
static struct _starpu_worker * _starpu_get_worker_struct(unsigned id)
Definition: workers.h:531
starpu_pthread_cond_t ready_cond
Definition: workers.h:232
unsigned state_changing_ctx_waiting
Definition: workers.h:105
int current_bindid
Definition: workers.h:355
static void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker *const worker)
Definition: workers.h:889
unsigned ncpus
Definition: workers.h:283
void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
int current_mic_deviceid
Definition: workers.h:366
static void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker *const worker)
Definition: workers.h:743
static void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
Definition: workers.h:975
unsigned spinning_backoff
Definition: workers.h:166
int current_cuda_gpuid
Definition: workers.h:360
void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync)
static void _starpu_worker_enter_sched_op(struct _starpu_worker *const worker)
Definition: workers.h:798
struct starpu_tree * tree
Definition: workers.h:255
int current_rank
Definition: workers.h:83
unsigned numa_memory_node
Definition: workers.h:88
uint32_t worker_mask
Definition: workers.h:401
unsigned state_sched_op_pending
Definition: workers.h:104
starpu_pthread_t worker_thread
Definition: workers.h:77
int worker_size
Definition: workers.h:84
unsigned ncombinedworkers
Definition: workers.h:246
unsigned nworkers
Definition: workers.h:243
unsigned is_slave_somewhere
Definition: workers.h:187
Definition: workers.h:224
int mic_nodeid
Definition: workers.h:378
unsigned state_block_in_parallel_req
Definition: workers.h:109
uint32_t _starpu_can_submit_cuda_task(void)
uint32_t _starpu_worker_exists(struct starpu_task *)
void _starpu_worker_apply_deferred_ctx_changes(void)
unsigned char pipeline_stuck
Definition: workers.h:146
hwloc_topology_t hwtopology
Definition: workers.h:252
static void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
Definition: workers.h:567
unsigned state_changing_ctx_notice
Definition: workers.h:106
int cpus_nodeid
Definition: workers.h:372
unsigned state_unblock_in_parallel_ack
Definition: workers.h:112
struct starpu_task * current_task
Definition: workers.h:135
starpu_pthread_cond_t sched_cond
Definition: workers.h:93
starpu_pthread_mutex_t sched_mutex
Definition: workers.h:94
static int _starpu_get_disable_kernels(void)
Definition: workers.h:554
unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker)
unsigned nbindid
Definition: workers.h:396
static struct _starpu_worker * _starpu_get_local_worker_key(void)
Definition: workers.h:504
unsigned state_blocked_in_parallel_observed
Definition: workers.h:108
static int _starpu_worker_get_id(void)
Definition: workers.h:604
int opencl_nodeid
Definition: workers.h:376
void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync)
unsigned running
Definition: workers.h:407
starpu_pthread_cond_t started_cond
Definition: workers.h:85
int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
int current_opencl_gpuid
Definition: workers.h:363
void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig)
static struct _starpu_sched_ctx * _starpu_get_sched_ctx_struct(unsigned id)
Definition: workers.h:539
unsigned char ntasks
Definition: workers.h:144
unsigned state_keep_awake
Definition: workers.h:151
unsigned nhwpus
Definition: workers.h:265
int mpi_nodeid
Definition: workers.h:380
starpu_pthread_t worker_thread
Definition: workers.h:227