MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MemoryChannelTest.cpp
1 /*
2  Copyright (C) 2003, 2005, 2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "MemoryChannel.hpp"
20 #include "NdbThread.h"
21 #include "NdbSleep.h"
22 #include "NdbOut.hpp"
23 #include "NdbMain.h"
24 
25 
26 
27 MemoryChannel<int>* theMemoryChannel;
28 
29 
30 extern "C" void* runProducer(void*arg)
31 {
32  // The producer will items into the MemoryChannel
33  int count = *(int*)arg;
34  int* p;
35  int i = 0;
36  while (i <= count)
37  {
38  p = new int(i);
39  ndbout << "P: " << *p << endl;
40  theMemoryChannel->writeChannel(p);
41  if (i%5==0)
42  NdbSleep_MilliSleep(i);
43  i++;
44  }
45  return NULL;
46 }
47 
48 extern "C" void* runConsumer(void* arg)
49 {
50  // The producer will read items from MemoryChannel and print on screen
51  int count = *(int*)arg;
52  int* p;
53  int i = 0;
54  while (i < count)
55  {
56  p = theMemoryChannel->readChannel();
57  ndbout << "C: " << *p << endl;
58  i = *p;
59  delete p;
60 
61  }
62  return NULL;
63 }
64 
65 
66 
67 class ArgStruct
68 {
69 public:
70  ArgStruct(int _items, int _no){
71  items=_items;
72  no=_no;
73  };
74  int items;
75  int no;
76 };
77 
78 MemoryChannelMultipleWriter<ArgStruct>* theMemoryChannel2;
79 
80 extern "C" void* runProducer2(void*arg)
81 {
82  // The producer will items into the MemoryChannel
83  ArgStruct* pArg = (ArgStruct*)arg;
84  int count = pArg->items;
85  ArgStruct* p;
86  int i = 0;
87  while (i < count)
88  {
89  p = new ArgStruct(i, pArg->no);
90  ndbout << "P"<<pArg->no<<": " << i << endl;
91  theMemoryChannel2->writeChannel(p);
92  NdbSleep_MilliSleep(i);
93  i++;
94  }
95  return NULL;
96 }
97 
98 extern "C" void* runConsumer2(void* arg)
99 {
100  // The producer will read items from MemoryChannel and print on screen
101  ArgStruct* pArg = (ArgStruct*)arg;
102  int count = pArg->items * pArg->no;
103  ArgStruct* p;
104  int i = 0;
105  while (i < count)
106  {
107  p = theMemoryChannel2->readChannel();
108  ndbout << "C: "<< p->no << ", " << p->items << endl;
109  i++;
110  delete p;
111  }
112  ndbout << "Consumer2: " << count << " received" << endl;
113  return NULL;
114 }
115 
116 
117 
118 
119 //#if defined MEMORYCHANNELTEST
120 
121 //int main(int argc, char **argv)
122 NDB_COMMAND(mctest, "mctest", "mctest", "Test the memory channel used in Ndb", 32768)
123 {
124 
125  ndbout << "==== testing MemoryChannel ====" << endl;
126 
127  theMemoryChannel = new MemoryChannel<int>;
128  theMemoryChannel2 = new MemoryChannelMultipleWriter<ArgStruct>;
129 
130  NdbThread* consumerThread;
131  NdbThread* producerThread;
132 
133  NdbThread_SetConcurrencyLevel(2);
134 
135  int numItems = 100;
136  producerThread = NdbThread_Create(runProducer,
137  (void**)&numItems,
138  4096,
139  (char*)"producer");
140 
141  consumerThread = NdbThread_Create(runConsumer,
142  (void**)&numItems,
143  4096,
144  (char*)"consumer");
145 
146 
147  void *status;
148  NdbThread_WaitFor(consumerThread, &status);
149  NdbThread_WaitFor(producerThread, &status);
150 
151  ndbout << "==== testing MemoryChannelMultipleWriter ====" << endl;
152 #define NUM_THREADS2 5
153  NdbThread_SetConcurrencyLevel(NUM_THREADS2+2);
154  NdbThread* producerThreads[NUM_THREADS2];
155 
156  ArgStruct *pArg;
157  for (int j = 0; j < NUM_THREADS2; j++)
158  {
159  char buf[25];
160  sprintf((char*)&buf, "producer%d", j);
161  pArg = new ArgStruct(numItems, j);
162  producerThreads[j] = NdbThread_Create(runProducer2,
163  (void**)pArg,
164  4096,
165  (char*)&buf);
166  }
167 
168  pArg = new ArgStruct(numItems, NUM_THREADS2);
169  consumerThread = NdbThread_Create(runConsumer2,
170  (void**)pArg,
171  4096,
172  (char*)"consumer");
173 
174 
175  NdbThread_WaitFor(consumerThread, &status);
176  for (int j = 0; j < NUM_THREADS2; j++)
177  {
178  NdbThread_WaitFor(producerThreads[j], &status);
179  }
180 
181 
182  return 0;
183 
184 }
185 
186 void ErrorReporter::handleError(ErrorCategory type, int messageID,
187  const char* problemData, const char* objRef,
188  NdbShutdownType nst)
189 {
190 
191  ndbout << "ErrorReporter::handleError activated" << endl;
192  exit(1);
193 }
194 
195 //#endif