MySQL 5.6.14 Source Code Document
mt.cpp
1 /* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include <ndb_global.h>
17 
18 #include <VMSignal.hpp>
19 #include <kernel_types.h>
20 #include <Prio.hpp>
21 #include <SignalLoggerManager.hpp>
22 #include <SimulatedBlock.hpp>
23 #include <ErrorHandlingMacros.hpp>
24 #include <GlobalData.hpp>
25 #include <WatchDog.hpp>
26 #include <TransporterDefinitions.hpp>
27 #include "FastScheduler.hpp"
28 #include "mt.hpp"
29 #include <DebuggerNames.hpp>
30 #include <signaldata/StopForCrash.hpp>
31 #include "TransporterCallbackKernel.hpp"
32 #include <NdbSleep.h>
33 #include <portlib/ndb_prefetch.h>
34 
35 #include "mt-asm.h"
36 
37 inline
38 SimulatedBlock*
39 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
40 {
41  SimulatedBlock* b = getBlock(blockNo);
42  if (b != 0 && instanceNo != 0)
43  b = b->getInstance(instanceNo);
44  return b;
45 }
46 
47 #ifdef __GNUC__
48 /* Provides a small (but noticeable) speedup in benchmarks. */
49 #define memcpy __builtin_memcpy
50 #endif
51 
52 /* size of a cacheline */
53 #define NDB_CL 64
54 
55 /* Constants found by benchmarks to be reasonable values. */
56 
57 /* Maximum number of signals to execute before sending to remote nodes. */
58 static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
59 
60 /*
61  * Max. signals to execute from one job buffer before considering other
62  * possible work to do.
63  */
64 static const Uint32 MAX_SIGNALS_PER_JB = 100;
65 
69 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
70 static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
71 static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
72 
73 //#define NDB_MT_LOCK_TO_CPU
74 
75 #define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
76 #define NUM_MAIN_THREADS 2 // except receiver
77 #define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
78 
79 /* If this is too small it crashes before first signal. */
80 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
81 
82 static Uint32 num_lqh_workers = 0;
83 static Uint32 num_lqh_threads = 0;
84 static Uint32 num_threads = 0;
85 static Uint32 receiver_thread_no = 0;
86 
87 #define NO_SEND_THREAD (MAX_THREADS + 1)
88 
89 /* a max-sized signal is 32 words: 7 for the signal header and 25 data words */
90 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
91 
92 struct mt_lock_stat
93 {
94  const void * m_ptr;
95  char * m_name;
96  Uint32 m_contended_count;
97  Uint32 m_spin_count;
98 };
99 static void register_lock(const void * ptr, const char * name);
100 static mt_lock_stat * lookup_lock(const void * ptr);
101 
102 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
103 #define USE_FUTEX
104 #endif
105 
106 #ifdef USE_FUTEX
107 #ifndef _GNU_SOURCE
108 #define _GNU_SOURCE
109 #endif
110 #include <unistd.h>
111 #include <sys/syscall.h>
112 #include <sys/types.h>
113 
114 #define FUTEX_WAIT 0
115 #define FUTEX_WAKE 1
116 #define FUTEX_FD 2
117 #define FUTEX_REQUEUE 3
118 #define FUTEX_CMP_REQUEUE 4
119 #define FUTEX_WAKE_OP 5
120 
121 static inline
122 int
123 futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
124 {
125  return syscall(SYS_futex,
126  addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
127 }
128 
129 static inline
130 int
131 futex_wake(volatile unsigned * addr)
132 {
133  return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
134 }
135 
136 struct thr_wait
137 {
138  volatile unsigned m_futex_state;
139  enum {
140  FS_RUNNING = 0,
141  FS_SLEEPING = 1
142  };
143  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
144  void init () {}
145 };
146 
156 static inline
157 bool
158 yield(struct thr_wait* wait, const Uint32 nsec,
159  bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
160 {
161  volatile unsigned * val = &wait->m_futex_state;
162 #ifndef NDEBUG
163  int old =
164 #endif
165  xcng(val, thr_wait::FS_SLEEPING);
166  assert(old == thr_wait::FS_RUNNING);
167 
179  bool waited = (*check_callback)(check_arg);
180  if (waited)
181  {
182  struct timespec timeout;
183  timeout.tv_sec = 0;
184  timeout.tv_nsec = nsec;
185  futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
186  }
187  xcng(val, thr_wait::FS_RUNNING);
188  return waited;
189 }
190 
191 static inline
192 int
193 wakeup(struct thr_wait* wait)
194 {
195  volatile unsigned * val = &wait->m_futex_state;
201  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
202  {
203  return futex_wake(val);
204  }
205  return 0;
206 }
207 #else
208 #include <NdbMutex.h>
209 #include <NdbCondition.h>
210 
211 struct thr_wait
212 {
213  bool m_need_wakeup;
214  NdbMutex *m_mutex;
215  NdbCondition *m_cond;
216  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
217 
218  void init() {
219  m_mutex = NdbMutex_Create();
220  m_cond = NdbCondition_Create();
221  }
222 };
223 
224 static inline
225 bool
226 yield(struct thr_wait* wait, const Uint32 nsec,
227  bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
228 {
229  struct timespec end;
230  NdbCondition_ComputeAbsTime(&end, nsec/1000000);
231  NdbMutex_Lock(wait->m_mutex);
232 
233  Uint32 waits = 0;
234  /* May have spurious wakeups: Always recheck condition predicate */
235  while ((*check_callback)(check_arg))
236  {
237  wait->m_need_wakeup = true;
238  waits++;
239  if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
240  wait->m_mutex, &end) == ETIMEDOUT)
241  {
242  wait->m_need_wakeup = false;
243  break;
244  }
245  }
246  NdbMutex_Unlock(wait->m_mutex);
247  return (waits > 0);
248 }
249 
250 
251 static inline
252 int
253 wakeup(struct thr_wait* wait)
254 {
255  NdbMutex_Lock(wait->m_mutex);
256  // We should avoid signaling when not waiting for wakeup
257  if (wait->m_need_wakeup)
258  {
259  wait->m_need_wakeup = false;
260  NdbCondition_Signal(wait->m_cond);
261  }
262  NdbMutex_Unlock(wait->m_mutex);
263  return 0;
264 }
265 
266 #endif
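Taken together, the futex and mutex/condition variants of yield()/wakeup() above realize the same protocol: the consumer announces that it intends to sleep, re-checks a caller-supplied predicate so that a wakeup posted in between is not lost, and only then blocks with a timeout; the producer signals only when a sleeper may actually be waiting. Below is a minimal, self-contained sketch of that pattern using standard C++ primitives and hypothetical names (demo_wait, demo_yield, demo_post), not the file's thr_wait/thr_data types:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

struct demo_wait {
  bool need_wakeup = false;
  std::mutex mtx;
  std::condition_variable cond;
};

static demo_wait g_demo_wait;
static std::queue<int> g_demo_queue;       // stands in for a job buffer

/* Consumer side: the equivalent of yield(&wait, nsec, check_callback, arg). */
static bool demo_yield(unsigned nsec) {
  std::unique_lock<std::mutex> lk(g_demo_wait.mtx);
  bool waited = false;
  while (g_demo_queue.empty()) {           // re-check predicate under the lock
    g_demo_wait.need_wakeup = true;
    waited = true;
    if (g_demo_wait.cond.wait_for(lk, std::chrono::nanoseconds(nsec)) ==
        std::cv_status::timeout) {
      g_demo_wait.need_wakeup = false;
      break;
    }
  }
  return waited;
}

/* Producer side: the equivalent of wakeup(&wait) after publishing work. */
static void demo_post(int job) {
  std::lock_guard<std::mutex> lk(g_demo_wait.mtx);
  g_demo_queue.push(job);
  if (g_demo_wait.need_wakeup) {           // signal only if someone may sleep
    g_demo_wait.need_wakeup = false;
    g_demo_wait.cond.notify_one();
  }
}

In both real variants the predicate is re-checked after announcing the intent to sleep; that re-check is what closes the race with a concurrent wakeup().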
267 
268 #ifdef NDB_HAVE_XCNG
269 template <unsigned SZ>
270 struct thr_spin_lock
271 {
272  thr_spin_lock(const char * name = 0)
273  {
274  m_lock = 0;
275  register_lock(this, name);
276  }
277 
278  union {
279  volatile Uint32 m_lock;
280  char pad[SZ];
281  };
282 };
283 
284 static
285 ATTRIBUTE_NOINLINE
286 void
287 lock_slow(void * sl, volatile unsigned * val)
288 {
289  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
290 
291 loop:
292  Uint32 spins = 0;
293  do {
294  spins++;
295  cpu_pause();
296  } while (* val == 1);
297 
298  if (unlikely(xcng(val, 1) != 0))
299  goto loop;
300 
301  if (s)
302  {
303  s->m_spin_count += spins;
304  Uint32 count = ++s->m_contended_count;
305  Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
306 
307  if ((count % freq) == 0)
308  printf("%s waiting for lock, contentions: %u spins: %u\n",
309  s->m_name, count, s->m_spin_count);
310  }
311 }
312 
313 template <unsigned SZ>
314 static
315 inline
316 void
317 lock(struct thr_spin_lock<SZ>* sl)
318 {
319  volatile unsigned* val = &sl->m_lock;
320  if (likely(xcng(val, 1) == 0))
321  return;
322 
323  lock_slow(sl, val);
324 }
325 
326 template <unsigned SZ>
327 static
328 inline
329 void
330 unlock(struct thr_spin_lock<SZ>* sl)
331 {
336  mb();
337  sl->m_lock = 0;
338 }
339 
340 template <unsigned SZ>
341 static
342 inline
343 int
344 trylock(struct thr_spin_lock<SZ>* sl)
345 {
346  volatile unsigned* val = &sl->m_lock;
347  return xcng(val, 1);
348 }
349 #else
350 #define thr_spin_lock thr_mutex
351 #endif
352 
353 template <unsigned SZ>
354 struct thr_mutex
355 {
356  thr_mutex(const char * name = 0) {
357  NdbMutex_Init(&m_mutex);
358  register_lock(this, name);
359  }
360 
361  union {
362  NdbMutex m_mutex;
363  char pad[SZ];
364  };
365 };
366 
367 template <unsigned SZ>
368 static
369 inline
370 void
371 lock(struct thr_mutex<SZ>* sl)
372 {
373  NdbMutex_Lock(&sl->m_mutex);
374 }
375 
376 template <unsigned SZ>
377 static
378 inline
379 void
380 unlock(struct thr_mutex<SZ>* sl)
381 {
382  NdbMutex_Unlock(&sl->m_mutex);
383 }
384 
385 template <unsigned SZ>
386 static
387 inline
388 int
389 trylock(struct thr_mutex<SZ> * sl)
390 {
391  return NdbMutex_Trylock(&sl->m_mutex);
392 }
393 
397 template<typename T>
398 struct thr_safe_pool
399 {
400  thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}
401 
402  T* m_free_list;
403  Uint32 m_cnt;
404  thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;
405 
406  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
407  T* ret = 0;
408  lock(&m_lock);
409  if (m_free_list)
410  {
411  assert(m_cnt);
412  m_cnt--;
413  ret = m_free_list;
414  m_free_list = ret->m_next;
415  unlock(&m_lock);
416  }
417  else
418  {
419  Uint32 dummy;
420  unlock(&m_lock);
421  ret = reinterpret_cast<T*>
422  (mm->alloc_page(rg, &dummy,
423  Ndbd_mem_manager::NDB_ZONE_ANY));
424  // ToDo: How to deal with failed allocation?!?
425  // I think in this case we need to start grabbing buffers kept for signal
426  // trace.
427  }
428  return ret;
429  }
430 
431  void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
432  lock(&m_lock);
433  t->m_next = m_free_list;
434  m_free_list = t;
435  m_cnt++;
436  unlock(&m_lock);
437  }
438 
439  void release_list(Ndbd_mem_manager *mm, Uint32 rg,
440  T* head, T* tail, Uint32 cnt) {
441  lock(&m_lock);
442  tail->m_next = m_free_list;
443  m_free_list = head;
444  m_cnt += cnt;
445  unlock(&m_lock);
446  }
447 };
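The template argument of m_lock sizes the embedded lock so that the whole pool header fills one cache line. A quick check of the arithmetic (an illustration assuming the NDB_HAVE_XCNG spin-lock variant on a 64-bit build; not part of the original file): sizeof(void*) + sizeof(Uint32) == 8 + 4 == 12, so the lock union is padded to NDB_CL - 12 == 52 bytes, and m_free_list (8) + m_cnt (4) + m_lock (52) together occupy exactly 64 bytes, keeping the hot lock word from sharing a cache line with unrelated data. Under those assumptions one could express it as a compile-time check:

  /* hypothetical check, C++11 or later: */
  /* static_assert(sizeof(thr_safe_pool<thr_job_buffer>) == NDB_CL,
   *               "thr_safe_pool should occupy exactly one cache line"); */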
448 
452 template<typename T>
453 class thread_local_pool
454 {
455 public:
456  thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
457  m_max_free(max_free),
458  m_free(0),
459  m_freelist(0),
460  m_global_pool(global_pool)
461  {
462  }
463 
464  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
465  T *tmp = m_freelist;
466  if (tmp)
467  {
468  m_freelist = tmp->m_next;
469  assert(m_free > 0);
470  m_free--;
471  }
472  else
473  tmp = m_global_pool->seize(mm, rg);
474 
475  validate();
476  return tmp;
477  }
478 
479  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
480  unsigned free = m_free;
481  if (free < m_max_free)
482  {
483  m_free = free + 1;
484  t->m_next = m_freelist;
485  m_freelist = t;
486  }
487  else
488  m_global_pool->release(mm, rg, t);
489 
490  validate();
491  }
492 
497  void release_local(T *t) {
498  m_free++;
499  t->m_next = m_freelist;
500  m_freelist = t;
501 
502  validate();
503  }
504 
505  void validate() const {
506 #ifdef VM_TRACE
507  Uint32 cnt = 0;
508  T* t = m_freelist;
509  while (t)
510  {
511  cnt++;
512  t = t->m_next;
513  }
514  assert(cnt == m_free);
515 #endif
516  }
517 
522  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
523  validate();
524  unsigned cnt = 0;
525  unsigned free = m_free;
526  Uint32 maxfree = m_max_free;
527  assert(maxfree > 0);
528 
529  T* head = m_freelist;
530  T* tail = m_freelist;
531  if (free > maxfree)
532  {
533  cnt++;
534  free--;
535 
536  while (free > maxfree)
537  {
538  cnt++;
539  free--;
540  tail = tail->m_next;
541  }
542 
543  assert(free == maxfree);
544 
545  m_free = free;
546  m_freelist = tail->m_next;
547  m_global_pool->release_list(mm, rg, head, tail, cnt);
548  }
549  validate();
550  }
551 
552  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
553  validate();
554  T* head = m_freelist;
555  T* tail = m_freelist;
556  if (tail)
557  {
558  unsigned cnt = 1;
559  while (tail->m_next != 0)
560  {
561  cnt++;
562  tail = tail->m_next;
563  }
564  m_global_pool->release_list(mm, rg, head, tail, cnt);
565  m_free = 0;
566  m_freelist = 0;
567  }
568  validate();
569  }
570 
571  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
572 
573 private:
574  unsigned m_max_free;
575  unsigned m_free;
576  T *m_freelist;
577  thr_safe_pool<T> *m_global_pool;
578 };
579 
588 struct thr_job_buffer // 32k
589 {
590  static const unsigned SIZE = 8190;
591 
592  /*
593  * Amount of signal data currently in m_data buffer.
594  * Read/written by producer, read by consumer.
595  */
596  Uint32 m_len;
597  /*
598  * Whether this buffer contained prio A or prio B signals, used when dumping
599  * signals from released buffers.
600  */
601  Uint32 m_prioa;
602  union {
603  Uint32 m_data[SIZE];
604 
605  thr_job_buffer * m_next; // For free-list
606  };
607 };
608 
609 static
610 inline
611 Uint32
612 calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
613 {
614  return (wi >= ri) ? wi - ri : (sz - ri) + wi;
615 }
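A quick worked example of this circular-buffer occupancy formula, using the job-queue ring size defined just below (thr_job_queue::SIZE == 31); the values are only for illustration:

  //   ri = 4,  wi = 9  :  wi >= ri, so used = 9 - 4         = 5
  //   ri = 28, wi = 3  :  wi <  ri, so used = (31 - 28) + 3 = 6
  // "Empty" is ri == wi, so one slot always stays unused and the maximum
  // occupancy that can ever be observed is SIZE - 1 = 30.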
616 
626 struct thr_job_queue_head
627 {
628  unsigned m_read_index; // Read/written by consumer, read by producer
629  unsigned m_write_index; // Read/written by producer, read by consumer
630 
631  Uint32 used() const;
632 };
633 
634 struct thr_job_queue
635 {
636  static const unsigned SIZE = 31;
637 
638  struct thr_job_queue_head* m_head;
639  struct thr_job_buffer* m_buffers[SIZE];
640 };
641 
642 inline
643 Uint32
644 thr_job_queue_head::used() const
645 {
646  return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
647 }
648 
649 /*
650  * Two structures tightly associated with thr_job_queue.
651  *
652  * There will generally be exactly one thr_jb_read_state and one
653  * thr_jb_write_state associated with each thr_job_queue.
654  *
655  * The reason they are kept separate is to avoid unnecessary inter-CPU
656  * cache line pollution. All fields shared among producer and consumer
657  * threads are in thr_job_queue, thr_jb_write_state fields are only
658  * accessed by the producer thread(s), and thr_jb_read_state fields are
659  * only accessed by the consumer thread.
660  *
661  * For example, on Intel core 2 quad processors, there is a ~33%
662  * penalty for two cores accessing the same 64-byte cacheline.
663  */
664 struct thr_jb_write_state
665 {
666  /*
667  * The position to insert the next signal into the queue.
668  *
669  * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
670  * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
671  * at which to store the next signal.
672  */
673  Uint32 m_write_index;
674  Uint32 m_write_pos;
675 
676  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
677  thr_job_buffer *m_write_buffer;
678 
679  /* Number of signals inserted since last flush to thr_job_queue. */
680  Uint32 m_pending_signals;
681 
682  /* Number of signals inserted since last wakeup */
683  Uint32 m_pending_signals_wakeup;
684 };
685 
686 /*
687  * This structure is also used when dumping signal traces, to dump executed
688  * signals from the buffer(s) currently being processed.
689  */
690 struct thr_jb_read_state
691 {
692  /*
693  * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
694  * executing signals from.
695  */
696  Uint32 m_read_index;
697  /*
698  * Index into m_read_buffer->m_data[] of the next signal to execute from the
699  * current buffer.
700  */
701  Uint32 m_read_pos;
702  /*
703  * Thread local copy of thr_job_queue::m_buffers[m_read_index].
704  */
705  thr_job_buffer *m_read_buffer;
706  /*
707  * These are thread-local copies of thr_job_queue::m_write_index and
708  * thr_job_buffer::m_len. They are read once at the start of the signal
709  * execution loop and used to determine when the end of available signals is
710  * reached.
711  */
712  Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer)
713 
714  Uint32 m_write_index; // Last available thr_job_buffer.
715 
716  bool is_empty() const
717  {
718  assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
719  return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
720  }
721 };
722 
726 struct thr_tq
727 {
728  static const unsigned SQ_SIZE = 512;
729  static const unsigned LQ_SIZE = 512;
730  static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
731 
732  Uint32 * m_delayed_signals[PAGES];
733  Uint32 m_next_free;
734  Uint32 m_next_timer;
735  Uint32 m_current_time;
736  Uint32 m_cnt[2];
737  Uint32 m_short_queue[SQ_SIZE];
738  Uint32 m_long_queue[LQ_SIZE];
739 };
740 
741 /*
742  * Max number of thread-local job buffers to keep before releasing to
743  * global pool.
744  */
745 #define THR_FREE_BUF_MAX 32
746 /* Minimum number of buffers (to ensure useful trace dumps). */
747 #define THR_FREE_BUF_MIN 12
748 /*
749  * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
750  * at a time from/to global pool.
751  */
752 #define THR_FREE_BUF_BATCH 6
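With the constants above, the batch moved between the thread-local FIFO and the global pool is THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH = 32 / 6 = 5 buffers (integer division). This satisfies the assertions in seize_buffer() and release_buffer() below: 5 > 0, 5 + THR_FREE_BUF_MIN = 17 < 32, and 5 < 32.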
753 
757 struct thr_send_page
758 {
759  static const Uint32 PGSIZE = 32768;
760 #if SIZEOF_CHARP == 4
761  static const Uint32 HEADER_SIZE = 8;
762 #else
763  static const Uint32 HEADER_SIZE = 12;
764 #endif
765 
766  static Uint32 max_bytes() {
767  return PGSIZE - offsetof(thr_send_page, m_data);
768  }
769 
770  /* Next page */
771  thr_send_page* m_next;
772 
773  /* Bytes of send data available in this page. */
774  Uint16 m_bytes;
775 
776  /* Start of unsent data */
777  Uint16 m_start;
778 
779  /* Data; real size is to the end of one page. */
780  char m_data[2];
781 };
782 
786 struct thr_send_buffer
787 {
788  thr_send_page* m_first_page;
789  thr_send_page* m_last_page;
790 };
791 
795 struct thr_send_queue
796 {
797  unsigned m_write_index;
798 #if SIZEOF_CHARP == 8
799  unsigned m_unused;
800  thr_send_page* m_buffers[7];
801  static const unsigned SIZE = 7;
802 #else
803  thr_send_page* m_buffers[15];
804  static const unsigned SIZE = 15;
805 #endif
806 };
807 
808 struct thr_data
809 {
810  thr_data() : m_jba_write_lock("jbalock"),
811  m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
812 
813  thr_wait m_waiter;
814  unsigned m_thr_no;
815 
820 
825 
826  Uint64 m_time;
827  struct thr_tq m_tq;
828 
829  /* Prio A signal incoming queue. */
830  struct thr_spin_lock<64> m_jba_write_lock;
831  struct thr_job_queue m_jba;
832 
833  struct thr_job_queue_head m_jba_head;
834 
835  /* Thread-local read state of prio A buffer. */
836  struct thr_jb_read_state m_jba_read_state;
837  /*
838  * There is no m_jba_write_state, as we have multiple writers to the prio A
839  * queue, so local state becomes invalid as soon as we release the lock.
840  */
841 
842  /*
843  * In m_next_buffer we keep a free buffer at all times, so that when
844  * we hold the lock and find we need a new buffer, we can use this and this
845  * way defer allocation to after releasing the lock.
846  */
847  struct thr_job_buffer* m_next_buffer;
848 
849  /*
850  * We keep a small number of buffers in a thread-local cyclic FIFO, so that
851  * we can avoid going to the global pool in most cases, and so that we have
852  * recent buffers available for dumping in trace files.
853  */
854  struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
855  /* m_first_free is the index of the entry to return next from seize(). */
856  Uint32 m_first_free;
857  /* m_first_unused is the first unused entry in m_free_fifo. */
858  Uint32 m_first_unused;
859 
860  /*
861  * These are the thread input queues, where other threads deliver signals
862  * into.
863  */
864  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
865  struct thr_job_queue m_in_queue[MAX_THREADS];
866  /* These are the write states of m_in_queue[self] in each thread. */
867  struct thr_jb_write_state m_write_states[MAX_THREADS];
868  /* These are the read states of all of our own m_in_queue[]. */
869  struct thr_jb_read_state m_read_states[MAX_THREADS];
870 
871  /* Jam buffers for making trace files at crashes. */
872  EmulatedJamBuffer m_jam;
873  /* Watchdog counter for this thread. */
874  Uint32 m_watchdog_counter;
875  /* Signal delivery statistics. */
876  Uint32 m_prioa_count;
877  Uint32 m_prioa_size;
878  Uint32 m_priob_count;
879  Uint32 m_priob_size;
880 
881  /* Array of node ids with pending remote send data. */
882  Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
883  /* Number of node ids in m_pending_send_nodes. */
884  Uint32 m_pending_send_count;
885 
890  Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
891 
892  /* pool for send buffers */
893  class thread_local_pool<thr_send_page> m_send_buffer_pool;
894 
895  /* Send buffer for this thread, these are not touched by any other thread */
896  struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
897 
898  /* Block instances (main and worker) handled by this thread. */
899  /* Used for sendpacked (send-at-job-buffer-end). */
900  Uint32 m_instance_count;
901  BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
902 
903  SectionSegmentPool::Cache m_sectionPoolCache;
904 
905  Uint32 m_cpu;
906  pthread_t m_thr_id;
907  NdbThread* m_thread;
908 };
909 
910 struct mt_send_handle : public TransporterSendBufferHandle
911 {
912  struct thr_data * m_selfptr;
913  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
914  virtual ~mt_send_handle() {}
915 
916  virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
917  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
918  virtual bool forceSend(NodeId node);
919 };
920 
921 struct trp_callback : public TransporterCallbackKernel
922 {
923  trp_callback() {}
924 
925  /* Callback interface. */
926  int checkJobBuffer();
927  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
928  void lock_transporter(NodeId node);
929  void unlock_transporter(NodeId node);
930  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
931  Uint32 bytes_sent(NodeId node, Uint32 bytes);
932  bool has_data_to_send(NodeId node);
933  void reset_send_buffer(NodeId node, bool should_be_empty);
934 };
935 
936 extern trp_callback g_trp_callback; // Forward declaration
937 extern struct thr_repository g_thr_repository;
938 
939 #include <NdbMutex.h>
940 #include <NdbCondition.h>
941 
942 struct thr_repository
943 {
944  thr_repository()
945  : m_receive_lock("recvlock"),
946  m_section_lock("sectionlock"),
947  m_mem_manager_lock("memmanagerlock"),
948  m_jb_pool("jobbufferpool"),
949  m_sb_pool("sendbufferpool")
950  {}
951 
952  struct thr_spin_lock<64> m_receive_lock;
953  struct thr_spin_lock<64> m_section_lock;
954  struct thr_spin_lock<64> m_mem_manager_lock;
955  struct thr_safe_pool<thr_job_buffer> m_jb_pool;
956  struct thr_safe_pool<thr_send_page> m_sb_pool;
957  Ndbd_mem_manager * m_mm;
958  unsigned m_thread_count;
959  struct thr_data m_thread[MAX_THREADS];
960 
965  /* The buffers that are to be sent */
966  struct send_buffer
967  {
971  struct thr_spin_lock<8> m_send_lock;
972 
977  struct thr_send_buffer m_buffer;
987  Uint32 m_force_send;
988 
993  Uint32 m_send_thread;
997  Uint32 m_bytes;
998 
999  /* read index(es) in thr_send_queue */
1000  Uint32 m_read_index[MAX_THREADS];
1001  } m_send_buffers[MAX_NTRANSPORTERS];
1002 
1003  /* The buffers published by threads */
1004  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
1005 
1006  /*
1007  * These are used to synchronize during crash / trace dumps.
1008  *
1009  */
1010  NdbMutex stop_for_crash_mutex;
1011  NdbCondition stop_for_crash_cond;
1012  Uint32 stopped_threads;
1013 };
1014 
1015 #if 0
1016 static
1017 Uint32
1018 fifo_used_pages(struct thr_data* selfptr)
1019 {
1020  return calc_fifo_used(selfptr->m_first_unused,
1021  selfptr->m_first_free,
1022  THR_FREE_BUF_MAX);
1023 }
1024 #endif
1025 
1026 static
1027 void
1028 job_buffer_full(struct thr_data* selfptr)
1029 {
1030  ndbout_c("job buffer full");
1031  abort();
1032 }
1033 
1034 static
1035 void
1036 out_of_job_buffer(struct thr_data* selfptr)
1037 {
1038  ndbout_c("out of job buffer");
1039  abort();
1040 }
1041 
1042 static
1043 thr_job_buffer*
1044 seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
1045 {
1046  thr_job_buffer* jb;
1047  thr_data* selfptr = rep->m_thread + thr_no;
1048  Uint32 first_free = selfptr->m_first_free;
1049  Uint32 first_unused = selfptr->m_first_unused;
1050 
1051  /*
1052  * An empty FIFO is denoted by m_first_free == m_first_unused.
1053  * So we will never have a completely full FIFO array; at least one entry will
1054  * always be unused. In exchange, the code is simpler.
1055  */
1056 
1057  /*
1058  * We never allow the fifo to become completely empty, as we want to have
1059  * a good number of signals available for trace files in case of a forced
1060  * shutdown.
1061  */
1062  Uint32 buffers = (first_free > first_unused ?
1063  first_unused + THR_FREE_BUF_MAX - first_free :
1064  first_unused - first_free);
1065  if (unlikely(buffers <= THR_FREE_BUF_MIN))
1066  {
1067  /*
1068  * All used, allocate another batch from global pool.
1069  *
1070  * Put the new buffers at the head of the fifo, so as not to needlessly
1071  * push out any existing buffers from the fifo (that would lose useful
1072  * data for signal dumps in trace files).
1073  */
1074  Uint32 cnt = 0;
1075  Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1076  assert(batch > 0);
1077  assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
1078  do {
1079  jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
1080  if (unlikely(jb == 0))
1081  {
1082  if (unlikely(cnt == 0))
1083  {
1084  out_of_job_buffer(selfptr);
1085  }
1086  break;
1087  }
1088  jb->m_len = 0;
1089  jb->m_prioa = false;
1090  first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1091  selfptr->m_free_fifo[first_free] = jb;
1092  batch--;
1093  } while (cnt < batch);
1094  selfptr->m_first_free = first_free;
1095  }
1096 
1097  jb= selfptr->m_free_fifo[first_free];
1098  selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1099  /* Init here rather than in release_buffer() so signal dump will work. */
1100  jb->m_len = 0;
1101  jb->m_prioa = prioa;
1102  return jb;
1103 }
1104 
1105 static
1106 void
1107 release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
1108 {
1109  struct thr_data* selfptr = rep->m_thread + thr_no;
1110  Uint32 first_free = selfptr->m_first_free;
1111  Uint32 first_unused = selfptr->m_first_unused;
1112 
1113  /*
1114  * Pack near-empty signals, to get more info in the signal traces.
1115  *
1116  * This is not currently used, as we only release full job buffers, hence
1117  * the #if 0.
1118  */
1119 #if 0
1120  Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
1121  thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
1122  Uint32 len1, len2;
1123 
1124  if (!jb->m_prioa &&
1125  first_free != first_unused &&
1126  !last_jb->m_prioa &&
1127  (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
1128  (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
1129  {
1130  /*
1131  * The buffer being released is fairly empty, and what data it contains fits
1132  * in the previously released buffer.
1133  *
1134  * We want to avoid too many almost-empty buffers in the free fifo, as that
1135  * makes signal traces less useful due to too little data available. So in
1136  * this case we move the data from the buffer to be released into the
1137  * previous buffer, and place the to-be-released buffer at the head of the
1138  * fifo (to be immediately reused).
1139  *
1140  * This is only done for prio B buffers, as we must not merge prio A and B
1141  * data (or dumps would be incorrect), and prio A buffers are in any case
1142  * full when released.
1143  */
1144  memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
1145  last_jb->m_len = len1 + len2;
1146  jb->m_len = 0;
1147  first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
1148  selfptr->m_free_fifo[first_free] = jb;
1149  selfptr->m_first_free = first_free;
1150  }
1151  else
1152 #endif
1153  {
1154  /* Just insert at the end of the fifo. */
1155  selfptr->m_free_fifo[first_unused] = jb;
1156  first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
1157  selfptr->m_first_unused = first_unused;
1158  }
1159 
1160  if (unlikely(first_unused == first_free))
1161  {
1162  /* FIFO full, need to release to global pool. */
1163  Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
1164  assert(batch > 0);
1165  assert(batch < THR_FREE_BUF_MAX);
1166  do {
1167  rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
1168  selfptr->m_free_fifo[first_free]);
1169  first_free = (first_free + 1) % THR_FREE_BUF_MAX;
1170  batch--;
1171  } while (batch > 0);
1172  selfptr->m_first_free = first_free;
1173  }
1174 }
1175 
1176 static
1177 inline
1178 Uint32
1179 scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
1180 {
1181  Uint32 thr_no = selfptr->m_thr_no;
1182  Uint32 **pages = selfptr->m_tq.m_delayed_signals;
1183  Uint32 free = selfptr->m_tq.m_next_free;
1184  Uint32* save = ptr;
1185  for (Uint32 i = 0; i < cnt; i++, ptr++)
1186  {
1187  Uint32 val = * ptr;
1188  if ((val & 0xFFFF) <= end)
1189  {
1190  Uint32 idx = val >> 16;
1191  Uint32 buf = idx >> 8;
1192  Uint32 pos = 32 * (idx & 0xFF);
1193 
1194  Uint32* page = * (pages + buf);
1195 
1196  const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
1197  const Uint32 *data = page + pos + (sizeof(*s)>>2);
1198  if (0)
1199  ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
1200  /*
1201  * ToDo: Do measurements of the frequency of these prio A timed signals.
1202  *
1203  * If they are frequent, we may want to optimize, as sending one prio A
1204  * signal is somewhat expensive compared to sending one prio B.
1205  */
1206  sendprioa(thr_no, s, data,
1207  data + s->theLength);
1208  * (page + pos) = free;
1209  free = idx;
1210  }
1211  else if (i > 0)
1212  {
1213  selfptr->m_tq.m_next_free = free;
1214  memmove(save, ptr, 4 * (cnt - i));
1215  return i;
1216  }
1217  else
1218  {
1219  return 0;
1220  }
1221  }
1222  selfptr->m_tq.m_next_free = free;
1223  return cnt;
1224 }
1225 
1226 static
1227 void
1228 handle_time_wrap(struct thr_data* selfptr)
1229 {
1230  Uint32 i;
1231  struct thr_tq * tq = &selfptr->m_tq;
1232  Uint32 cnt0 = tq->m_cnt[0];
1233  Uint32 cnt1 = tq->m_cnt[1];
1234  Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
1235  Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
1236  cnt0 -= tmp0;
1237  cnt1 -= tmp1;
1238  tq->m_cnt[0] = cnt0;
1239  tq->m_cnt[1] = cnt1;
1240  for (i = 0; i<cnt0; i++)
1241  {
1242  assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
1243  tq->m_short_queue[i] -= 32767;
1244  }
1245  for (i = 0; i<cnt1; i++)
1246  {
1247  assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
1248  tq->m_long_queue[i] -= 32767;
1249  }
1250 }
1251 
1252 static
1253 void
1254 scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
1255 {
1256  struct thr_tq * tq = &selfptr->m_tq;
1257  NDB_TICKS last = selfptr->m_time;
1258 
1259  Uint32 curr = tq->m_current_time;
1260  Uint32 cnt0 = tq->m_cnt[0];
1261  Uint32 cnt1 = tq->m_cnt[1];
1262 
1263  assert(now > last);
1264  Uint64 diff = now - last;
1265  Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
1266  Uint32 end = (curr + step);
1267  if (end >= 32767)
1268  {
1269  handle_time_wrap(selfptr);
1270  cnt0 = tq->m_cnt[0];
1271  cnt1 = tq->m_cnt[1];
1272  end -= 32767;
1273  }
1274 
1275  Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
1276  Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
1277 
1278  tq->m_current_time = end;
1279  tq->m_cnt[0] = cnt0 - tmp0;
1280  tq->m_cnt[1] = cnt1 - tmp1;
1281  selfptr->m_time = last + step;
1282 }
1283 
1284 static inline
1285 void
1286 scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
1287 {
1288  if (selfptr->m_time != now)
1289  scan_time_queues_impl(selfptr, now);
1290 }
1291 
1292 static
1293 inline
1294 Uint32*
1295 get_free_slot(struct thr_repository* rep,
1296  struct thr_data* selfptr,
1297  Uint32* idxptr)
1298 {
1299  struct thr_tq * tq = &selfptr->m_tq;
1300  Uint32 idx = tq->m_next_free;
1301 retry:
1302  Uint32 buf = idx >> 8;
1303  Uint32 pos = idx & 0xFF;
1304 
1305  if (idx != RNIL)
1306  {
1307  Uint32* page = * (tq->m_delayed_signals + buf);
1308  Uint32* ptr = page + (32 * pos);
1309  tq->m_next_free = * ptr;
1310  * idxptr = idx;
1311  return ptr;
1312  }
1313 
1314  Uint32 thr_no = selfptr->m_thr_no;
1315  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
1316  {
1317  if (tq->m_delayed_signals[i] == 0)
1318  {
1319  struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
1320  Uint32 * page = reinterpret_cast<Uint32*>(jb);
1321  tq->m_delayed_signals[i] = page;
1322 
1323  ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
1324 
1328  for (Uint32 j = 0; j<255; j ++)
1329  {
1330  page[j * 32] = (i << 8) + (j + 1);
1331  }
1332  page[255*32] = RNIL;
1333  idx = (i << 8);
1334  goto retry;
1335  }
1336  }
1337  abort();
1338  return NULL;
1339 }
1340 
1341 void
1342 senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
1343 {
1344  struct thr_repository* rep = &g_thr_repository;
1345  struct thr_data * selfptr = rep->m_thread + thr_no;
1346  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
1347  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
1348 
1349  Uint32 max;
1350  Uint32 * cntptr;
1351  Uint32 * queueptr;
1352 
1353  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
1354  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
1355  if (delay < 100)
1356  {
1357  cntptr = selfptr->m_tq.m_cnt + 0;
1358  queueptr = selfptr->m_tq.m_short_queue;
1359  max = thr_tq::SQ_SIZE;
1360  }
1361  else
1362  {
1363  cntptr = selfptr->m_tq.m_cnt + 1;
1364  queueptr = selfptr->m_tq.m_long_queue;
1365  max = thr_tq::LQ_SIZE;
1366  }
1367 
1368  Uint32 idx;
1369  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
1370  memcpy(ptr, s, 4*siglen);
1371 
1372  if (0)
1373  ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
1374  selfptr->m_tq.m_current_time,
1375  alarm,
1376  getSignalName(s->theVerId_signalNumber),
1377  getBlockName(refToBlock(s->theSendersBlockRef)),
1378  getBlockName(s->theReceiversBlockNumber),
1379  delay,
1380  idx, ptr);
1381 
1382  Uint32 i;
1383  Uint32 cnt = *cntptr;
1384  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
1385 
1386  * cntptr = cnt + 1;
1387  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
1388 
1389  if (cnt == 0)
1390  {
1391  queueptr[0] = newentry;
1392  return;
1393  }
1394  else if (cnt < max)
1395  {
1396  for (i = 0; i<cnt; i++)
1397  {
1398  Uint32 save = queueptr[i];
1399  if ((save & 0xFFFF) > alarm)
1400  {
1401  memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
1402  queueptr[i] = newentry;
1403  return;
1404  }
1405  }
1406  assert(i == cnt);
1407  queueptr[i] = newentry;
1408  return;
1409  }
1410  else
1411  {
1412  abort();
1413  }
1414 }
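Each entry in m_short_queue/m_long_queue is a single packed 32-bit word; the layout can be read off senddelay() above and scan_queue() earlier. A hypothetical decode helper, purely to illustrate the format (not part of the original file):

  /* entry layout:
   *   bits 16..31 : idx = (page << 8) | slot, where slot is a 32-word cell
   *                 in thr_tq::m_delayed_signals[page]
   *   bits  0..15 : alarm time, compared against m_current_time (wraps at 32767)
   */
  static inline void demo_decode_tq_entry(Uint32 entry,
                                          Uint32 *page, Uint32 *slot,
                                          Uint32 *alarm)
  {
    Uint32 idx = entry >> 16;
    *page  = idx >> 8;        /* index into thr_tq::m_delayed_signals[] */
    *slot  = idx & 0xFF;      /* word offset within the page is 32 * slot */
    *alarm = entry & 0xFFFF;  /* fires once m_current_time reaches this value */
  }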
1415 
1416 /*
1417  * Flush the write state to the job queue, making any new signals available to
1418  * receiving threads.
1419  *
1420  * Two versions:
1421  * - The general version flush_write_state_other() which may flush to
1422  * any thread, and possibly signal any waiters.
1423  * - The special version flush_write_state_self() which should only be used
1424  * to flush messages to itself.
1425  *
1426  * Call to these functions are encapsulated through flush_write_state
1427  * which decides which of these functions to call.
1428  */
1429 static inline
1430 void
1431 flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
1432 {
1433  /*
1434  * Can simplify the flush_write_state when writing to myself:
1435  * Simply update write references wo/ mutex, memory barrier and signaling
1436  */
1437  w->m_write_buffer->m_len = w->m_write_pos;
1438  q_head->m_write_index = w->m_write_index;
1439  w->m_pending_signals_wakeup = 0;
1440  w->m_pending_signals = 0;
1441 }
1442 
1443 static inline
1444 void
1445 flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
1446  thr_jb_write_state *w)
1447 {
1448  /*
1449  * Two write memory barriers here, as assigning m_len may make signal data
1450  * available to other threads, and assigning m_write_index may make new
1451  * buffers available.
1452  *
1453  * We could optimize this by only doing it as needed, and only doing it
1454  * once before setting all m_len, and once before setting all m_write_index.
1455  *
1456  * But wmb() is a no-op on x86 anyway ...
1457  */
1458  wmb();
1459  w->m_write_buffer->m_len = w->m_write_pos;
1460  wmb();
1461  q_head->m_write_index = w->m_write_index;
1462 
1463  w->m_pending_signals_wakeup += w->m_pending_signals;
1464  w->m_pending_signals = 0;
1465 
1466  if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
1467  {
1468  w->m_pending_signals_wakeup = 0;
1469  wakeup(&(dstptr->m_waiter));
1470  }
1471 }
1472 
1473 static inline
1474 void
1475 flush_write_state(const thr_data *selfptr, thr_data *dstptr,
1476  thr_job_queue_head *q_head, thr_jb_write_state *w)
1477 {
1478  if (dstptr == selfptr)
1479  {
1480  flush_write_state_self(q_head, w);
1481  }
1482  else
1483  {
1484  flush_write_state_other(dstptr, q_head, w);
1485  }
1486 }
1487 
1488 
1489 static
1490 void
1491 flush_jbb_write_state(thr_data *selfptr)
1492 {
1493  Uint32 thr_count = g_thr_repository.m_thread_count;
1494  Uint32 self = selfptr->m_thr_no;
1495 
1496  thr_jb_write_state *w = selfptr->m_write_states;
1497  thr_data *thrptr = g_thr_repository.m_thread;
1498  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
1499  {
1500  if (w->m_pending_signals || w->m_pending_signals_wakeup)
1501  {
1502  w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
1503  thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
1504  flush_write_state(selfptr, thrptr, q_head, w);
1505  }
1506  }
1507 }
1508 
1517 static int
1518 check_job_buffers(struct thr_repository* rep)
1519 {
1520  const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
1521  unsigned thr_no = receiver_thread_no;
1522  const thr_data *thrptr = rep->m_thread;
1523  for (unsigned i = 0; i<num_threads; i++, thrptr++)
1524  {
1532  const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1533  unsigned ri = q_head->m_read_index;
1534  unsigned wi = q_head->m_write_index;
1535  unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
1536  if (1 + minfree + busy >= thr_job_queue::SIZE)
1537  {
1538  return 1;
1539  }
1540  }
1541 
1542  return 0;
1543 }
1544 
1562 static
1563 Uint32
1564 compute_max_signals_to_execute(Uint32 thr_no)
1565 {
1566  Uint32 minfree = thr_job_queue::SIZE;
1567  const struct thr_repository* rep = &g_thr_repository;
1568  const thr_data *thrptr = rep->m_thread;
1569 
1570  for (unsigned i = 0; i<num_threads; i++, thrptr++)
1571  {
1579  const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
1580  unsigned ri = q_head->m_read_index;
1581  unsigned wi = q_head->m_write_index;
1582  unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
1583 
1584  assert(free <= thr_job_queue::SIZE);
1585 
1586  if (free < minfree)
1587  minfree = free;
1588  }
1589 
1590 #define SAFETY 2
1591 
1592  if (minfree >= (1 + SAFETY))
1593  {
1594  return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
1595  }
1596  else
1597  {
1598  return 0;
1599  }
1600 }
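To get a feel for the formula, plug in the constants from this file (thr_job_buffer::SIZE == 8190, so MIN_SIGNALS_PER_PAGE == 8190 / 32 == 255, and SAFETY == 2):

  //   minfree == 31 (all out-queues idle) : (3 + (31 - 3) * 255) / 4 == 1785
  //   minfree == 4                        : (3 + (4 - 3) * 255) / 4  ==   64
  //   minfree == 3                        : (3 + 0) / 4              ==    0
  //   minfree <= 2                        : the else branch returns 0
  // The execution quota thus shrinks roughly linearly as the most congested
  // out-queue fills up, and drops to zero once fewer than SAFETY + 2 == 4
  // free slots remain.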
1601 
1602 //#define NDBMT_RAND_YIELD
1603 #ifdef NDBMT_RAND_YIELD
1604 static Uint32 g_rand_yield = 0;
1605 static
1606 void
1607 rand_yield(Uint32 limit, void* ptr0, void * ptr1)
1608 {
1609  return;
1610  UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
1611  Uint8* tmpptr = (Uint8*)&tmp;
1612  Uint32 sum = g_rand_yield;
1613  for (Uint32 i = 0; i<sizeof(tmp); i++)
1614  sum = 33 * sum + tmpptr[i];
1615 
1616  if ((sum % 100) < limit)
1617  {
1618  g_rand_yield++;
1619  sched_yield();
1620  }
1621 }
1622 #else
1623 static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
1624 #endif
1625 
1626 
1627 
1628 void
1629 trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
1630 {
1631  SignalT<3> signalT;
1632  Signal &signal = * new (&signalT) Signal(0);
1633  memset(&signal.header, 0, sizeof(signal.header));
1634 
1635  signal.header.theLength = 3;
1636  signal.header.theSendersSignalId = 0;
1637  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
1638  signal.theData[0] = NDB_LE_SendBytesStatistic;
1639  signal.theData[1] = nodeId;
1640  signal.theData[2] = (Uint32)(bytes/count);
1641  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
1642  signal.header.theReceiversBlockNumber = CMVMI;
1643  sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
1644  &signalT.header, signalT.theData, NULL);
1645 }
1646 
1658 void
1659 trp_callback::lock_transporter(NodeId node)
1660 {
1661  struct thr_repository* rep = &g_thr_repository;
1671  lock(&rep->m_send_buffers[node].m_send_lock);
1672  lock(&rep->m_receive_lock);
1673 }
1674 
1675 void
1676 trp_callback::unlock_transporter(NodeId node)
1677 {
1678  struct thr_repository* rep = &g_thr_repository;
1679  unlock(&rep->m_receive_lock);
1680  unlock(&rep->m_send_buffers[node].m_send_lock);
1681 }
1682 
1683 int
1684 trp_callback::checkJobBuffer()
1685 {
1686  struct thr_repository* rep = &g_thr_repository;
1687  if (unlikely(check_job_buffers(rep)))
1688  {
1689  do
1690  {
1709 // usleep(a-few-usec); /* A micro-sleep would likely have been better... */
1710 #if defined HAVE_SCHED_YIELD
1711  sched_yield();
1712 #elif defined _WIN32
1713  SwitchToThread();
1714 #else
1715  NdbSleep_MilliSleep(0);
1716 #endif
1717 
1718  } while (check_job_buffers(rep));
1719  }
1720 
1721  return 0;
1722 }
1723 
1732 static
1733 Uint32
1734 link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
1735 {
1736  Uint32 ri[MAX_THREADS];
1737  Uint32 wi[MAX_THREADS];
1738  thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
1739  for (unsigned thr = 0; thr < num_threads; thr++)
1740  {
1741  ri[thr] = sb->m_read_index[thr];
1742  wi[thr] = src[thr].m_write_index;
1743  }
1744 
1745  Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
1746  thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
1747  sentinel_page->m_next = 0;
1748 
1749  struct thr_send_buffer tmp;
1750  tmp.m_first_page = sentinel_page;
1751  tmp.m_last_page = sentinel_page;
1752 
1753  Uint32 bytes = 0;
1754  for (unsigned thr = 0; thr < num_threads; thr++, src++)
1755  {
1756  Uint32 r = ri[thr];
1757  Uint32 w = wi[thr];
1758  if (r != w)
1759  {
1760  rmb();
1761  while (r != w)
1762  {
1763  thr_send_page * p = src->m_buffers[r];
1764  assert(p->m_start == 0);
1765  bytes += p->m_bytes;
1766  tmp.m_last_page->m_next = p;
1767  while (p->m_next != 0)
1768  {
1769  p = p->m_next;
1770  assert(p->m_start == 0);
1771  bytes += p->m_bytes;
1772  }
1773  tmp.m_last_page = p;
1774  assert(tmp.m_last_page != 0);
1775  r = (r + 1) % thr_send_queue::SIZE;
1776  }
1777  sb->m_read_index[thr] = r;
1778  }
1779  }
1780 
1781  if (bytes)
1782  {
1783  if (sb->m_bytes)
1784  {
1785  assert(sb->m_buffer.m_first_page != 0);
1786  assert(sb->m_buffer.m_last_page != 0);
1787  sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
1788  sb->m_buffer.m_last_page = tmp.m_last_page;
1789  }
1790  else
1791  {
1792  assert(sb->m_buffer.m_first_page == 0);
1793  assert(sb->m_buffer.m_last_page == 0);
1794  sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
1795  sb->m_buffer.m_last_page = tmp.m_last_page;
1796  }
1797  sb->m_bytes += bytes;
1798  }
1799 
1800  return sb->m_bytes;
1801 }
1802 
1803 Uint32
1804 trp_callback::get_bytes_to_send_iovec(NodeId node,
1805  struct iovec *dst, Uint32 max)
1806 {
1807  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1808 
1809  Uint32 bytes = link_thread_send_buffers(sb, node);
1810  if (max == 0 || bytes == 0)
1811  return 0;
1812 
1817  Uint32 tot = 0;
1818  Uint32 pos = 0;
1819  thr_send_page * p = sb->m_buffer.m_first_page;
1820  do {
1821  dst[pos].iov_len = p->m_bytes;
1822  dst[pos].iov_base = p->m_data + p->m_start;
1823  assert(p->m_start + p->m_bytes <= p->max_bytes());
1824  tot += p->m_bytes;
1825  pos++;
1826  max--;
1827  p = p->m_next;
1828  } while (max && p != 0);
1829 
1830  return pos;
1831 }
1832 
1833 static
1834 void
1835 release_list(thread_local_pool<thr_send_page>* pool,
1836  thr_send_page* head, thr_send_page * tail)
1837 {
1838  while (head != tail)
1839  {
1840  thr_send_page * tmp = head;
1841  head = head->m_next;
1842  pool->release_local(tmp);
1843  }
1844  pool->release_local(tail);
1845 }
1846 
1847 
1848 static
1849 Uint32
1850 bytes_sent(thread_local_pool<thr_send_page>* pool,
1851  thr_repository::send_buffer* sb, Uint32 bytes)
1852 {
1853  assert(bytes);
1854 
1855  Uint32 remain = bytes;
1856  thr_send_page * prev = 0;
1857  thr_send_page * curr = sb->m_buffer.m_first_page;
1858 
1859  assert(sb->m_bytes >= bytes);
1860  while (remain && remain >= curr->m_bytes)
1861  {
1862  remain -= curr->m_bytes;
1863  prev = curr;
1864  curr = curr->m_next;
1865  }
1866 
1867  Uint32 total_bytes = sb->m_bytes;
1868  if (total_bytes == bytes)
1869  {
1873  release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
1874  sb->m_buffer.m_first_page = 0;
1875  sb->m_buffer.m_last_page = 0;
1876  sb->m_bytes = 0;
1877  return 0;
1878  }
1879  else if (remain)
1880  {
1884  curr->m_start += remain;
1885  assert(curr->m_bytes > remain);
1886  curr->m_bytes -= remain;
1887  if (prev)
1888  {
1889  release_list(pool, sb->m_buffer.m_first_page, prev);
1890  }
1891  }
1892  else
1893  {
1897  if (prev)
1898  {
1899  release_list(pool, sb->m_buffer.m_first_page, prev);
1900  }
1901  else
1902  {
1903  pool->release_local(sb->m_buffer.m_first_page);
1904  }
1905  }
1906 
1907  sb->m_buffer.m_first_page = curr;
1908  assert(sb->m_bytes > bytes);
1909  sb->m_bytes -= bytes;
1910  return sb->m_bytes;
1911 }
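A concrete walk-through of the partial-send case above, with illustrative numbers only: suppose the buffer holds three pages of 100, 200 and 150 bytes (sb->m_bytes == 450) and the transporter reports 250 bytes sent.

  //   remain = 250 >= 100 : page 1 fully consumed, remain = 150,
  //                         prev = page 1, curr = page 2
  //   remain = 150 <  200 : loop stops with remain != 0, so
  //                         page 2 gets m_start += 150 and m_bytes = 50,
  //                         and release_list() frees page 1
  //   finally             : m_first_page = page 2, sb->m_bytes = 450 - 250 = 200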
1912 
1913 Uint32
1914 trp_callback::bytes_sent(NodeId node, Uint32 bytes)
1915 {
1916  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1917  Uint32 thr_no = sb->m_send_thread;
1918  assert(thr_no != NO_SEND_THREAD);
1919  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
1920  sb, bytes);
1921 }
1922 
1923 bool
1924 trp_callback::has_data_to_send(NodeId node)
1925 {
1926  return true;
1927 
1928  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
1929  Uint32 thr_no = sb->m_send_thread;
1930  assert(thr_no != NO_SEND_THREAD);
1931  assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
1932  if (sb->m_bytes > 0 || sb->m_force_send)
1933  return true;
1934 
1935  thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;
1936 
1937  return sb->m_read_index[thr_no] != dst->m_write_index;
1938 }
1939 
1940 void
1941 trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
1942 {
1943  struct thr_repository *rep = &g_thr_repository;
1944  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
1945  struct iovec v[32];
1946 
1947  thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
1948 
1949  lock(&sb->m_send_lock);
1950 
1951  for (;;)
1952  {
1953  Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
1954  if (count == 0)
1955  break;
1956  assert(!should_be_empty); // Got data when it should be empty
1957  int bytes = 0;
1958  for (Uint32 i = 0; i < count; i++)
1959  bytes += v[i].iov_len;
1960 
1961  ::bytes_sent(&pool, sb, bytes);
1962  }
1963 
1964  unlock(&sb->m_send_lock);
1965 
1966  pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
1967 }
1968 
1969 static inline
1970 void
1971 register_pending_send(thr_data *selfptr, Uint32 nodeId)
1972 {
1973  /* Mark that this node has pending send data. */
1974  if (!selfptr->m_pending_send_mask.get(nodeId))
1975  {
1976  selfptr->m_pending_send_mask.set(nodeId, 1);
1977  Uint32 i = selfptr->m_pending_send_count;
1978  selfptr->m_pending_send_nodes[i] = nodeId;
1979  selfptr->m_pending_send_count = i + 1;
1980  }
1981 }
1982 
1986 static
1987 void
1988 flush_send_buffer(thr_data* selfptr, Uint32 node)
1989 {
1990  Uint32 thr_no = selfptr->m_thr_no;
1991  thr_send_buffer * src = selfptr->m_send_buffers + node;
1992  thr_repository* rep = &g_thr_repository;
1993 
1994  if (src->m_first_page == 0)
1995  {
1996  return;
1997  }
1998  assert(src->m_last_page != 0);
1999 
2000  thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
2001  thr_repository::send_buffer* sb = rep->m_send_buffers+node;
2002 
2003  Uint32 wi = dst->m_write_index;
2004  Uint32 next = (wi + 1) % thr_send_queue::SIZE;
2005  Uint32 ri = sb->m_read_index[thr_no];
2006 
2007  if (unlikely(next == ri))
2008  {
2009  lock(&sb->m_send_lock);
2010  link_thread_send_buffers(sb, node);
2011  unlock(&sb->m_send_lock);
2012  }
2013 
2014  dst->m_buffers[wi] = src->m_first_page;
2015  wmb();
2016  dst->m_write_index = next;
2017 
2018  src->m_first_page = 0;
2019  src->m_last_page = 0;
2020 }
2021 
2026 bool
2027 mt_send_handle::forceSend(NodeId nodeId)
2028 {
2029  struct thr_repository *rep = &g_thr_repository;
2030  struct thr_data *selfptr = m_selfptr;
2031  struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
2032 
2033  do
2034  {
2035  sb->m_force_send = 0;
2036  lock(&sb->m_send_lock);
2037  sb->m_send_thread = selfptr->m_thr_no;
2038  globalTransporterRegistry.performSend(nodeId);
2039  sb->m_send_thread = NO_SEND_THREAD;
2040  unlock(&sb->m_send_lock);
2041  } while (sb->m_force_send);
2042 
2043  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2044 
2045  return true;
2046 }
2047 
2051 static
2052 void
2053 try_send(thr_data * selfptr, Uint32 node)
2054 {
2055  struct thr_repository *rep = &g_thr_repository;
2056  struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2057 
2058  do
2059  {
2060  if (trylock(&sb->m_send_lock) != 0)
2061  {
2062  return;
2063  }
2064 
2065  sb->m_force_send = 0;
2066  mb();
2067 
2068  sb->m_send_thread = selfptr->m_thr_no;
2069  globalTransporterRegistry.performSend(node);
2070  sb->m_send_thread = NO_SEND_THREAD;
2071  unlock(&sb->m_send_lock);
2072  } while (sb->m_force_send);
2073 
2074  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2075 }
2076 
2085 static
2086 void
2087 do_flush(struct thr_data* selfptr)
2088 {
2089  Uint32 i;
2090  Uint32 count = selfptr->m_pending_send_count;
2091  Uint8 *nodes = selfptr->m_pending_send_nodes;
2092 
2093  for (i = 0; i < count; i++)
2094  {
2095  flush_send_buffer(selfptr, nodes[i]);
2096  }
2097 }
2098 
2113 static
2114 Uint32
2115 do_send(struct thr_data* selfptr, bool must_send)
2116 {
2117  Uint32 i;
2118  Uint32 count = selfptr->m_pending_send_count;
2119  Uint8 *nodes = selfptr->m_pending_send_nodes;
2120  struct thr_repository* rep = &g_thr_repository;
2121 
2122  if (count == 0)
2123  {
2124  return 0; // send-buffers empty
2125  }
2126 
2127  /* Clear the pending list. */
2128  selfptr->m_pending_send_mask.clear();
2129  selfptr->m_pending_send_count = 0;
2130 
2131  for (i = 0; i < count; i++)
2132  {
2133  Uint32 node = nodes[i];
2134  selfptr->m_watchdog_counter = 6;
2135 
2136  flush_send_buffer(selfptr, node);
2137 
2138  thr_repository::send_buffer * sb = rep->m_send_buffers + node;
2139 
2150  if (must_send)
2151  {
2152  sb->m_force_send = 1;
2153  }
2154 
2155  do
2156  {
2157  if (trylock(&sb->m_send_lock) != 0)
2158  {
2159  if (!must_send)
2160  {
2167  register_pending_send(selfptr, node);
2168  }
2169  else
2170  {
2171  /* Other thread will send for us as we set m_force_send. */
2172  }
2173  break;
2174  }
2175 
2186  sb->m_force_send = 0;
2187  mb();
2188 
2193  sb->m_send_thread = selfptr->m_thr_no;
2194  int res = globalTransporterRegistry.performSend(node);
2195  sb->m_send_thread = NO_SEND_THREAD;
2196  unlock(&sb->m_send_lock);
2197  if (res)
2198  {
2199  register_pending_send(selfptr, node);
2200  }
2201  } while (sb->m_force_send);
2202  }
2203 
2204  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
2205 
2206  return selfptr->m_pending_send_count;
2207 }
2208 
2209 Uint32 *
2210 mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
2211 {
2212  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2213  thr_send_page * p = b->m_last_page;
2214  if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
2215  {
2216  return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
2217  }
2218  else if (p != 0)
2219  {
2220  // TODO: maybe dont always flush on page-boundary ???
2221  flush_send_buffer(m_selfptr, node);
2222  try_send(m_selfptr, node);
2223  }
2224 
2225  if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
2226  RG_TRANSPORTER_BUFFERS)) != 0)
2227  {
2228  p->m_bytes = 0;
2229  p->m_start = 0;
2230  p->m_next = 0;
2231  b->m_first_page = b->m_last_page = p;
2232  return (Uint32*)p->m_data;
2233  }
2234  return 0;
2235 }
2236 
2237 Uint32
2238 mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2239 {
2240  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
2241  thr_send_page * p = b->m_last_page;
2242  p->m_bytes += lenBytes;
2243  return p->m_bytes;
2244 }
2245 
2246 /*
2247  * Insert a signal in a job queue.
2248  *
2249  * The signal is not visible to consumers yet after return from this function,
2250  * only recorded in the thr_jb_write_state. It is necessary to first call
2251  * flush_write_state() for this.
2252  *
2253  * The new_buffer is a job buffer to use if the current one gets full. If used,
2254  * we return true, indicating that the caller should allocate a new one for
2255  * the next call. (This is done so the insert can happen under a lock, while
2256  * the allocation is done outside the lock.)
2257  */
2258 static inline
2259 bool
2260 insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
2261  const SignalHeader* sh, const Uint32 *data,
2262  const Uint32 secPtr[3], thr_job_buffer *new_buffer)
2263 {
2264  Uint32 write_pos = w->m_write_pos;
2265  Uint32 datalen = sh->theLength;
2266  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
2267  memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
2268  write_pos += (sizeof(*sh) >> 2);
2269  memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
2270  write_pos += datalen;
2271  const Uint32 *p= secPtr;
2272  for (Uint32 i = 0; i < sh->m_noOfSections; i++)
2273  w->m_write_buffer->m_data[write_pos++] = *p++;
2274  w->m_pending_signals++;
2275 
2276 #if SIZEOF_CHARP == 8
2277  /* Align to 8-byte boundary, to ensure aligned copies. */
2278  write_pos= (write_pos+1) & ~((Uint32)1);
2279 #endif
2280 
2281  /*
2282  * We make sure that there is always room for at least one signal in the
2283  * current buffer in the queue, so one insert is always possible without
2284  * adding a new buffer.
2285  */
2286  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
2287  {
2288  w->m_write_pos = write_pos;
2289  return false;
2290  }
2291  else
2292  {
2293  /*
2294  * Need a write memory barrier here, as this might make signal data visible
2295  * to other threads.
2296  *
2297  * ToDo: We actually only need the wmb() here if we already make this
2298  * buffer visible to the other thread. So we might optimize it a bit. But
2299  * wmb() is a no-op on x86 anyway...
2300  */
2301  wmb();
2302  w->m_write_buffer->m_len = write_pos;
2303  Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
2304 
2313  if (unlikely(write_index == q->m_head->m_read_index))
2314  {
2315  job_buffer_full(0);
2316  }
2317  new_buffer->m_len = 0;
2318  new_buffer->m_prioa = prioa;
2319  q->m_buffers[write_index] = new_buffer;
2320  w->m_write_index = write_index;
2321  w->m_write_pos = 0;
2322  w->m_write_buffer = new_buffer;
2323  return true; // Buffer new_buffer used
2324  }
2325 
2326  return false; // Buffer new_buffer not used
2327 }
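The contract described in the comment block before insert_signal() implies a specific calling pattern; the real callers (such as sendlocal()) appear later in the file, so the sketch below is only an illustration of that contract, using a hypothetical helper name:

  static void example_deliver(thr_data *selfptr, thr_job_queue *q,
                              thr_jb_write_state *w,
                              const SignalHeader *sh, const Uint32 *data,
                              const Uint32 secPtr[3])
  {
    /* selfptr->m_next_buffer was seized ahead of time, outside any lock. */
    bool buf_used = insert_signal(q, w, /* prioa */ 0, sh, data, secPtr,
                                  selfptr->m_next_buffer);
    if (buf_used)
    {
      /* The spare buffer was consumed (possibly while a lock was held);
       * replace it now that no lock is held. */
      selfptr->m_next_buffer = seize_buffer(&g_thr_repository,
                                            selfptr->m_thr_no, false);
    }
    /* The signal only becomes visible to the consumer after a later
     * flush_write_state() call, as noted in the comment above. */
  }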
2328 
2329 static
2330 void
2331 read_jbb_state(thr_data *selfptr, Uint32 count)
2332 {
2333 
2334  thr_jb_read_state *r = selfptr->m_read_states;
2335  const thr_job_queue *q = selfptr->m_in_queue;
2336  for (Uint32 i = 0; i < count; i++,r++,q++)
2337  {
2338  Uint32 read_index = r->m_read_index;
2339 
2344  if (r->m_write_index == read_index)
2345  {
2346  r->m_write_index = q->m_head->m_write_index;
2347  read_barrier_depends();
2348  r->m_read_end = q->m_buffers[read_index]->m_len;
2349  }
2350  }
2351 }
2352 
2353 static
2354 bool
2355 read_jba_state(thr_data *selfptr)
2356 {
2357  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
2358  r->m_write_index = selfptr->m_jba_head.m_write_index;
2359  read_barrier_depends();
2360  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
2361  return r->is_empty();
2362 }
2363 
2364 /* Check all job queues, return true only if all are empty. */
2365 static bool
2366 check_queues_empty(thr_data *selfptr)
2367 {
2368  Uint32 thr_count = g_thr_repository.m_thread_count;
2369  bool empty = read_jba_state(selfptr);
2370  if (!empty)
2371  return false;
2372 
2373  read_jbb_state(selfptr, thr_count);
2374  const thr_jb_read_state *r = selfptr->m_read_states;
2375  for (Uint32 i = 0; i < thr_count; i++,r++)
2376  {
2377  if (!r->is_empty())
2378  return false;
2379  }
2380  return true;
2381 }
2382 
2383 /*
2384  * Execute at most max_signals signals from one job queue, updating the local
2385  * read state as appropriate.
2386  *
2387  * Returns number of signals actually executed.
2388  */
2389 static
2390 Uint32
2391 execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
2392  Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
2393 {
2394  Uint32 num_signals;
2395  Uint32 read_index = r->m_read_index;
2396  Uint32 write_index = r->m_write_index;
2397  Uint32 read_pos = r->m_read_pos;
2398  Uint32 read_end = r->m_read_end;
2399  Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
2400 
2401  if (read_index == write_index && read_pos >= read_end)
2402  return 0; // empty read_state
2403 
2404  thr_job_buffer *read_buffer = r->m_read_buffer;
2405 
2406  for (num_signals = 0; num_signals < max_signals; num_signals++)
2407  {
2408  while (read_pos >= read_end)
2409  {
2410  if (read_index == write_index)
2411  {
2412  /* No more available now. */
2413  return num_signals;
2414  }
2415  else
2416  {
2417  /* Move to next buffer. */
2418  read_index = (read_index + 1) % thr_job_queue::SIZE;
2419  release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
2420  read_buffer = q->m_buffers[read_index];
2421  read_pos = 0;
2422  read_end = read_buffer->m_len;
2423  /* Update thread-local read state. */
2424  r->m_read_index = q->m_head->m_read_index = read_index;
2425  r->m_read_buffer = read_buffer;
2426  r->m_read_pos = read_pos;
2427  r->m_read_end = read_end;
2428  }
2429  }
2430 
2431  /*
2432  * This prefetching was found using OProfile to reduce cache misses.
2433  * (Though on Intel Core 2 it does not give much speedup, as the hardware
2434  * prefetcher apparently already does a fairly good job.)
2435  */
2436  NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
2437  NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
2438 
2439  /* Now execute the signal. */
2440  SignalHeader* s =
2441  reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
2442  Uint32 seccnt = s->m_noOfSections;
2443  Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
2444  if(siglen>16)
2445  {
2446  NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
2447  }
2448  Uint32 bno = blockToMain(s->theReceiversBlockNumber);
2449  Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
2450  SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
2451  assert(block != 0);
2452 
2453  Uint32 gsn = s->theVerId_signalNumber;
2454  *watchDogCounter = 1;
2455  /* Must update original buffer so signal dump will see it. */
2456  s->theSignalId = (*signalIdCounter)++;
2457  memcpy(&sig->header, s, 4*siglen);
2458  sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
2459  sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
2460  sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
2461 
2462  read_pos += siglen + seccnt;
2463 #if SIZEOF_CHARP == 8
2464  /* Handle 8-byte alignment. */
2465  read_pos = (read_pos + 1) & ~((Uint32)1);
2466 #endif
2467 
2468  /* Update just before executing, so the signal dump can see how far we got. */
2469  r->m_read_pos = read_pos;
2470 
2471 #ifdef VM_TRACE
2472  if (globalData.testOn)
2473  { //wl4391_todo segments
2474  SegmentedSectionPtr ptr[3];
2475  ptr[0].i = sig->m_sectionPtrI[0];
2476  ptr[1].i = sig->m_sectionPtrI[1];
2477  ptr[2].i = sig->m_sectionPtrI[2];
2478  ::getSections(seccnt, ptr);
2479  globalSignalLoggers.executeSignal(*s,
2480  0,
2481  &sig->theData[0],
2482  globalData.ownId,
2483  ptr, seccnt);
2484  }
2485 #endif
2486 
2487  block->executeFunction(gsn, sig);
2488  }
2489 
2490  return num_signals;
2491 }
2492 
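/*
 * One scheduling round: for each sender thread, first drain any pending
 * prio A signals (with an effectively unbounded quota), then execute up to
 * m_max_signals_per_jb prio B signals from that thread's in-queue.
 * Returns the total number of signals executed.
 */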
2493 static
2494 Uint32
2495 run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
2496 {
2497  Uint32 thr_count = g_thr_repository.m_thread_count;
2498  Uint32 signal_count = 0;
2499  Uint32 perjb = selfptr->m_max_signals_per_jb;
2500 
2501  read_jbb_state(selfptr, thr_count);
2502  /*
2503  * A load memory barrier to ensure that we see any prio A signal that was
2504  * sent after the prio B signals we just loaded.
2505  */
2506  rmb();
2507 
2508  thr_job_queue *queue = selfptr->m_in_queue;
2509  thr_jb_read_state *read_state = selfptr->m_read_states;
2510  for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
2511  send_thr_no++,queue++,read_state++)
2512  {
2513  /* Read the prio A state often, to avoid starvation of prio A. */
2514  bool jba_empty = read_jba_state(selfptr);
2515  if (!jba_empty)
2516  {
2517  static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
2518  signal_count += execute_signals(selfptr, &(selfptr->m_jba),
2519  &(selfptr->m_jba_read_state), sig,
2520  max_prioA, signalIdCounter);
2521  }
2522 
2523  /* Now execute prio B signals from one thread. */
2524  signal_count += execute_signals(selfptr, queue, read_state,
2525  sig, perjb, signalIdCounter);
2526  }
2527 
2528  return signal_count;
2529 }
2530 
2531 struct thr_map_entry {
 2532  enum { NULL_THR_NO = 0xFF };
2533  Uint8 thr_no;
2534  thr_map_entry() : thr_no(NULL_THR_NO) {}
2535 };
2536 
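/*
 * thr_map maps (block number, instance number) to the thread that executes
 * that block instance. Entries stay at NULL_THR_NO until assigned by
 * add_thr_map().
 */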
2537 static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
2538 
2539 static inline Uint32
2540 block2ThreadId(Uint32 block, Uint32 instance)
2541 {
2542  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
2543  Uint32 index = block - MIN_BLOCK_NO;
2544  assert(instance < MAX_BLOCK_INSTANCES);
2545  const thr_map_entry& entry = thr_map[index][instance];
2546  assert(entry.thr_no < num_threads);
2547  return entry.thr_no;
2548 }
2549 
2550 void
2551 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
2552 {
2553  assert(main == blockToMain(main));
2554  Uint32 index = main - MIN_BLOCK_NO;
2555  assert(index < NO_OF_BLOCKS);
2556  assert(instance < MAX_BLOCK_INSTANCES);
2557 
2558  SimulatedBlock* b = globalData.getBlock(main, instance);
2559  require(b != 0);
2560 
2561  /* Block number including instance. */
2562  Uint32 block = numberToBlock(main, instance);
2563 
2564  require(thr_no < num_threads);
2565  struct thr_repository* rep = &g_thr_repository;
2566  thr_data* thr_ptr = rep->m_thread + thr_no;
2567 
2568  /* Add to list. */
2569  {
2570  Uint32 i;
2571  for (i = 0; i < thr_ptr->m_instance_count; i++)
2572  require(thr_ptr->m_instance_list[i] != block);
2573  }
2574  require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
2575  thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
2576 
2577  SimulatedBlock::ThreadContext ctx;
 2578  ctx.threadId = thr_no;
2579  ctx.jamBuffer = &thr_ptr->m_jam;
2580  ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
2581  ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
2582  b->assignToThread(ctx);
2583 
2584  /* Create entry mapping block to thread. */
2585  thr_map_entry& entry = thr_map[index][instance];
2586  require(entry.thr_no == thr_map_entry::NULL_THR_NO);
2587  entry.thr_no = thr_no;
2588 }
2589 
2590 /* Static assignment of main instances (before first signal). */
2591 void
2592 add_main_thr_map()
2593 {
2594  /* Keep mt-classic assignments in MT LQH. */
2595  const Uint32 thr_GLOBAL = 0;
2596  const Uint32 thr_LOCAL = 1;
2597  const Uint32 thr_RECEIVER = receiver_thread_no;
2598 
2599  add_thr_map(BACKUP, 0, thr_LOCAL);
2600  add_thr_map(DBTC, 0, thr_GLOBAL);
2601  add_thr_map(DBDIH, 0, thr_GLOBAL);
2602  add_thr_map(DBLQH, 0, thr_LOCAL);
2603  add_thr_map(DBACC, 0, thr_LOCAL);
2604  add_thr_map(DBTUP, 0, thr_LOCAL);
2605  add_thr_map(DBDICT, 0, thr_GLOBAL);
2606  add_thr_map(NDBCNTR, 0, thr_GLOBAL);
2607  add_thr_map(QMGR, 0, thr_GLOBAL);
2608  add_thr_map(NDBFS, 0, thr_GLOBAL);
2609  add_thr_map(CMVMI, 0, thr_RECEIVER);
2610  add_thr_map(TRIX, 0, thr_GLOBAL);
2611  add_thr_map(DBUTIL, 0, thr_GLOBAL);
2612  add_thr_map(SUMA, 0, thr_LOCAL);
2613  add_thr_map(DBTUX, 0, thr_LOCAL);
2614  add_thr_map(TSMAN, 0, thr_LOCAL);
2615  add_thr_map(LGMAN, 0, thr_LOCAL);
2616  add_thr_map(PGMAN, 0, thr_LOCAL);
2617  add_thr_map(RESTORE, 0, thr_LOCAL);
2618  add_thr_map(DBINFO, 0, thr_LOCAL);
2619  add_thr_map(DBSPJ, 0, thr_GLOBAL);
2620 }
2621 
2622 /* Workers added by LocalProxy (before first signal). */
2623 void
2624 add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
2625 {
2626  require(instance != 0);
2627  Uint32 i = instance - 1;
2628  Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
2629  add_thr_map(block, instance, thr_no);
2630 }
2631 
2632 /* Extra workers run in the proxy thread. */
2633 void
2634 add_extra_worker_thr_map(Uint32 block, Uint32 instance)
2635 {
2636  require(instance != 0);
2637  Uint32 thr_no = block2ThreadId(block, 0);
2638  add_thr_map(block, instance, thr_no);
2639 }
2640 
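/*
 * Fill in the remaining thr_map entries once all real instances have been
 * added: each unset instance slot is pointed at an existing instance of the
 * same block (round-robin over instances 1..cnt-1, or instance 0 if it is the
 * only one), and the duplicate is registered with the main block via
 * addInstance(). Presumably this ensures that any instance number appearing
 * in a block reference resolves to some thread. Only PGMAN is allowed to
 * have extra pre-assigned instances beyond the contiguous range.
 */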
2651 void
2652 finalize_thr_map()
2653 {
2654  for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
2655  {
2656  Uint32 bno = b + MIN_BLOCK_NO;
2657  Uint32 cnt = 0;
2658  while (cnt < MAX_BLOCK_INSTANCES &&
2659  thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
2660  cnt++;
2661 
2662  if (cnt != MAX_BLOCK_INSTANCES)
2663  {
2664  SimulatedBlock * main = globalData.getBlock(bno, 0);
2665  for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
2666  {
2667  Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
2668  if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
2669  {
2670  thr_map[b][i] = thr_map[b][dup];
2671  main->addInstance(globalData.getBlock(bno, dup), i);
2672  }
2673  else
2674  {
2678  require(bno == PGMAN);
2679  }
2680  }
2681  }
2682  }
2683 }
2684 
2685 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
2686  Uint32 b_count, Uint32 b_size)
2687 {
2688  SignalT<6> sT;
2689  Signal *s= new (&sT) Signal(0);
2690 
2691  memset(&s->header, 0, sizeof(s->header));
2692  s->header.theLength = 6;
2693  s->header.theSendersSignalId = 0;
2694  s->header.theSendersBlockRef = numberToRef(0, 0);
2695  s->header.theVerId_signalNumber = GSN_EVENT_REP;
2696  s->header.theReceiversBlockNumber = CMVMI;
2697  s->theData[0] = NDB_LE_MTSignalStatistics;
2698  s->theData[1] = self;
2699  s->theData[2] = a_count;
2700  s->theData[3] = a_size;
2701  s->theData[4] = b_count;
2702  s->theData[5] = b_size;
2703  /* ToDo: does this really need to be prio A like in the old code? */
2704  sendlocal(self, &s->header, s->theData,
2705  NULL);
2706 }
2707 
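/*
 * Once two million signals have been executed, report the per-thread prio A/B
 * signal counts and sizes as an NDB_LE_MTSignalStatistics event (sent to
 * CMVMI via sendlocal()) and reset the counters.
 */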
2708 static inline void
2709 update_sched_stats(thr_data *selfptr)
2710 {
2711  if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
2712  {
2713  reportSignalStats(selfptr->m_thr_no,
2714  selfptr->m_prioa_count,
2715  selfptr->m_prioa_size,
2716  selfptr->m_priob_count,
2717  selfptr->m_priob_size);
2718  selfptr->m_prioa_count = 0;
2719  selfptr->m_prioa_size = 0;
2720  selfptr->m_priob_count = 0;
2721  selfptr->m_priob_size = 0;
2722 
2723 #if 0
2724  Uint32 thr_no = selfptr->m_thr_no;
2725  ndbout_c("--- %u fifo: %u jba: %u global: %u",
2726  thr_no,
2727  fifo_used_pages(selfptr),
2728  selfptr->m_jba_head.used(),
2729  g_thr_repository.m_free_list.m_cnt);
2730  for (Uint32 i = 0; i<num_threads; i++)
2731  {
2732  ndbout_c(" %u-%u : %u",
2733  thr_no, i, selfptr->m_in_queue_head[i].used());
2734  }
2735 #endif
2736  }
2737 }
2738 
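/*
 * Per-thread startup: initialize the waiter and jam buffer, set up TLS keys,
 * register with the watchdog, wait until the NdbThread object is available,
 * apply any configured CPU binding, and print a line describing the thread
 * and the block instances it runs.
 */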
2739 static void
2740 init_thread(thr_data *selfptr)
2741 {
2742  selfptr->m_waiter.init();
2743  selfptr->m_jam.theEmulatedJamIndex = 0;
2744  selfptr->m_jam.theEmulatedJamBlockNumber = 0;
2745  bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
2746  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
2747  NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
2748 
2749  unsigned thr_no = selfptr->m_thr_no;
2750  globalEmulatorData.theWatchDog->
2751  registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
2752  {
2753  while(selfptr->m_thread == 0)
2754  NdbSleep_MilliSleep(30);
2755  }
2756 
2757  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
2758  BaseString tmp;
2759  tmp.appfmt("thr: %u ", thr_no);
2760 
2761  int tid = NdbThread_GetTid(selfptr->m_thread);
2762  if (tid != -1)
2763  {
2764  tmp.appfmt("tid: %u ", tid);
2765  }
2766 
2767  conf.appendInfo(tmp,
2768  selfptr->m_instance_list, selfptr->m_instance_count);
2769  int res = conf.do_bind(selfptr->m_thread,
2770  selfptr->m_instance_list, selfptr->m_instance_count);
2771  if (res < 0)
2772  {
2773  tmp.appfmt("err: %d ", -res);
2774  }
2775  else if (res > 0)
2776  {
2777  tmp.appfmt("OK ");
2778  }
2779 
2780  selfptr->m_thr_id = pthread_self();
2781 
2782  for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
2783  {
2784  BlockReference block = selfptr->m_instance_list[i];
2785  Uint32 main = blockToMain(block);
2786  Uint32 instance = blockToInstance(block);
2787  tmp.appfmt("%s(%u) ", getBlockName(main), instance);
2788  }
2789  printf("%s\n", tmp.c_str());
2790  fflush(stdout);
2791 }
2792 
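/*
 * Size of the per-thread Signal buffer: room for the Signal itself, up to 63
 * bytes of slack for cache-line alignment, and a 256-byte stagger per thread
 * so that different threads' Signal objects start in different cache lines.
 */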
2797 #define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
2798 static Signal *
2799 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
2800 {
2801  UintPtr sigtmp= (UintPtr)signal_buf;
2802  sigtmp= (sigtmp+63) & (~(UintPtr)63);
2803  sigtmp+= thr_no*256;
2804  return (Signal *)sigtmp;
2805 }
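/*
 * Example: with signal_buf at address 0x1001 and thr_no == 3, the pointer is
 * first rounded up to the 64-byte boundary 0x1040 and then offset by
 * 3 * 256 = 768 bytes, giving a Signal at 0x1340.
 */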
2806 
2807 Uint32 receiverThreadId;
2808 
2809 /*
2810  * We only do receive in the receiver thread; no other threads do receive.
2811  *
2812  * As part of the receive loop, we also periodically call update_connections()
2813  * (this way we are similar to single-threaded ndbd).
2814  *
2815  * The CMVMI block (and no other blocks) runs in the same thread as this
2816  * receive loop; this way we avoid races between update_connections() and
2817  * CMVMI calls into the transporters.
2818  *
2819  * Note that with this setup, local signals to CMVMI cannot wake up the thread
2820  * if it is sleeping on the receive sockets. Thus CMVMI local signal processing
2821  * can be (slightly) delayed; however, CMVMI is not really performance critical
2822  * (hopefully).
2823  */
2824 extern "C"
2825 void *
2826 mt_receiver_thread_main(void *thr_arg)
2827 {
2828  unsigned char signal_buf[SIGBUF_SIZE];
2829  Signal *signal;
2830  struct thr_repository* rep = &g_thr_repository;
2831  struct thr_data* selfptr = (struct thr_data *)thr_arg;
2832  unsigned thr_no = selfptr->m_thr_no;
2833  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
2834  Uint32 thrSignalId = 0;
2835  bool has_received = false;
2836 
2837  init_thread(selfptr);
2838  receiverThreadId = thr_no;
2839  signal = aligned_signal(signal_buf, thr_no);
2840 
2841  while (globalData.theRestartFlag != perform_stop)
2842  {
2843  static int cnt = 0;
2844 
2845  update_sched_stats(selfptr);
2846 
2847  if (cnt == 0)
2848  {
2849  watchDogCounter = 5;
2850  globalTransporterRegistry.update_connections();
2851  }
2852  cnt = (cnt + 1) & 15;
2853 
2854  watchDogCounter = 2;
2855 
2856  NDB_TICKS now = NdbTick_CurrentMillisecond();
2857  scan_time_queues(selfptr, now);
2858 
2859  Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
2860 
2861  if (sum || has_received)
2862  {
2863  watchDogCounter = 6;
2864  flush_jbb_write_state(selfptr);
2865  }
2866 
2867  do_send(selfptr, TRUE);
2868 
2869  watchDogCounter = 7;
2870 
2871  has_received = false;
2872  if (globalTransporterRegistry.pollReceive(1))
2873  {
2874  if (check_job_buffers(rep) == 0)
2875  {
2876  watchDogCounter = 8;
2877  lock(&rep->m_receive_lock);
2878  globalTransporterRegistry.performReceive();
2879  unlock(&rep->m_receive_lock);
2880  has_received = true;
2881  }
2882  }
2883  }
2884 
2885  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
2886  return NULL; // Return value not currently used
2887 }
2888 
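/*
 * Execute GSN_SEND_PACKED in every block instance owned by this thread,
 * giving the blocks a chance to flush any packed signals they have
 * accumulated during the job-buffer run.
 */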
2889 static
2890 inline
2891 void
2892 sendpacked(struct thr_data* thr_ptr, Signal* signal)
2893 {
2894  Uint32 i;
2895  for (i = 0; i < thr_ptr->m_instance_count; i++)
2896  {
2897  BlockReference block = thr_ptr->m_instance_list[i];
2898  Uint32 main = blockToMain(block);
2899  Uint32 instance = blockToInstance(block);
2900  SimulatedBlock* b = globalData.getBlock(main, instance);
2901  // wl4391_todo remove useless assert
2902  assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
2903  /* b->send_at_job_buffer_end(); */
2904  b->executeFunction(GSN_SEND_PACKED, signal);
2905  }
2906 }
2907 
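/*
 * Return true while the remaining execution quota is smaller than the number
 * of threads, i.e. while we could not even execute one signal per incoming
 * job buffer. Used as the yield() condition in update_sched_config() so the
 * thread keeps sleeping while this congestion lasts.
 */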
2912 static bool
2913 check_job_buffer_full(thr_data *selfptr)
2914 {
2915  Uint32 thr_no = selfptr->m_thr_no;
2916  Uint32 tmp = compute_max_signals_to_execute(thr_no);
2917 #if 0
2918  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2919 
2920  if (perjb == 0)
2921  {
2922  return true;
2923  }
2924 
2925  return false;
2926 #else
2927  if (tmp < g_thr_repository.m_thread_count)
2928  return true;
2929  return false;
2930 #endif
2931 }
2932 
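/*
 * Recompute this thread's execution quota (total and per job buffer). If the
 * per-buffer quota drops to zero, the out-queues are congested: force out any
 * pending sends, sleep up to 1 ms at a time, and retry. After ten such sleeps
 * we give up waiting and continue with a per-buffer quota of 1. Returns true
 * if we slept at all.
 */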
2955 static
2956 bool
2957 update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
2958 {
2959  Uint32 sleeploop = 0;
2960  Uint32 thr_no = selfptr->m_thr_no;
2961 loop:
2962  Uint32 tmp = compute_max_signals_to_execute(thr_no);
2963  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
2964 
2965  if (perjb > MAX_SIGNALS_PER_JB)
2966  perjb = MAX_SIGNALS_PER_JB;
2967 
2968  selfptr->m_max_exec_signals = tmp;
2969  selfptr->m_max_signals_per_jb = perjb;
2970 
2971  if (unlikely(perjb == 0))
2972  {
2973  sleeploop++;
2974  if (sleeploop == 10)
2975  {
2979  selfptr->m_max_signals_per_jb = 1;
2980  ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
2981  return true;
2982  }
2983 
2984  if (pending_send)
2985  {
2986  /* About to sleep, _must_ send now. */
2987  pending_send = do_send(selfptr, TRUE);
2988  }
2989 
2990  const Uint32 wait = 1000000; /* 1 ms */
2991  yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
2992  goto loop;
2993  }
2994 
2995  return sleeploop > 0;
2996 }
2997 
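/*
 * Main loop for a block execution thread: scan the time queue, execute
 * signals from all in-queues, run SEND_PACKED, and either flush/send the
 * produced data or go to sleep when there was nothing to do. The execution
 * quota and the clock-reading frequency (maxloops) are adapted as we go.
 */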
2998 extern "C"
2999 void *
3000 mt_job_thread_main(void *thr_arg)
3001 {
3002  unsigned char signal_buf[SIGBUF_SIZE];
3003  Signal *signal;
3004  const Uint32 nowait = 10 * 1000000; /* 10 ms */
3005  Uint32 thrSignalId = 0;
3006 
3007  struct thr_data* selfptr = (struct thr_data *)thr_arg;
3008  init_thread(selfptr);
3009  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
3010 
3011  unsigned thr_no = selfptr->m_thr_no;
3012  signal = aligned_signal(signal_buf, thr_no);
3013 
3014  /* Avoid false watchdog alarms caused by a race condition. */
3015  watchDogCounter = 1;
3016 
3017  Uint32 pending_send = 0;
3018  Uint32 send_sum = 0;
3019  int loops = 0;
3020  int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
3021  NDB_TICKS now = selfptr->m_time;
3022 
3023  while (globalData.theRestartFlag != perform_stop)
3024  {
3025  loops++;
3026  update_sched_stats(selfptr);
3027 
3028  watchDogCounter = 2;
3029  scan_time_queues(selfptr, now);
3030 
3031  Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
3032 
3033  watchDogCounter = 1;
3034  signal->header.m_noOfSections = 0; /* valgrind */
3035  sendpacked(selfptr, signal);
3036 
3037  if (sum)
3038  {
3039  watchDogCounter = 6;
3040  flush_jbb_write_state(selfptr);
3041  send_sum += sum;
3042 
3043  if (send_sum > MAX_SIGNALS_BEFORE_SEND)
3044  {
3045  /* Try to send, but skip for now in case of lock contention. */
3046  pending_send = do_send(selfptr, FALSE);
3047  send_sum = 0;
3048  }
3049  else
3050  {
3051  /* Append our send buffers to the send queues of the destination nodes. */
3052  do_flush(selfptr);
3053  }
3054  }
3055  else
3056  {
3057  /* No signals processed; prepare to sleep and wait for more. */
3058  if (pending_send || send_sum > 0)
3059  {
3060  /* About to sleep, _must_ send now. */
3061  pending_send = do_send(selfptr, TRUE);
3062  send_sum = 0;
3063  }
3064 
3065  if (pending_send == 0)
3066  {
3067  bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
3068  selfptr);
3069  if (waited)
3070  {
3071  /* Update current time after sleeping */
3072  now = NdbTick_CurrentMillisecond();
3073  loops = 0;
3074  }
3075  }
3076  }
3077 
3082  if (sum >= selfptr->m_max_exec_signals)
3083  {
3084  if (update_sched_config(selfptr, pending_send))
3085  {
3086  /* Update current time after sleeping */
3087  now = NdbTick_CurrentMillisecond();
3088  loops = 0;
3089  }
3090  }
3091  else
3092  {
3093  selfptr->m_max_exec_signals -= sum;
3094  }
3095 
3100  if (loops > maxloops)
3101  {
3102  now = NdbTick_CurrentMillisecond();
3103  Uint64 diff = now - selfptr->m_time;
3104 
3105  /* Adjust 'maxloops' to achieve a clock reading frequency of 1 ms. */
 3106  if (diff < 1)
 3107  maxloops += ((maxloops/10) + 1); /* No change seen: read less frequently */
 3108  else if (diff > 1 && maxloops > 1)
 3109  maxloops -= ((maxloops/10) + 1); /* Overslept: need more frequent reads */
3110 
3111  loops = 0;
3112  }
3113  }
3114 
3115  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
3116  return NULL; // Return value not currently used
3117 }
3118 
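/*
 * Insert a prio B signal into the in-queue that the destination thread keeps
 * for this sender. If the insert consumed our spare job buffer we seize a new
 * one, and once enough signals are pending the write state is flushed so the
 * destination thread can see (and be woken up for) the new signals.
 */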
3119 void
3120 sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
3121  const Uint32 secPtr[3])
3122 {
3123  Uint32 block = blockToMain(s->theReceiversBlockNumber);
3124  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3125 
3126  /*
3127  * Max number of signals to put into the job buffer before flushing the
 3128  * buffer to the other thread.
 3129  * This parameter was found to be reasonable by benchmarking.
3130  */
3131  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
3132  MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
3133  MAX_SIGNALS_BEFORE_FLUSH_OTHER;
3134 
3135  Uint32 dst = block2ThreadId(block, instance);
3136  struct thr_repository* rep = &g_thr_repository;
3137  struct thr_data * selfptr = rep->m_thread + self;
3138  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
3139  struct thr_data * dstptr = rep->m_thread + dst;
3140 
3141  selfptr->m_priob_count++;
3142  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3143  selfptr->m_priob_size += siglen;
3144 
3145  thr_job_queue *q = dstptr->m_in_queue + self;
3146  thr_jb_write_state *w = selfptr->m_write_states + dst;
3147  if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
3148  {
3149  selfptr->m_next_buffer = seize_buffer(rep, self, false);
3150  }
3151  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
3152  flush_write_state(selfptr, dstptr, q->m_head, w);
3153 }
3154 
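/*
 * Insert a prio A signal into the destination thread's JBA queue. Unlike the
 * prio B case there is one shared prio A queue per thread, so we build a
 * fresh write state and insert under the destination's m_jba_write_lock,
 * flushing immediately.
 */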
3155 void
3156 sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
3157  const Uint32 secPtr[3])
3158 {
3159  Uint32 block = blockToMain(s->theReceiversBlockNumber);
3160  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3161 
3162  Uint32 dst = block2ThreadId(block, instance);
3163  struct thr_repository* rep = &g_thr_repository;
3164  struct thr_data *selfptr = rep->m_thread + self;
3165  assert(s->theVerId_signalNumber == GSN_START_ORD ||
3166  pthread_equal(selfptr->m_thr_id, pthread_self()));
3167  struct thr_data *dstptr = rep->m_thread + dst;
3168 
3169  selfptr->m_prioa_count++;
3170  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3171  selfptr->m_prioa_size += siglen;
3172 
3173  thr_job_queue *q = &(dstptr->m_jba);
3174  thr_jb_write_state w;
 3175 
3176  lock(&dstptr->m_jba_write_lock);
3177 
3178  Uint32 index = q->m_head->m_write_index;
3179  w.m_write_index = index;
3180  thr_job_buffer *buffer = q->m_buffers[index];
3181  w.m_write_buffer = buffer;
3182  w.m_write_pos = buffer->m_len;
3183  w.m_pending_signals = 0;
3184  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3185  bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
3186  selfptr->m_next_buffer);
3187  flush_write_state(selfptr, dstptr, q->m_head, &w);
3188 
3189  unlock(&dstptr->m_jba_write_lock);
3190 
3191  if (buf_used)
3192  selfptr->m_next_buffer = seize_buffer(rep, self, true);
3193 }
3194 
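/*
 * Queue a signal for a remote node: record the node as having pending send
 * data and let the transporter registry serialize the signal into this
 * thread's send buffer via the mt_send_handle; the data is transmitted later
 * when the send buffers are flushed.
 */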
3200 SendStatus
3201 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3202  const Uint32 * data, NodeId nodeId,
3203  const LinearSectionPtr ptr[3])
3204 {
3205  thr_repository *rep = &g_thr_repository;
3206  thr_data *selfptr = rep->m_thread + self;
3207  SendStatus ss;
3208 
3209  mt_send_handle handle(selfptr);
3210  register_pending_send(selfptr, nodeId);
3211  /* prepareSend() is lock-free, as we have per-thread send buffers. */
3212  ss = globalTransporterRegistry.prepareSend(&handle,
3213  sh, prio, data, nodeId, ptr);
3214  return ss;
3215 }
3216 
3217 SendStatus
3218 mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
3219  const Uint32 *data, NodeId nodeId,
3220  class SectionSegmentPool *thePool,
3221  const SegmentedSectionPtr ptr[3])
3222 {
3223  thr_repository *rep = &g_thr_repository;
3224  thr_data *selfptr = rep->m_thread + self;
3225  SendStatus ss;
3226 
3227  mt_send_handle handle(selfptr);
3228  register_pending_send(selfptr, nodeId);
3229  ss = globalTransporterRegistry.prepareSend(&handle,
3230  sh, prio, data, nodeId,
3231  *thePool, ptr);
3232  return ss;
3233 }
3234 
3235 /*
3236  * This function sends a prio A STOP_FOR_CRASH signal to a thread.
3237  *
3238  * It works when called from any other thread, not just from job processing
3239  * threads. But note that this signal will be the last signal to be executed by
3240  * the other thread, as it will exit immediately.
3241  */
3242 static
3243 void
3244 sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
3245 {
3246  SignalT<StopForCrash::SignalLength> signalT;
 3247  struct thr_repository* rep = &g_thr_repository;
3248  /* As this signal will be the last one executed by the other thread, it does
3249  not matter which buffer we use in case the current buffer is filled up by
3250  the STOP_FOR_CRASH signal; the data in it will never be read.
3251  */
3252  static thr_job_buffer dummy_buffer;
3253 
3257  struct thr_data * dstptr = rep->m_thread + dst;
3258  Uint32 bno = dstptr->m_instance_list[0];
3259 
3260  memset(&signalT.header, 0, sizeof(SignalHeader));
3261  signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
3262  signalT.header.theReceiversBlockNumber = bno;
3263  signalT.header.theSendersBlockRef = 0;
3264  signalT.header.theTrace = 0;
3265  signalT.header.theSendersSignalId = 0;
3266  signalT.header.theSignalId = 0;
3267  signalT.header.theLength = StopForCrash::SignalLength;
3268  StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
3269  stopForCrash->flags = 0;
3270 
3271  thr_job_queue *q = &(dstptr->m_jba);
3272  thr_jb_write_state w;
 3273 
3274  lock(&dstptr->m_jba_write_lock);
3275 
3276  Uint32 index = q->m_head->m_write_index;
3277  w.m_write_index = index;
3278  thr_job_buffer *buffer = q->m_buffers[index];
3279  w.m_write_buffer = buffer;
3280  w.m_write_pos = buffer->m_len;
3281  w.m_pending_signals = 0;
3282  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
3283  insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
3284  &dummy_buffer);
3285  flush_write_state(selfptr, dstptr, q->m_head, &w);
3286 
3287  unlock(&dstptr->m_jba_write_lock);
3288 }
3289 
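/*
 * Reset a thread's time queue for delayed signals: no timer pending, current
 * time zero, an empty free list (RNIL) and zeroed counters and signal
 * storage.
 */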
3293 static
3294 void
3295 queue_init(struct thr_tq* tq)
3296 {
3297  tq->m_next_timer = 0;
3298  tq->m_current_time = 0;
3299  tq->m_next_free = RNIL;
3300  tq->m_cnt[0] = tq->m_cnt[1] = 0;
3301  bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
3302 }
3303 
3304 static
3305 void
3306 thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
3307  unsigned thr_no)
3308 {
3309  Uint32 i;
3310 
3311  selfptr->m_thr_no = thr_no;
3312  selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
3313  selfptr->m_max_exec_signals = 0;
3314  selfptr->m_first_free = 0;
3315  selfptr->m_first_unused = 0;
3316 
3317  {
3318  char buf[100];
3319  BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
3320  register_lock(&selfptr->m_jba_write_lock, buf);
3321  }
3322  selfptr->m_jba_head.m_read_index = 0;
3323  selfptr->m_jba_head.m_write_index = 0;
3324  selfptr->m_jba.m_head = &selfptr->m_jba_head;
3325  thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
3326  selfptr->m_jba.m_buffers[0] = buffer;
3327  selfptr->m_jba_read_state.m_read_index = 0;
3328  selfptr->m_jba_read_state.m_read_buffer = buffer;
3329  selfptr->m_jba_read_state.m_read_pos = 0;
3330  selfptr->m_jba_read_state.m_read_end = 0;
3331  selfptr->m_jba_read_state.m_write_index = 0;
3332  selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
3333  selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
3334 
3335  for (i = 0; i<cnt; i++)
3336  {
3337  selfptr->m_in_queue_head[i].m_read_index = 0;
3338  selfptr->m_in_queue_head[i].m_write_index = 0;
3339  selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
3340  buffer = seize_buffer(rep, thr_no, false);
3341  selfptr->m_in_queue[i].m_buffers[0] = buffer;
3342  selfptr->m_read_states[i].m_read_index = 0;
3343  selfptr->m_read_states[i].m_read_buffer = buffer;
3344  selfptr->m_read_states[i].m_read_pos = 0;
3345  selfptr->m_read_states[i].m_read_end = 0;
3346  selfptr->m_read_states[i].m_write_index = 0;
3347  }
3348  queue_init(&selfptr->m_tq);
3349 
3350  selfptr->m_prioa_count = 0;
3351  selfptr->m_prioa_size = 0;
3352  selfptr->m_priob_count = 0;
3353  selfptr->m_priob_size = 0;
3354 
3355  selfptr->m_pending_send_count = 0;
3356  selfptr->m_pending_send_mask.clear();
3357 
3358  selfptr->m_instance_count = 0;
3359  for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
3360  selfptr->m_instance_list[i] = 0;
3361 
3362  bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
3363 
3364  selfptr->m_thread = 0;
3365  selfptr->m_cpu = NO_LOCK_CPU;
3366 }
3367 
3368 /* Have to do this after init of all m_in_queues is done. */
3369 static
3370 void
3371 thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
3372  unsigned int cnt, unsigned thr_no)
3373 {
3374  for (Uint32 i = 0; i<cnt; i++)
3375  {
3376  selfptr->m_write_states[i].m_write_index = 0;
3377  selfptr->m_write_states[i].m_write_pos = 0;
3378  selfptr->m_write_states[i].m_write_buffer =
3379  rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
3380  selfptr->m_write_states[i].m_pending_signals = 0;
3381  selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
3382  }
3383 }
3384 
3385 static
3386 void
3387 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
3388 {
3389  char buf[100];
3390  BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
3391  register_lock(&sb->m_send_lock, buf);
3392  sb->m_force_send = 0;
3393  sb->m_send_thread = NO_SEND_THREAD;
3394  bzero(&sb->m_buffer, sizeof(sb->m_buffer));
3395  sb->m_bytes = 0;
3396  bzero(sb->m_read_index, sizeof(sb->m_read_index));
3397 }
3398 
3399 static
3400 void
3401 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
3402 {
3403  rep->m_mm = mm;
3404 
3405  rep->m_thread_count = cnt;
3406  for (unsigned int i = 0; i<cnt; i++)
3407  {
3408  thr_init(rep, rep->m_thread + i, cnt, i);
3409  }
3410  for (unsigned int i = 0; i<cnt; i++)
3411  {
3412  thr_init2(rep, rep->m_thread + i, cnt, i);
3413  }
3414 
3415  rep->stopped_threads = 0;
3416  NdbMutex_Init(&rep->stop_for_crash_mutex);
3417  NdbCondition_Init(&rep->stop_for_crash_cond);
3418 
3419  for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
3420  {
3421  send_buffer_init(i, rep->m_send_buffers+i);
3422  }
3423 
3424  bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
3425 }
3426 
3427 
3432 #include "ThreadConfig.hpp"
3433 #include <signaldata/StartOrd.hpp>
3434 
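/*
 * Estimate the number of job buffer pages to allocate: each thread needs
 * (1 + thr_job_queue::SIZE) buffers for each of its cnt in-queues, the same
 * for its prio A queue, some extra headroom, and THR_FREE_BUF_MAX pages for
 * its local free list; the total is this per-thread amount times the number
 * of threads.
 */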
3435 Uint32
3436 compute_jb_pages(struct EmulatorData * ed)
3437 {
3438  Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
3439 
3440  Uint32 perthread = 0;
3441 
3446  perthread += cnt * (1 + thr_job_queue::SIZE);
3447 
3451  perthread += (1 + thr_job_queue::SIZE);
3452 
3456  perthread += 32; // Say 1M for now
3457 
3461  perthread += THR_FREE_BUF_MAX;
3462 
3466  Uint32 tot = cnt * perthread;
3467 
3468  return tot;
3469 }
3470 
3471 ThreadConfig::ThreadConfig()
3472 {
3473 }
3474 
3475 ThreadConfig::~ThreadConfig()
3476 {
3477 }
3478 
3479 /*
3480  * We must do the init here rather than in the constructor, since at
3481  * constructor time the global memory manager is not available.
3482  */
3483 void
3484 ThreadConfig::init()
3485 {
3486  num_lqh_workers = globalData.ndbMtLqhWorkers;
3487  num_lqh_threads = globalData.ndbMtLqhThreads;
3488  num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
3489  require(num_threads <= MAX_THREADS);
3490  receiver_thread_no = num_threads - 1;
3491 
3492  ndbout << "NDBMT: num_threads=" << num_threads << endl;
3493 
3494  ::rep_init(&g_thr_repository, num_threads,
3495  globalEmulatorData.m_mem_manager);
3496 }
3497 
3498 static
3499 void
3500 setcpuaffinity(struct thr_repository* rep)
3501 {
3502  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
3503  conf.create_cpusets();
3504  if (conf.getInfoMessage())
3505  {
3506  printf("%s", conf.getInfoMessage());
3507  fflush(stdout);
3508  }
3509 }
3510 
3511 void
3512 ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
3513 {
3514  unsigned int thr_no;
3515  struct thr_repository* rep = &g_thr_repository;
3516 
3520  setcpuaffinity(rep);
3521 
3522  /*
3523  * Start all execution threads, except for the receiver
3524  * thread, which runs in the main thread.
3525  */
3526  for (thr_no = 0; thr_no < num_threads; thr_no++)
3527  {
3528  rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
3529 
3530  if (thr_no == receiver_thread_no)
3531  continue; // Will run in the main thread.
3532 
3533  /*
3534  * The NdbThread_Create() takes void **, but that is cast to void * when
3535  * passed to the thread function, which is kind of strange...
3536  */
3537  rep->m_thread[thr_no].m_thread =
3538  NdbThread_Create(mt_job_thread_main,
3539  (void **)(rep->m_thread + thr_no),
3540  1024*1024,
3541  "execute thread", //ToDo add number
3542  NDB_THREAD_PRIO_MEAN);
3543  require(rep->m_thread[thr_no].m_thread != NULL);
3544  }
3545 
3546  /* Now run the receiver thread's main loop directly in this thread. */
3547  rep->m_thread[receiver_thread_no].m_thread = pThis;
3548  mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
3549 
3550  /* Wait for all threads to shut down. */
3551  for (thr_no = 0; thr_no < num_threads; thr_no++)
3552  {
3553  if (thr_no == receiver_thread_no)
3554  continue;
3555  void *dummy_return_status;
3556  NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
3557  NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
3558  }
3559 }
3560 
3561 int
3562 ThreadConfig::doStart(NodeState::StartLevel startLevel)
3563 {
3564  SignalT<3> signalT;
3565  memset(&signalT.header, 0, sizeof(SignalHeader));
3566 
3567  signalT.header.theVerId_signalNumber = GSN_START_ORD;
3568  signalT.header.theReceiversBlockNumber = CMVMI;
3569  signalT.header.theSendersBlockRef = 0;
3570  signalT.header.theTrace = 0;
3571  signalT.header.theSignalId = 0;
3572  signalT.header.theLength = StartOrd::SignalLength;
3573 
3574  StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
3575  startOrd->restartInfo = 0;
3576 
3577  sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
3578  return 0;
3579 }
3580 
3581 /*
3582  * Compare signal ids, taking into account overflow/wraparound.
3583  * Return same as strcmp().
3584  * Eg.
3585  * wrap_compare(0x10,0x20) -> -1
3586  * wrap_compare(0x10,0xffffff20) -> 1
3587  * wrap_compare(0xffffff80,0xffffff20) -> 1
3588  * wrap_compare(0x7fffffff, 0x80000001) -> -1
3589  */
3590 static
3591 inline
3592 int
3593 wrap_compare(Uint32 a, Uint32 b)
3594 {
3595  /* Avoid dependencies on undefined C/C++ integer overflow semantics. */
3596  if (a >= 0x80000000)
3597  if (b >= 0x80000000)
3598  return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
3599  else
3600  return (a - b) >= 0x80000000 ? -1 : 1;
3601  else
3602  if (b >= 0x80000000)
3603  return (b - a) >= 0x80000000 ? 1 : -1;
3604  else
3605  return (int)a - (int)b;
3606 }
3607 
3608 Uint32
3609 FastScheduler::traceDumpGetNumThreads()
3610 {
3611  /* The last thread is only for receiver -> no trace file. */
3612  return num_threads;
3613 }
3614 
3615 bool
3616 FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
3617  const Uint32 * & thrdTheEmulatedJam,
3618  Uint32 & thrdTheEmulatedJamIndex)
3619 {
3620  if (thr_no >= num_threads)
3621  return false;
3622 
3623 #ifdef NO_EMULATED_JAM
3624  jamBlockNumber = 0;
3625  thrdTheEmulatedJam = NULL;
3626  thrdTheEmulatedJamIndex = 0;
3627 #else
3628  const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
3629  thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
3630  thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
3631  jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
3632 #endif
3633  return true;
3634 }
3635 
3636 void
3637 FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
3638 {
3639  /*
3640  * We are about to generate trace files for all threads.
3641  *
3642  * We want to stop all threads processing before we dump, as otherwise the
3643  * signal buffers could change while dumping, leading to inconsistent
3644  * results.
3645  *
3646  * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
3647  * thread. We then wait for threads to signal they are done (but not forever,
3648  * so as to not have one hanging thread prevent the generation of trace
3649  * dumps). We must also be careful not to send to ourselves if the crash is
3650  * being processed by one of the threads processing signals.
3651  *
3652  * We do not stop the transporter thread, as it cannot receive signals (but
3653  * because it does not receive signals it does not really influence dumps in
3654  * any case).
3655  */
3656  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3657  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3658  /* The selfptr might be NULL, or pointer to thread that crashed. */
3659 
3660  Uint32 waitFor_count = 0;
3661  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3662  g_thr_repository.stopped_threads = 0;
3663 
3664  for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
3665  {
3666  if (selfptr != NULL && selfptr->m_thr_no == thr_no)
3667  {
3668  /* This is our own thread; we have already stopped processing. */
3669  continue;
3670  }
3671 
3672  sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
3673 
3674  waitFor_count++;
3675  }
3676 
3677  static const Uint32 max_wait_seconds = 2;
3678  NDB_TICKS start = NdbTick_CurrentMillisecond();
3679  while (g_thr_repository.stopped_threads < waitFor_count)
3680  {
3681  NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
3682  &g_thr_repository.stop_for_crash_mutex,
3683  10);
3684  NDB_TICKS now = NdbTick_CurrentMillisecond();
3685  if (now > start + max_wait_seconds * 1000)
3686  break; // Give up
3687  }
3688  if (g_thr_repository.stopped_threads < waitFor_count)
3689  {
3690  if (nst != NST_ErrorInsert)
3691  {
3692  nst = NST_Watchdog; // Make this abort fast
3693  }
3694  ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
3695  waitFor_count - g_thr_repository.stopped_threads);
3696  }
3697  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3698 
3699  /* Now we are ready (or as ready as can be) for doing crash dump. */
3700 }
3701 
3702 void mt_execSTOP_FOR_CRASH()
3703 {
3704  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3705  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3706  require(selfptr != NULL);
3707 
3708  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
3709  g_thr_repository.stopped_threads++;
3710  NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
3711  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
3712 
3713  /* ToDo: is this correct? */
3714  globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
3715 
3716  pthread_exit(NULL);
3717 }
3718 
3719 void
3720 FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
3721 {
3722  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3723  thr_data *selfptr = reinterpret_cast<thr_data *>(value);
3724  const thr_repository *rep = &g_thr_repository;
3725  /*
3726  * The selfptr might be NULL, or pointer to thread that is doing the crash
3727  * jump.
3728  * If non-null, we should update the watchdog counter while dumping.
3729  */
3730  Uint32 *watchDogCounter;
3731  if (selfptr)
3732  watchDogCounter = &selfptr->m_watchdog_counter;
3733  else
3734  watchDogCounter = NULL;
3735 
3736  /*
3737  * We want to dump the signal buffers from last executed to first executed.
3738  * So we first need to find the correct sequence to output signals in, stored
3739  * in this array.
3740  *
3741  * We will check any buffers in the cyclic m_free_fifo. In addition,
3742  * we also need to scan the already executed part of the current
3743  * buffer in m_jba.
3744  *
3745  * Due to partial execution of prio A buffers, we will use signal ids to know
3746  * where to interleave prio A signals into the stream of prio B signals
3747  * read. So we will keep a pointer to a prio A buffer around; and while
3748  * scanning prio B buffers we will interleave prio A buffers from that buffer
3749  * when the signal id fits the sequence.
3750  *
3751  * This also means that we may have to discard the earliest part of available
3752  * prio A signal data due to too little prio B data present, or vice versa.
3753  */
3754  static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
3755  struct {
3756  const SignalHeader *ptr;
3757  bool prioa;
3758  } signalSequence[MAX_SIGNALS_TO_DUMP];
3759  Uint32 seq_start = 0;
3760  Uint32 seq_end = 0;
3761 
3762  const thr_data *thr_ptr = &rep->m_thread[thr_no];
3763  if (watchDogCounter)
3764  *watchDogCounter = 4;
3765 
3766  /*
3767  * ToDo: Might do some sanity check to avoid crashing on a not yet
 3768  * initialised thread.
3769  */
3770 
3771  /* Scan all available buffers with already executed signals. */
3772 
3773  /*
3774  * Keep track of all available buffers, so that we can pick out signals in
3775  * the same order they were executed (order obtained from signal id).
3776  *
3777  * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
3778  * (and freed) buffers, plus MAX_THREADS buffers for currently active
3779  * prio B buffers, plus one active prio A buffer.
3780  */
3781  struct {
3782  const thr_job_buffer *m_jb;
3783  Uint32 m_pos;
3784  Uint32 m_max;
3785  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
3786 
3787  Uint32 num_jbs = 0;
3788 
3789  /* Load released buffers. */
3790  Uint32 idx = thr_ptr->m_first_free;
3791  while (idx != thr_ptr->m_first_unused)
3792  {
3793  const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
3794  if (q->m_len > 0)
3795  {
3796  jbs[num_jbs].m_jb = q;
3797  jbs[num_jbs].m_pos = 0;
3798  jbs[num_jbs].m_max = q->m_len;
3799  num_jbs++;
3800  }
3801  idx = (idx + 1) % THR_FREE_BUF_MAX;
3802  }
3803  /* Load any active prio B buffers. */
3804  for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
3805  {
3806  const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
3807  const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
3808  Uint32 read_pos = r->m_read_pos;
3809  if (read_pos > 0)
3810  {
3811  jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
3812  jbs[num_jbs].m_pos = 0;
3813  jbs[num_jbs].m_max = read_pos;
3814  num_jbs++;
3815  }
3816  }
3817  /* Load any active prio A buffer. */
3818  const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
3819  Uint32 read_pos = r->m_read_pos;
3820  if (read_pos > 0)
3821  {
3822  jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
3823  jbs[num_jbs].m_pos = 0;
3824  jbs[num_jbs].m_max = read_pos;
3825  num_jbs++;
3826  }
3827 
3828  /* Now pick out one signal at a time, in signal id order. */
3829  while (num_jbs > 0)
3830  {
3831  if (watchDogCounter)
3832  *watchDogCounter = 4;
3833 
3834  /* Search out the smallest signal id remaining. */
3835  Uint32 idx_min = 0;
3836  const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
3837  const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
3838  Uint32 sid_min = s_min->theSignalId;
3839 
3840  for (Uint32 i = 1; i < num_jbs; i++)
3841  {
3842  p = jbs[i].m_jb->m_data + jbs[i].m_pos;
3843  const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
3844  Uint32 sid = s->theSignalId;
3845  if (wrap_compare(sid, sid_min) < 0)
3846  {
3847  idx_min = i;
3848  s_min = s;
3849  sid_min = sid;
3850  }
3851  }
3852 
3853  /* We found the next signal, now put it in the ordered cyclic buffer. */
3854  signalSequence[seq_end].ptr = s_min;
3855  signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
3856  Uint32 siglen =
3857  (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
3858 #if SIZEOF_CHARP == 8
3859  /* Align to 8-byte boundary, to ensure aligned copies. */
3860  siglen= (siglen+1) & ~((Uint32)1);
3861 #endif
3862  jbs[idx_min].m_pos += siglen;
3863  if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
3864  {
3865  /* We are done with this job buffer. */
3866  num_jbs--;
3867  jbs[idx_min] = jbs[num_jbs];
3868  }
3869  seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
3870  /* Drop old signals if too many available in history. */
3871  if (seq_end == seq_start)
3872  seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
3873  }
3874 
3875  /* Now, having built the correct signal sequence, we can dump them all. */
3876  fprintf(out, "\n");
3877  bool first_one = true;
3878  bool out_of_signals = false;
3879  Uint32 lastSignalId = 0;
3880  while (seq_end != seq_start)
3881  {
3882  if (watchDogCounter)
3883  *watchDogCounter = 4;
3884 
3885  if (seq_end == 0)
3886  seq_end = MAX_SIGNALS_TO_DUMP;
3887  seq_end--;
3888  SignalT<25> signal;
3889  const SignalHeader *s = signalSequence[seq_end].ptr;
3890  unsigned siglen = (sizeof(*s)>>2) + s->theLength;
3891  if (siglen > 25)
3892  siglen = 25; // Sanity check
3893  memcpy(&signal.header, s, 4*siglen);
3894  // instance number in trace file is confusing if not MT LQH
3895  if (num_lqh_workers == 0)
3896  signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
3897 
3898  const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
3899  signal.m_sectionPtrI[0] = posptr[siglen + 0];
3900  signal.m_sectionPtrI[1] = posptr[siglen + 1];
3901  signal.m_sectionPtrI[2] = posptr[siglen + 2];
3902  bool prioa = signalSequence[seq_end].prioa;
3903 
3904  /* Make sure to display clearly when there is a gap in the dump. */
3905  if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
3906  {
3907  out_of_signals = true;
3908  fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
3909  "incomplete.\n\n\n\n", prioa ? "B" : "A");
3910  }
3911  first_one = false;
3912  lastSignalId = s->theSignalId;
3913 
3914  fprintf(out, "--------------- Signal ----------------\n");
3915  Uint32 prio = (prioa ? JBA : JBB);
3916  SignalLoggerManager::printSignalHeader(out,
 3917  signal.header,
3918  prio,
3919  globalData.ownId,
3920  true);
3921  SignalLoggerManager::printSignalData(out,
 3922  signal.header,
3923  &signal.theData[0]);
3924  }
3925  fflush(out);
3926 }
3927 
3928 int
3929 FastScheduler::traceDumpGetCurrentThread()
3930 {
3931  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
3932  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
3933 
3934  /* The selfptr might be NULL, or pointer to thread that crashed. */
3935  if (selfptr == 0)
3936  {
3937  return -1;
3938  }
3939  else
3940  {
3941  return (int)selfptr->m_thr_no;
3942  }
3943 }
3944 
3945 void
3946 mt_section_lock()
3947 {
3948  lock(&(g_thr_repository.m_section_lock));
3949 }
3950 
3951 void
3952 mt_section_unlock()
3953 {
3954  unlock(&(g_thr_repository.m_section_lock));
3955 }
3956 
3957 void
3958 mt_mem_manager_init()
3959 {
3960 }
3961 
3962 void
3963 mt_mem_manager_lock()
3964 {
3965  lock(&(g_thr_repository.m_mem_manager_lock));
3966 }
3967 
3968 void
3969 mt_mem_manager_unlock()
3970 {
3971  unlock(&(g_thr_repository.m_mem_manager_lock));
3972 }
3973 
3974 Vector<mt_lock_stat> g_locks;
3975 template class Vector<mt_lock_stat>;
3976 
3977 static
3978 void
3979 register_lock(const void * ptr, const char * name)
3980 {
3981  if (name == 0)
3982  return;
3983 
3984  mt_lock_stat* arr = g_locks.getBase();
3985  for (size_t i = 0; i<g_locks.size(); i++)
3986  {
3987  if (arr[i].m_ptr == ptr)
3988  {
3989  if (arr[i].m_name)
3990  {
3991  free(arr[i].m_name);
3992  }
3993  arr[i].m_name = strdup(name);
3994  return;
3995  }
3996  }
3997 
3998  mt_lock_stat ln;
3999  ln.m_ptr = ptr;
4000  ln.m_name = strdup(name);
4001  ln.m_contended_count = 0;
4002  ln.m_spin_count = 0;
4003  g_locks.push_back(ln);
4004 }
4005 
4006 static
4007 mt_lock_stat *
4008 lookup_lock(const void * ptr)
4009 {
4010  mt_lock_stat* arr = g_locks.getBase();
4011  for (size_t i = 0; i<g_locks.size(); i++)
4012  {
4013  if (arr[i].m_ptr == ptr)
4014  return arr + i;
4015  }
4016 
4017  return 0;
4018 }
4019 
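/*
 * For each main block in the zero-terminated blocks[] list, collect one block
 * reference per thread (other than threadId) that runs an instance of it.
 * At most one reference per thread is added; returns the number of entries
 * written to dst[].
 */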
4020 Uint32
4021 mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
4022  Uint32 dst[], Uint32 len)
4023 {
4024  Uint32 cnt = 0;
4025  Bitmask<(MAX_THREADS+31)/32> mask;
4026  mask.set(threadId);
4027  for (Uint32 i = 0; blocks[i] != 0; i++)
4028  {
4029  Uint32 block = blocks[i];
4033  assert(block == blockToMain(block));
4034  Uint32 index = block - MIN_BLOCK_NO;
4035  for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
4036  {
4037  Uint32 thr_no = thr_map[index][instance].thr_no;
4038  if (thr_no == thr_map_entry::NULL_THR_NO)
4039  break;
4040 
4041  if (mask.get(thr_no))
4042  continue;
4043 
4044  mask.set(thr_no);
4045  require(cnt < len);
4046  dst[cnt++] = numberToRef(block, instance, 0);
4047  }
4048  }
4049  return cnt;
4050 }
4051 
4052 void
4053 mt_wakeup(class SimulatedBlock* block)
4054 {
4055  Uint32 thr_no = block->getThreadId();
4056  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
4057  wakeup(&thrptr->m_waiter);
4058 }
4059 
4060 #ifdef VM_TRACE
4061 void
4062 mt_assert_own_thread(SimulatedBlock* block)
4063 {
4064  Uint32 thr_no = block->getThreadId();
4065  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
4066 
4067  if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
4068  {
4069  fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
4070  fflush(stderr);
4071  abort();
4072  }
4073 }
4074 #endif
4075 
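/*
 * File-global objects: the single repository holding all thread state, the
 * transporter callback object, and the global transporter registry using it.
 */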
4079 struct thr_repository g_thr_repository;
4080 
4081 struct trp_callback g_trp_callback;
4082 
4083 TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);