MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mt_thr_config.cpp
1 /*
2  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "mt_thr_config.hpp"
19 #include <kernel/ndb_limits.h>
20 #include "../../common/util/parse_mask.hpp"
21 
22 #ifndef TEST_MT_THR_CONFIG
23 #define SUPPORT_CPU_SET 0
24 #else
25 #define SUPPORT_CPU_SET 1
26 #endif
27 
28 static const struct THRConfig::Entries m_entries[] =
29 {
30  // name type min max
31  { "main", THRConfig::T_MAIN, 1, 1 },
32  { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS },
33  { "recv", THRConfig::T_RECV, 1, 1 },
34  { "rep", THRConfig::T_REP, 1, 1 },
35  { "io", THRConfig::T_IO, 1, 1 }
36 };
37 
38 static const struct THRConfig::Param m_params[] =
39 {
40  { "count", THRConfig::Param::S_UNSIGNED },
41  { "cpubind", THRConfig::Param::S_BITMASK },
42  { "cpuset", THRConfig::Param::S_BITMASK }
43 };
44 
45 #define IX_COUNT 0
46 #define IX_CPUBOUND 1
47 #define IX_CPUSET 2
48 
49 static
50 unsigned
51 getMaxEntries(Uint32 type)
52 {
53  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
54  {
55  if (m_entries[i].m_type == type)
56  return m_entries[i].m_max_cnt;
57  }
58  return 0;
59 }
60 
61 static
62 const char *
63 getEntryName(Uint32 type)
64 {
65  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
66  {
67  if (m_entries[i].m_type == type)
68  return m_entries[i].m_name;
69  }
70  return 0;
71 }
72 
73 static
74 Uint32
75 getEntryType(const char * type)
76 {
77  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
78  {
79  if (strcasecmp(type, m_entries[i].m_name) == 0)
80  return i;
81  }
82 
83  return THRConfig::T_END;
84 }
85 
86 THRConfig::THRConfig()
87 {
88  m_classic = false;
89 }
90 
91 THRConfig::~THRConfig()
92 {
93 }
94 
95 int
96 THRConfig::setLockExecuteThreadToCPU(const char * mask)
97 {
98  int res = parse_mask(mask, m_LockExecuteThreadToCPU);
99  if (res < 0)
100  {
101  m_err_msg.assfmt("failed to parse 'LockExecuteThreadToCPU=%s' "
102  "(error: %d)",
103  mask, res);
104  return -1;
105  }
106  return 0;
107 }
108 
109 int
110 THRConfig::setLockIoThreadsToCPU(unsigned val)
111 {
112  m_LockIoThreadsToCPU.set(val);
113  return 0;
114 }
115 
116 void
117 THRConfig::add(T_Type t)
118 {
119  T_Thread tmp;
120  tmp.m_type = t;
121  tmp.m_bind_type = T_Thread::B_UNBOUND;
122  tmp.m_no = m_threads[t].size();
123  m_threads[t].push_back(tmp);
124 }
125 
126 int
127 THRConfig::do_parse(unsigned MaxNoOfExecutionThreads,
128  unsigned __ndbmt_lqh_threads,
129  unsigned __ndbmt_classic)
130 {
134  if (__ndbmt_classic)
135  {
136  m_classic = true;
137  add(T_LDM);
138  add(T_MAIN);
139  add(T_IO);
140  return do_bindings();
141  }
142 
143  Uint32 lqhthreads = 0;
144  switch(MaxNoOfExecutionThreads){
145  case 0:
146  case 1:
147  case 2:
148  case 3:
149  lqhthreads = 1; // TC + receiver + SUMA + LQH
150  break;
151  case 4:
152  case 5:
153  case 6:
154  lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
155  break;
156  default:
157  lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
158  }
159 
160  if (__ndbmt_lqh_threads)
161  {
162  lqhthreads = __ndbmt_lqh_threads;
163  }
164 
165  add(T_MAIN);
166  add(T_REP);
167  add(T_RECV);
168  add(T_IO);
169  for(Uint32 i = 0; i < lqhthreads; i++)
170  {
171  add(T_LDM);
172  }
173 
174  return do_bindings() || do_validate();
175 }
176 
177 int
179 {
180  if (m_LockIoThreadsToCPU.count() == 1)
181  {
182  m_threads[T_IO][0].m_bind_type = T_Thread::B_CPU_BOUND;
183  m_threads[T_IO][0].m_bind_no = m_LockIoThreadsToCPU.getBitNo(0);
184  }
185  else if (m_LockIoThreadsToCPU.count() > 1)
186  {
187  unsigned no = createCpuSet(m_LockIoThreadsToCPU);
188  m_threads[T_IO][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
189  m_threads[T_IO][0].m_bind_no = no;
190  }
191 
195  for (unsigned i = 0; i<m_cpu_sets.size(); i++)
196  {
197  for (unsigned j = i + 1; j < m_cpu_sets.size(); j++)
198  {
199  if (m_cpu_sets[i].overlaps(m_cpu_sets[j]))
200  {
201  m_err_msg.assfmt("Overlapping cpuset's [ %s ] and [ %s ]",
202  m_cpu_sets[i].str().c_str(),
203  m_cpu_sets[j].str().c_str());
204  return -1;
205  }
206  }
207  }
208 
212  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
213  {
214  for (unsigned j = 0; j < m_threads[i].size(); j++)
215  {
216  if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
217  {
218  unsigned cpu = m_threads[i][j].m_bind_no;
219  for (unsigned k = 0; k<m_cpu_sets.size(); k++)
220  {
221  if (m_cpu_sets[k].get(cpu))
222  {
223  m_err_msg.assfmt("Overlapping cpubind %u with cpuset [ %s ]",
224  cpu,
225  m_cpu_sets[k].str().c_str());
226 
227  return -1;
228  }
229  }
230  }
231  }
232  }
233 
237  for (unsigned i = 0; i<m_cpu_sets.size(); i++)
238  {
239  for (unsigned j = 0; j < m_cpu_sets[i].count(); j++)
240  {
241  m_LockExecuteThreadToCPU.clear(m_cpu_sets[i].getBitNo(j));
242  }
243  }
244 
245  unsigned cnt_unbound = 0;
246  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
247  {
248  for (unsigned j = 0; j < m_threads[i].size(); j++)
249  {
250  if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
251  {
252  unsigned cpu = m_threads[i][j].m_bind_no;
253  m_LockExecuteThreadToCPU.clear(cpu);
254  }
255  else if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
256  {
257  cnt_unbound ++;
258  }
259  }
260  }
261 
262  if (m_threads[T_IO][0].m_bind_type == T_Thread::B_UNBOUND)
263  {
267  cnt_unbound--;
268  }
269 
270  if (m_LockExecuteThreadToCPU.count())
271  {
275  SparseBitmask& mask = m_LockExecuteThreadToCPU;
276  unsigned cnt = mask.count();
277  unsigned num_threads = cnt_unbound;
278  bool isMtLqh = !m_classic;
279 
280  if (cnt < num_threads)
281  {
282  m_info_msg.assfmt("WARNING: Too few CPU's specified with "
283  "LockExecuteThreadToCPU. Only %u specified "
284  " but %u was needed, this may cause contention.\n",
285  cnt, num_threads);
286  }
287 
288  if (cnt >= num_threads)
289  {
290  m_info_msg.appfmt("Assigning each thread its own CPU\n");
291  unsigned no = 0;
292  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
293  {
294  if (i == T_IO)
295  continue;
296  for (unsigned j = 0; j < m_threads[i].size(); j++)
297  {
298  if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
299  {
300  m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND;
301  m_threads[i][j].m_bind_no = mask.getBitNo(no);
302  no++;
303  }
304  }
305  }
306  }
307  else if (cnt == 1)
308  {
309  unsigned cpu = mask.getBitNo(0);
310  m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu);
311  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
312  {
313  if (i == T_IO)
314  continue;
315  bind_unbound(m_threads[i], cpu);
316  }
317  }
318  else if (isMtLqh)
319  {
320  unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
321  if (cnt > unbound_ldm)
322  {
326  m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and "
327  "other threads will share remaining\n");
328  unsigned cpu = mask.find(0);
329  for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
330  {
331  if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
332  {
333  m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
334  m_threads[T_LDM][i].m_bind_no = cpu;
335  mask.clear(cpu);
336  cpu = mask.find(cpu + 1);
337  }
338  }
339 
340  cpu = mask.find(0);
341  bind_unbound(m_threads[T_MAIN], cpu);
342  bind_unbound(m_threads[T_REP], cpu);
343  if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
344  {
345  cpu = mask.find(0);
346  }
347  bind_unbound(m_threads[T_RECV], cpu);
348  }
349  else
350  {
351  // put receiver, tc, backup/suma in 1 thread,
352  // and round robin LQH for rest
353  unsigned cpu = mask.find(0);
354  m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and "
355  "other threads will share CPU %u\n", cpu);
356  bind_unbound(m_threads[T_MAIN], cpu); // TC
357  bind_unbound(m_threads[T_REP], cpu);
358  bind_unbound(m_threads[T_RECV], cpu);
359  mask.clear(cpu);
360 
361  cpu = mask.find(0);
362  for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
363  {
364  if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
365  {
366  m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
367  m_threads[T_LDM][i].m_bind_no = cpu;
368  if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
369  {
370  cpu = mask.find(0);
371  }
372  }
373  }
374  }
375  }
376  else
377  {
378  unsigned cpu = mask.find(0);
379  m_info_msg.appfmt("Assigning LQH thread to CPU %u and "
380  "other threads will share\n", cpu);
381  bind_unbound(m_threads[T_LDM], cpu);
382  cpu = mask.find(cpu + 1);
383  bind_unbound(m_threads[T_MAIN], cpu);
384  bind_unbound(m_threads[T_RECV], cpu);
385  }
386  }
387 
388  return 0;
389 }
390 
391 unsigned
392 THRConfig::count_unbound(const Vector<T_Thread>& vec) const
393 {
394  unsigned cnt = 0;
395  for (unsigned i = 0; i < vec.size(); i++)
396  {
397  if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
398  cnt ++;
399  }
400  return cnt;
401 }
402 
403 void
404 THRConfig::bind_unbound(Vector<T_Thread>& vec, unsigned cpu)
405 {
406  for (unsigned i = 0; i < vec.size(); i++)
407  {
408  if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
409  {
410  vec[i].m_bind_type = T_Thread::B_CPU_BOUND;
411  vec[i].m_bind_no = cpu;
412  }
413  }
414 }
415 
416 int
418 {
422  for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++)
423  {
424  if (m_threads[i].size() > getMaxEntries(i))
425  {
426  m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u",
427  m_threads[i].size(),
428  getEntryName(i),
429  getMaxEntries(i));
430  return -1;
431  }
432  }
433 
437  if (m_threads[T_LDM].size() == 3)
438  {
439  m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
440  m_threads[T_LDM].size());
441  return -1;
442  }
443 
444  return 0;
445 }
446 
447 const char *
448 THRConfig::getConfigString()
449 {
450  m_cfg_string.clear();
451  const char * sep = "";
452  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
453  {
454  if (m_threads[i].size())
455  {
456  const char * name = getEntryName(i);
457  if (i != T_IO)
458  {
459  for (unsigned j = 0; j < m_threads[i].size(); j++)
460  {
461  m_cfg_string.append(sep);
462  sep=",";
463  m_cfg_string.append(name);
464  if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
465  {
466  m_cfg_string.append("={");
467  if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
468  {
469  m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
470  }
471  else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
472  {
473  m_cfg_string.appfmt("cpuset=%s",
474  m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
475  }
476  m_cfg_string.append("}");
477  }
478  }
479  }
480  else
481  {
482  for (unsigned j = 0; j < m_threads[i].size(); j++)
483  {
484  if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
485  {
486  m_cfg_string.append(sep);
487  sep=",";
488  m_cfg_string.append(name);
489  m_cfg_string.append("={");
490  if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
491  {
492  m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
493  }
494  else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
495  {
496  m_cfg_string.appfmt("cpuset=%s",
497  m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
498  }
499  m_cfg_string.append("}");
500  }
501  }
502  }
503  }
504  }
505  return m_cfg_string.c_str();
506 }
507 
508 Uint32
509 THRConfig::getThreadCount() const
510 {
511  // Note! not counting T_IO
512  Uint32 cnt = 0;
513  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
514  {
515  if (i != T_IO)
516  {
517  cnt += m_threads[i].size();
518  }
519  }
520  return cnt;
521 }
522 
523 Uint32
524 THRConfig::getThreadCount(T_Type type) const
525 {
526  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
527  {
528  if (i == (Uint32)type)
529  {
530  return m_threads[i].size();
531  }
532  }
533  return 0;
534 }
535 
536 const char *
537 THRConfig::getErrorMessage() const
538 {
539  if (m_err_msg.empty())
540  return 0;
541  return m_err_msg.c_str();
542 }
543 
544 const char *
545 THRConfig::getInfoMessage() const
546 {
547  if (m_info_msg.empty())
548  return 0;
549  return m_info_msg.c_str();
550 }
551 
552 static
553 char *
554 skipblank(char * str)
555 {
556  while (isspace(* str))
557  str++;
558  return str;
559 }
560 
561 Uint32
562 THRConfig::find_type(char *& str)
563 {
564  str = skipblank(str);
565 
566  char * name = str;
567  if (* name == 0)
568  {
569  m_err_msg.assfmt("empty thread specification");
570  return 0;
571  }
572  char * end = name;
573  while(isalpha(* end))
574  end++;
575 
576  char save = * end;
577  * end = 0;
578  Uint32 t = getEntryType(name);
579  if (t == T_END)
580  {
581  m_err_msg.assfmt("unknown thread type '%s'", name);
582  }
583  * end = save;
584  str = end;
585  return t;
586 }
587 
589 {
590  ParamValue() { found = false;}
591  bool found;
592  const char * string_val;
593  unsigned unsigned_val;
594  SparseBitmask mask_val;
595 };
596 
597 static
598 int
599 parseUnsigned(char *& str, unsigned * dst)
600 {
601  str = skipblank(str);
602  char * endptr = 0;
603  errno = 0;
604  long val = strtol(str, &endptr, 0);
605  if (errno == ERANGE)
606  return -1;
607  if (val < 0 || Int64(val) > 0xFFFFFFFF)
608  return -1;
609  if (endptr == str)
610  return -1;
611  str = endptr;
612  *dst = (unsigned)val;
613  return 0;
614 }
615 
616 static
617 int
618 parseBitmask(char *& str, SparseBitmask * mask)
619 {
620  str = skipblank(str);
621  size_t len = strspn(str, "0123456789-, ");
622  if (len == 0)
623  return -1;
624 
625  while (isspace(str[len-1]))
626  len--;
627  if (str[len-1] == ',')
628  len--;
629  char save = str[len];
630  str[len] = 0;
631  int res = parse_mask(str, *mask);
632  str[len] = save;
633  str = str + len;
634  return res;
635 }
636 
637 static
638 int
639 parseParams(char * str, ParamValue values[], BaseString& err)
640 {
641  const char * const save = str;
642  while (* str)
643  {
644  str = skipblank(str);
645 
646  unsigned idx = 0;
647  for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
648  {
649 
650 #if ! SUPPORT_CPU_SET
651  if (idx == IX_CPUSET)
652  continue;
653 #endif
654 
655  if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
656  {
657  str += strlen(m_params[idx].name);
658  break;
659  }
660  }
661 
662  if (idx == NDB_ARRAY_SIZE(m_params))
663  {
664  err.assfmt("Unknown param near: '%s'", str);
665  return -1;
666  }
667 
668  if (values[idx].found == true)
669  {
670  err.assfmt("Param '%s' found twice", m_params[idx].name);
671  return -1;
672  }
673 
674  str = skipblank(str);
675  if (* str != '=')
676  {
677  err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save);
678  return -1;
679  }
680  str++;
681  str = skipblank(str);
682 
683  int res = 0;
684  switch(m_params[idx].type){
685  case THRConfig::Param::S_UNSIGNED:
686  res = parseUnsigned(str, &values[idx].unsigned_val);
687  break;
688  case THRConfig::Param::S_BITMASK:
689  res = parseBitmask(str, &values[idx].mask_val);
690  break;
691  default:
692  err.assfmt("Internal error, unknown type for param: '%s'",
693  m_params[idx].name);
694  return -1;
695  }
696  if (res == -1)
697  {
698  err.assfmt("Unable to parse %s=%s", m_params[idx].name, str);
699  return -1;
700  }
701  values[idx].found = true;
702  str = skipblank(str);
703 
704  if (* str == 0)
705  break;
706 
707  if (* str != ',')
708  {
709  err.assfmt("Unable to parse near '%s'", str);
710  return -1;
711  }
712  str++;
713  }
714  return 0;
715 }
716 
717 int
718 THRConfig::find_spec(char *& str, T_Type type)
719 {
720  str = skipblank(str);
721 
722  switch(* str){
723  case ',':
724  case 0:
725  add(type);
726  return 0;
727  }
728 
729  if (* str != '=')
730  {
731 err:
732  int len = (int)strlen(str);
733  m_err_msg.assfmt("Invalid format near: '%.*s'",
734  (len > 10) ? 10 : len, str);
735  return -1;
736  }
737 
738  str++; // skip over =
739  str = skipblank(str);
740 
741  if (* str != '{')
742  {
743  goto err;
744  }
745 
746  str++;
747  char * start = str;
748 
752  while (* str && (* str) != '}')
753  str++;
754 
755  if (* str != '}')
756  {
757  goto err;
758  }
759 
760  char * end = str;
761  char save = * end;
762  * end = 0;
763 
764  ParamValue values[NDB_ARRAY_SIZE(m_params)];
765  values[IX_COUNT].unsigned_val = 1;
766  int res = parseParams(start, values, m_err_msg);
767  * end = save;
768 
769  if (res != 0)
770  {
771  return -1;
772  }
773 
774  if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
775  {
776  m_err_msg.assfmt("Both cpuset and cpubind specified!");
777  return -1;
778  }
779 
780  unsigned cnt = values[IX_COUNT].unsigned_val;
781  const int index = m_threads[type].size();
782  for (unsigned i = 0; i < cnt; i++)
783  {
784  add(type);
785  }
786 
787  assert(m_threads[type].size() == index + cnt);
788  if (values[IX_CPUSET].found)
789  {
790  SparseBitmask & mask = values[IX_CPUSET].mask_val;
791  unsigned no = createCpuSet(mask);
792  for (unsigned i = 0; i < cnt; i++)
793  {
794  m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND;
795  m_threads[type][index+i].m_bind_no = no;
796  }
797  }
798  else if (values[IX_CPUBOUND].found)
799  {
800  SparseBitmask & mask = values[IX_CPUBOUND].mask_val;
801  if (mask.count() < cnt)
802  {
803  m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]",
804  getEntryName(type),
805  cnt,
806  mask.count(),
807  mask.str().c_str());
808  return -1;
809  }
810  for (unsigned i = 0; i < cnt; i++)
811  {
812  m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND;
813  m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count());
814  }
815  }
816 
817  str++; // skip over }
818  return 0;
819 }
820 
821 int
822 THRConfig::find_next(char *& str)
823 {
824  str = skipblank(str);
825 
826  if (* str == 0)
827  {
828  return 0;
829  }
830  else if (* str == ',')
831  {
832  str++;
833  return 1;
834  }
835 
836  int len = (int)strlen(str);
837  m_err_msg.assfmt("Invalid format near: '%.*s'",
838  (len > 10) ? 10 : len, str);
839  return -1;
840 }
841 
842 int
843 THRConfig::do_parse(const char * ThreadConfig)
844 {
845  BaseString str(ThreadConfig);
846  char * ptr = (char*)str.c_str();
847  while (* ptr)
848  {
849  Uint32 type = find_type(ptr);
850  if (type == T_END)
851  return -1;
852 
853  if (find_spec(ptr, (T_Type)type) < 0)
854  return -1;
855 
856  int ret = find_next(ptr);
857  if (ret < 0)
858  return ret;
859 
860  if (ret == 0)
861  break;
862  }
863 
864  for (Uint32 i = 0; i < T_END; i++)
865  {
866  while (m_threads[i].size() < m_entries[i].m_min_cnt)
867  add((T_Type)i);
868  }
869 
870  return do_bindings() || do_validate();
871 }
872 
873 unsigned
874 THRConfig::createCpuSet(const SparseBitmask& mask)
875 {
876  for (unsigned i = 0; i < m_cpu_sets.size(); i++)
877  if (m_cpu_sets[i].equal(mask))
878  return i;
879 
880  m_cpu_sets.push_back(mask);
881  return m_cpu_sets.size() - 1;
882 }
883 
884 template class Vector<SparseBitmask>;
885 template class Vector<THRConfig::T_Thread>;
886 
887 #ifndef TEST_MT_THR_CONFIG
888 #include <BlockNumbers.h>
889 #include <NdbThread.h>
890 
891 static
892 int
893 findBlock(Uint32 blockNo, const unsigned short list[], unsigned cnt)
894 {
895  for (Uint32 i = 0; i < cnt; i++)
896  {
897  if (blockToMain(list[i]) == blockNo)
898  return blockToInstance(list[i]);
899  }
900  return -1;
901 }
902 
903 const THRConfig::T_Thread*
904 THRConfigApplier::find_thread(const unsigned short instancelist[], unsigned cnt) const
905 {
906  int instanceNo;
907  if ((instanceNo = findBlock(SUMA, instancelist, cnt)) >= 0)
908  {
909  return &m_threads[T_REP][instanceNo];
910  }
911  else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0)
912  {
913  return &m_threads[T_RECV][instanceNo];
914  }
915  else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0)
916  {
917  return &m_threads[T_MAIN][instanceNo];
918  }
919  else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0)
920  {
921  return &m_threads[T_LDM][instanceNo - 1]; // remove proxy...
922  }
923  return 0;
924 }
925 
926 void
927 THRConfigApplier::appendInfo(BaseString& str,
928  const unsigned short list[], unsigned cnt) const
929 {
930  const T_Thread* thr = find_thread(list, cnt);
931  assert(thr != 0);
932  str.appfmt("(%s) ", getEntryName(thr->m_type));
933  if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
934  {
935  str.appfmt("cpu: %u ", thr->m_bind_no);
936  }
937  else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
938  {
939  str.appfmt("cpuset: [ %s ] ", m_cpu_sets[thr->m_bind_no].str().c_str());
940  }
941 }
942 
943 int
944 THRConfigApplier::create_cpusets()
945 {
946  return 0;
947 }
948 
949 int
950 THRConfigApplier::do_bind(NdbThread* thread,
951  const unsigned short list[], unsigned cnt)
952 {
953  const T_Thread* thr = find_thread(list, cnt);
954  if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
955  {
956  int res = NdbThread_LockCPU(thread, thr->m_bind_no);
957  if (res == 0)
958  return 1;
959  else
960  return -res;
961  }
962 #if TODO
963  else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
964  {
965  }
966 #endif
967 
968  return 0;
969 }
970 
971 int
972 THRConfigApplier::do_bind_io(NdbThread* thread)
973 {
974  const T_Thread* thr = &m_threads[T_IO][0];
975  if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
976  {
977  int res = NdbThread_LockCPU(thread, thr->m_bind_no);
978  if (res == 0)
979  return 1;
980  else
981  return -res;
982  }
983 #if TODO
984  else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
985  {
986  }
987 #endif
988 
989  return 0;
990 }
991 #endif
992 
993 #ifdef TEST_MT_THR_CONFIG
994 
995 #include <NdbTap.hpp>
996 
997 TAPTEST(mt_thr_config)
998 {
999  {
1000  THRConfig tmp;
1001  OK(tmp.do_parse(8, 0, 0) == 0);
1002  }
1003 
1007  {
1008  const char * ok[] =
1009  {
1010  "ldm,ldm",
1011  "ldm={count=3},ldm",
1012  "ldm={cpubind=1-2,5,count=3},ldm",
1013  "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
1014  "ldm={count=3,cpubind=1-2,5 }, ldm",
1015  "ldm={cpuset=1-3,count=3 },ldm",
1016  "main,ldm={},ldm",
1017  0
1018  };
1019 
1020  const char * fail [] =
1021  {
1022  "ldm,ldm,ldm",
1023  "ldm={cpubind= 1 , cpuset=2 },ldm",
1024  "ldm={count=4,cpubind=1-3},ldm",
1025  "main,main,ldm,ldm",
1026  "main={ keso=88, count=23},ldm,ldm",
1027  "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
1028  "main={ cpuset=1-3 }, ldm={cpubind=2}",
1029  0
1030  };
1031 
1032  for (Uint32 i = 0; ok[i]; i++)
1033  {
1034  THRConfig tmp;
1035  int res = tmp.do_parse(ok[i]);
1036  printf("do_parse(%s) => %s - %s\n", ok[i],
1037  res == 0 ? "OK" : "FAIL",
1038  res == 0 ? "" : tmp.getErrorMessage());
1039  OK(res == 0);
1040  {
1041  BaseString out(tmp.getConfigString());
1042  THRConfig check;
1043  OK(check.do_parse(out.c_str()) == 0);
1044  OK(strcmp(out.c_str(), check.getConfigString()) == 0);
1045  }
1046  }
1047 
1048  for (Uint32 i = 0; fail[i]; i++)
1049  {
1050  THRConfig tmp;
1051  int res = tmp.do_parse(fail[i]);
1052  printf("do_parse(%s) => %s - %s\n", fail[i],
1053  res == 0 ? "OK" : "FAIL",
1054  res == 0 ? "" : tmp.getErrorMessage());
1055  OK(res != 0);
1056  }
1057  }
1058 
1059  {
1063  const char * t[] =
1064  {
1066  "1-8",
1067  "ldm={count=4}",
1068  "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",
1069 
1070  "1-5",
1071  "ldm={count=4}",
1072  "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",
1073 
1074  "1-3",
1075  "ldm={count=4}",
1076  "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",
1077 
1078  "1-4",
1079  "ldm={count=4}",
1080  "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",
1081 
1082  "1-8",
1083  "ldm={count=4},io={cpubind=8}",
1084  "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}",
1085 
1086  "1-8",
1087  "ldm={count=4,cpubind=1,4,5,6}",
1088  "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",
1089 
1090  // END
1091  0
1092  };
1093 
1094  for (unsigned i = 0; t[i]; i+= 3)
1095  {
1096  THRConfig tmp;
1097  tmp.setLockExecuteThreadToCPU(t[i+0]);
1098  int res = tmp.do_parse(t[i+1]);
1099  int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0;
1100  printf("mask: %s conf: %s => %s(%s) - %s - %s\n",
1101  t[i+0],
1102  t[i+1],
1103  res == 0 ? "OK" : "FAIL",
1104  res == 0 ? "" : tmp.getErrorMessage(),
1105  tmp.getConfigString(),
1106  ok == 1 ? "CORRECT" : "INCORRECT");
1107  OK(res == 0);
1108  OK(ok == 1);
1109  }
1110  }
1111 
1112  return 1;
1113 }
1114 
1115 #endif