MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
restore.cpp
1 /*
2  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "restore.hpp"
19 #include <signaldata/FsRef.hpp>
20 #include <signaldata/FsConf.hpp>
21 #include <signaldata/FsOpenReq.hpp>
22 #include <signaldata/FsCloseReq.hpp>
23 #include <signaldata/FsReadWriteReq.hpp>
24 #include <signaldata/RestoreImpl.hpp>
25 #include <signaldata/DictTabInfo.hpp>
26 #include <signaldata/KeyInfo.hpp>
27 #include <signaldata/AttrInfo.hpp>
28 #include <signaldata/LqhKey.hpp>
29 #include <AttributeHeader.hpp>
30 #include <md5_hash.hpp>
31 #include <dblqh/Dblqh.hpp>
32 #include <dbtup/Dbtup.hpp>
33 #include <KeyDescriptor.hpp>
34 
35 #define PAGES LCP_RESTORE_BUFFER
36 
37 Restore::Restore(Block_context& ctx, Uint32 instanceNumber) :
38  SimulatedBlock(RESTORE, ctx, instanceNumber),
39  m_file_list(m_file_pool),
40  m_file_hash(m_file_pool)
41 {
42  BLOCK_CONSTRUCTOR(Restore);
43 
44  // Add received signals
45  addRecSignal(GSN_STTOR, &Restore::execSTTOR);
46  addRecSignal(GSN_DUMP_STATE_ORD, &Restore::execDUMP_STATE_ORD);
47  addRecSignal(GSN_CONTINUEB, &Restore::execCONTINUEB);
48  addRecSignal(GSN_READ_CONFIG_REQ, &Restore::execREAD_CONFIG_REQ, true);
49 
50  addRecSignal(GSN_RESTORE_LCP_REQ, &Restore::execRESTORE_LCP_REQ);
51 
52  addRecSignal(GSN_FSOPENREF, &Restore::execFSOPENREF, true);
53  addRecSignal(GSN_FSOPENCONF, &Restore::execFSOPENCONF);
54  addRecSignal(GSN_FSREADREF, &Restore::execFSREADREF, true);
55  addRecSignal(GSN_FSREADCONF, &Restore::execFSREADCONF);
56  addRecSignal(GSN_FSCLOSEREF, &Restore::execFSCLOSEREF, true);
57  addRecSignal(GSN_FSCLOSECONF, &Restore::execFSCLOSECONF);
58 
59  addRecSignal(GSN_LQHKEYREF, &Restore::execLQHKEYREF);
60  addRecSignal(GSN_LQHKEYCONF, &Restore::execLQHKEYCONF);
61 
62  ndbrequire(sizeof(Column) == 8);
63 }
64 
65 Restore::~Restore()
66 {
67 }
68 
69 BLOCK_FUNCTIONS(Restore)
70 
71 void
72 Restore::execSTTOR(Signal* signal)
73 {
74  jamEntry();
75 
76  c_lqh = (Dblqh*)globalData.getBlock(DBLQH, instance());
77  c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
78  ndbrequire(c_lqh != 0 && c_tup != 0);
79  sendSTTORRY(signal);
80 
81  return;
82 }//Restore::execNDB_STTOR()
83 
84 void
85 Restore::execREAD_CONFIG_REQ(Signal* signal)
86 {
87  jamEntry();
88  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
89  Uint32 ref = req->senderRef;
90  Uint32 senderData = req->senderData;
91  ndbrequire(req->noOfParameters == 0);
92 
94  m_ctx.m_config.getOwnConfigIterator();
95  ndbrequire(p != 0);
96 
97 #if 0
98  Uint32 noBackups = 0, noTables = 0, noAttribs = 0;
99  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &m_diskless));
100  ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_BACKUPS, &noBackups);
101  // ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));
102  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables));
103  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, &noAttribs));
104 
105  noAttribs++; //RT 527 bug fix
106 
107  c_backupPool.setSize(noBackups);
108  c_backupFilePool.setSize(3 * noBackups);
109  c_tablePool.setSize(noBackups * noTables);
110  c_attributePool.setSize(noBackups * noAttribs);
111  c_triggerPool.setSize(noBackups * 3 * noTables);
112 
113  // 2 = no of replicas
114  c_fragmentPool.setSize(noBackups * NO_OF_FRAG_PER_NODE * noTables);
115 
116  Uint32 szMem = 0;
117  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
118  Uint32 noPages = (szMem + sizeof(Page32) - 1) / sizeof(Page32);
119  // We need to allocate an additional of 2 pages. 1 page because of a bug in
120  // ArrayPool and another one for DICTTAINFO.
121  c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2);
122 
123  Uint32 szDataBuf = (2 * 1024 * 1024);
124  Uint32 szLogBuf = (2 * 1024 * 1024);
125  Uint32 szWrite = 32768;
126  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
127  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
128  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
129 
130  c_defaults.m_logBufferSize = szLogBuf;
131  c_defaults.m_dataBufferSize = szDataBuf;
132  c_defaults.m_minWriteSize = szWrite;
133  c_defaults.m_maxWriteSize = szWrite;
134 
135  { // Init all tables
136  ArrayList<Table> tables(c_tablePool);
137  TablePtr ptr;
138  while(tables.seize(ptr)){
139  new (ptr.p) Table(c_attributePool, c_fragmentPool);
140  }
141  tables.release();
142  }
143 
144  {
145  ArrayList<BackupFile> ops(c_backupFilePool);
146  BackupFilePtr ptr;
147  while(ops.seize(ptr)){
148  new (ptr.p) BackupFile(* this, c_pagePool);
149  }
150  ops.release();
151  }
152 
153  {
154  ArrayList<BackupRecord> recs(c_backupPool);
155  BackupRecordPtr ptr;
156  while(recs.seize(ptr)){
157  new (ptr.p) BackupRecord(* this, c_pagePool, c_tablePool,
158  c_backupFilePool, c_triggerPool);
159  }
160  recs.release();
161  }
162 
163  // Initialize BAT for interface to file system
164  {
165  Page32Ptr p;
166  ndbrequire(c_pagePool.seizeId(p, 0));
167  c_startOfPages = (Uint32 *)p.p;
168  c_pagePool.release(p);
169 
170  NewVARIABLE* bat = allocateBat(1);
171  bat[0].WA = c_startOfPages;
172  bat[0].nrr = c_pagePool.getSize()*sizeof(Page32)/sizeof(Uint32);
173  }
174 #endif
175  m_file_pool.setSize(1);
176  Uint32 cnt = 2*MAX_ATTRIBUTES_IN_TABLE;
177  cnt += PAGES;
178  cnt += List::getSegmentSize()-1;
179  cnt /= List::getSegmentSize();
180  cnt += 2;
181  m_databuffer_pool.setSize(cnt);
182 
183  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
184  conf->senderRef = reference();
185  conf->senderData = senderData;
186  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
187  ReadConfigConf::SignalLength, JBB);
188 }
189 
190 void
191 Restore::sendSTTORRY(Signal* signal){
192  signal->theData[0] = 0;
193  signal->theData[3] = 1;
194  signal->theData[4] = 3;
195  signal->theData[5] = 255; // No more start phases from missra
196  BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : RESTORE_REF;
197  sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
198 }
199 
200 void
201 Restore::execCONTINUEB(Signal* signal){
202  jamEntry();
203 
204  switch(signal->theData[0]){
205  case RestoreContinueB::RESTORE_NEXT:
206  {
207  FilePtr file_ptr;
208  m_file_pool.getPtr(file_ptr, signal->theData[1]);
209  restore_next(signal, file_ptr);
210  return;
211  }
212  case RestoreContinueB::READ_FILE:
213  {
214  FilePtr file_ptr;
215  m_file_pool.getPtr(file_ptr, signal->theData[1]);
216  read_file(signal, file_ptr);
217  return;
218  }
219  default:
220  ndbrequire(false);
221  }
222 }
223 
224 void
225 Restore::execDUMP_STATE_ORD(Signal* signal){
226  jamEntry();
227 }
228 
229 void
230 Restore::execRESTORE_LCP_REQ(Signal* signal){
231  jamEntry();
232 
233  Uint32 err= 0;
234  RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtr();
235  Uint32 senderRef= req->senderRef;
236  Uint32 senderData= req->senderData;
237  do
238  {
239  FilePtr file_ptr;
240  if(!m_file_list.seize(file_ptr))
241  {
242  err= RestoreLcpRef::NoFileRecord;
243  break;
244  }
245 
246  if((err= init_file(req, file_ptr)))
247  {
248  break;
249  }
250 
251  open_file(signal, file_ptr);
252  return;
253  } while(0);
254 
255  RestoreLcpRef* ref= (RestoreLcpRef*)signal->getDataPtrSend();
256  ref->senderData= senderData;
257  ref->senderRef= reference();
258  ref->errorCode = err;
259  sendSignal(senderRef, GSN_RESTORE_LCP_REF, signal,
260  RestoreLcpRef::SignalLength, JBB);
261 }
262 
263 Uint32
264 Restore::init_file(const RestoreLcpReq* req, FilePtr file_ptr)
265 {
266  new (file_ptr.p) File();
267  file_ptr.p->m_sender_ref = req->senderRef;
268  file_ptr.p->m_sender_data = req->senderData;
269 
270  file_ptr.p->m_fd = RNIL;
271  file_ptr.p->m_file_type = BackupFormat::LCP_FILE;
272  file_ptr.p->m_status = File::FIRST_READ;
273 
274  file_ptr.p->m_lcp_no = req->lcpNo;
275  file_ptr.p->m_table_id = req->tableId;
276  file_ptr.p->m_fragment_id = req->fragmentId;
277  file_ptr.p->m_table_version = RNIL;
278 
279  file_ptr.p->m_bytes_left = 0; // Bytes read from FS
280  file_ptr.p->m_current_page_ptr_i = RNIL;
281  file_ptr.p->m_current_page_pos = 0;
282  file_ptr.p->m_current_page_index = 0;
283  file_ptr.p->m_current_file_page = 0;
284  file_ptr.p->m_outstanding_reads = 0;
285  file_ptr.p->m_outstanding_operations = 0;
286  file_ptr.p->m_rows_restored = 0;
287  LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
288  LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
289 
290  ndbassert(columns.isEmpty());
291  columns.release();
292 
293  ndbassert(pages.isEmpty());
294  pages.release();
295 
296  Uint32 buf_size= PAGES*GLOBAL_PAGE_SIZE;
297  Uint32 page_count= (buf_size+GLOBAL_PAGE_SIZE-1)/GLOBAL_PAGE_SIZE;
298  if(!pages.seize(page_count))
299  {
300  return RestoreLcpRef::OutOfDataBuffer;
301  }
302 
303  List::Iterator it;
304  for(pages.first(it); !it.isNull(); pages.next(it))
305  {
306  * it.data = RNIL;
307  }
308 
309  Uint32 err= 0;
310  for(pages.first(it); !it.isNull(); pages.next(it))
311  {
312  Ptr<GlobalPage> page_ptr;
313  if(!m_global_page_pool.seize(page_ptr))
314  {
315  err= RestoreLcpRef::OutOfReadBufferPages;
316  break;
317  }
318  * it.data = page_ptr.i;
319  }
320 
321  if(err)
322  {
323  for(pages.first(it); !it.isNull(); pages.next(it))
324  {
325  if(* it.data == RNIL)
326  break;
327  m_global_page_pool.release(* it.data);
328  }
329  }
330  else
331  {
332  pages.first(it);
333  file_ptr.p->m_current_page_ptr_i = *it.data;
334  }
335  return err;
336 }
337 
338 void
339 Restore::release_file(FilePtr file_ptr)
340 {
341  LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
342  LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
343 
344  List::Iterator it;
345  for(pages.first(it); !it.isNull(); pages.next(it))
346  {
347  if(* it.data == RNIL)
348  continue;
349  m_global_page_pool.release(* it.data);
350  }
351 
352  ndbout_c("RESTORE table: %d %lld rows applied",
353  file_ptr.p->m_table_id,
354  file_ptr.p->m_rows_restored);
355 
356  columns.release();
357  pages.release();
358  m_file_list.release(file_ptr);
359 }
360 
361 void
362 Restore::open_file(Signal* signal, FilePtr file_ptr)
363 {
364  signal->theData[0] = NDB_LE_StartReadLCP;
365  signal->theData[1] = file_ptr.p->m_table_id;
366  signal->theData[2] = file_ptr.p->m_fragment_id;
367  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
368 
369  FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
370  req->userReference = reference();
371  req->fileFlags = FsOpenReq::OM_READONLY | FsOpenReq::OM_GZ;
372  req->userPointer = file_ptr.i;
373 
374  FsOpenReq::setVersion(req->fileNumber, 5);
375  FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
376  FsOpenReq::v5_setLcpNo(req->fileNumber, file_ptr.p->m_lcp_no);
377  FsOpenReq::v5_setTableId(req->fileNumber, file_ptr.p->m_table_id);
378  FsOpenReq::v5_setFragmentId(req->fileNumber, file_ptr.p->m_fragment_id);
379  sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
380 }
381 
382 void
383 Restore::execFSOPENREF(Signal* signal)
384 {
385  FsRef* ref= (FsRef*)signal->getDataPtr();
386  FilePtr file_ptr;
387  jamEntry();
388  m_file_pool.getPtr(file_ptr, ref->userPointer);
389 
390  Uint32 errCode= ref->errorCode;
391  Uint32 osError= ref->osErrorCode;
392 
393  RestoreLcpRef* rep= (RestoreLcpRef*)signal->getDataPtrSend();
394  rep->senderData= file_ptr.p->m_sender_data;
395  rep->errorCode = errCode;
396  rep->extra[0] = osError;
397  sendSignal(file_ptr.p->m_sender_ref,
398  GSN_RESTORE_LCP_REF, signal, RestoreLcpRef::SignalLength+1, JBB);
399 
400  release_file(file_ptr);
401 }
402 
403 void
405 {
406  jamEntry();
407  FilePtr file_ptr;
408  FsConf* conf= (FsConf*)signal->getDataPtr();
409  m_file_pool.getPtr(file_ptr, conf->userPointer);
410 
411  file_ptr.p->m_fd = conf->filePointer;
412 
417  file_ptr.p->m_status |= File::FILE_THREAD_RUNNING;
418  signal->theData[0] = RestoreContinueB::READ_FILE;
419  signal->theData[1] = file_ptr.i;
420  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
421 
422  file_ptr.p->m_status |= File::RESTORE_THREAD_RUNNING;
423  signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
424  signal->theData[1] = file_ptr.i;
425  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
426 }
427 
428 void
429 Restore::restore_next(Signal* signal, FilePtr file_ptr)
430 {
431  Uint32 *data, len= 0;
432  Uint32 status = file_ptr.p->m_status;
433  Uint32 page_count = file_ptr.p->m_pages.getSize();
434  do
435  {
436  Uint32 left= file_ptr.p->m_bytes_left;
437  if(left < 8)
438  {
442  break;
443  }
444  Ptr<GlobalPage> page_ptr, next_page_ptr = { 0, 0 };
445  m_global_page_pool.getPtr(page_ptr, file_ptr.p->m_current_page_ptr_i);
446  List::Iterator it;
447 
448  Uint32 pos= file_ptr.p->m_current_page_pos;
449  if(status & File::READING_RECORDS)
450  {
454  len= ntohl(* (page_ptr.p->data + pos)) + 1;
455  }
456  else
457  {
461  if(pos + 1 == GLOBAL_PAGE_SIZE_WORDS)
462  {
468  LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
469  Uint32 next_page = file_ptr.p->m_current_page_index + 1;
470  pages.position(it, next_page % page_count);
471  m_global_page_pool.getPtr(next_page_ptr, * it.data);
472  len= ntohl(* next_page_ptr.p->data);
473  }
474  else
475  {
476  len= ntohl(* (page_ptr.p->data + pos + 1));
477  }
478  }
479 
480  if(file_ptr.p->m_status & File::FIRST_READ)
481  {
482  len= 3;
483  file_ptr.p->m_status &= ~(Uint32)File::FIRST_READ;
484  }
485 
486  if(4 * len > left)
487  {
491  ndbout_c("records: %d len: %x left: %d",
492  status & File::READING_RECORDS, 4*len, left);
493 
494  if (unlikely((status & File:: FILE_THREAD_RUNNING) == 0))
495  {
496  crash_during_restore(file_ptr, __LINE__, 0);
497  }
498  len= 0;
499  break;
500  }
501 
506  if(pos + len >= GLOBAL_PAGE_SIZE_WORDS)
507  {
511  if(next_page_ptr.p == 0)
512  {
513  LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
514  Uint32 next_page = file_ptr.p->m_current_page_index + 1;
515  pages.position(it, next_page % page_count);
516  m_global_page_pool.getPtr(next_page_ptr, * it.data);
517  }
518  file_ptr.p->m_current_page_ptr_i = next_page_ptr.i;
519  file_ptr.p->m_current_page_pos = (pos + len) - GLOBAL_PAGE_SIZE_WORDS;
520  file_ptr.p->m_current_page_index =
521  (file_ptr.p->m_current_page_index + 1) % page_count;
522 
523  Uint32 first = (GLOBAL_PAGE_SIZE_WORDS - pos);
524  // wl4391_todo removing valgrind overlap warning for now
525  memmove(page_ptr.p, page_ptr.p->data+pos, 4 * first);
526  memcpy(page_ptr.p->data+first, next_page_ptr.p, 4 * (len - first));
527  data= page_ptr.p->data;
528  }
529  else
530  {
531  file_ptr.p->m_current_page_pos = pos + len;
532  data= page_ptr.p->data+pos;
533  }
534 
535  file_ptr.p->m_bytes_left -= 4*len;
536 
537  if(status & File::READING_RECORDS)
538  {
539  if(len == 1)
540  {
541  file_ptr.p->m_status = status & ~(Uint32)File::READING_RECORDS;
542  }
543  else
544  {
545  parse_record(signal, file_ptr, data, len);
546  }
547  }
548  else
549  {
550  switch(ntohl(* data)){
551  case BackupFormat::FILE_HEADER:
552  parse_file_header(signal, file_ptr, data-3, len+3);
553  break;
554  case BackupFormat::FRAGMENT_HEADER:
555  file_ptr.p->m_status = status | File::READING_RECORDS;
556  parse_fragment_header(signal, file_ptr, data, len);
557  break;
558  case BackupFormat::FRAGMENT_FOOTER:
559  parse_fragment_footer(signal, file_ptr, data, len);
560  break;
561  case BackupFormat::TABLE_LIST:
562  parse_table_list(signal, file_ptr, data, len);
563  break;
564  case BackupFormat::TABLE_DESCRIPTION:
565  parse_table_description(signal, file_ptr, data, len);
566  break;
567  case BackupFormat::GCP_ENTRY:
568  parse_gcp_entry(signal, file_ptr, data, len);
569  break;
570  case BackupFormat::EMPTY_ENTRY:
571  // skip
572  break;
573  case 0x4e444242: // 'NDBB'
574  if (check_file_version(signal, ntohl(* (data+2))) == 0)
575  {
576  break;
577  }
578  default:
579  parse_error(signal, file_ptr, __LINE__, ntohl(* data));
580  }
581  }
582  } while(0);
583 
584  if(file_ptr.p->m_bytes_left == 0 && status & File::FILE_EOF)
585  {
586  file_ptr.p->m_status &= ~(Uint32)File::RESTORE_THREAD_RUNNING;
590  close_file(signal, file_ptr);
591  return;
592  }
593 
594  signal->theData[0] = RestoreContinueB::RESTORE_NEXT;
595  signal->theData[1] = file_ptr.i;
596 
597  if(len)
598  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
599  else
600  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
601 }
602 
603 void
604 Restore::read_file(Signal* signal, FilePtr file_ptr)
605 {
606  Uint32 left= file_ptr.p->m_bytes_left;
607  Uint32 page_count = file_ptr.p->m_pages.getSize();
608  Uint32 free= GLOBAL_PAGE_SIZE * page_count - left;
609  Uint32 read_count= free/GLOBAL_PAGE_SIZE;
610 
611  if(read_count <= file_ptr.p->m_outstanding_reads)
612  {
613  signal->theData[0] = RestoreContinueB::READ_FILE;
614  signal->theData[1] = file_ptr.i;
615  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
616  return;
617  }
618 
619  read_count -= file_ptr.p->m_outstanding_reads;
620  Uint32 curr_page= file_ptr.p->m_current_page_index;
621  LocalDataBuffer<15> pages(m_databuffer_pool, file_ptr.p->m_pages);
622 
623  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
624  req->filePointer = file_ptr.p->m_fd;
625  req->userReference = reference();
626  req->userPointer = file_ptr.i;
627  req->numberOfPages = 1;
628  req->operationFlag = 0;
629  FsReadWriteReq::setFormatFlag(req->operationFlag,
630  FsReadWriteReq::fsFormatGlobalPage);
631  FsReadWriteReq::setPartialReadFlag(req->operationFlag, 1);
632 
633  Uint32 start= (curr_page + page_count - read_count) % page_count;
634 
635  List::Iterator it;
636  pages.position(it, start);
637  do
638  {
639  file_ptr.p->m_outstanding_reads++;
640  req->varIndex = file_ptr.p->m_current_file_page++;
641  req->data.pageData[0] = *it.data;
642  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
643  FsReadWriteReq::FixedLength + 1, JBB);
644 
645  start++;
646  if(start == page_count)
647  {
648  start= 0;
649  pages.position(it, start);
650  }
651  else
652  {
653  pages.next(it);
654  }
655  } while(start != curr_page);
656 }
657 
658 void
659 Restore::execFSREADREF(Signal * signal)
660 {
661  jamEntry();
662  SimulatedBlock::execFSREADREF(signal);
663  ndbrequire(false);
664 }
665 
666 void
667 Restore::execFSREADCONF(Signal * signal)
668 {
669  jamEntry();
670  FilePtr file_ptr;
671  FsConf* conf= (FsConf*)signal->getDataPtr();
672  m_file_pool.getPtr(file_ptr, conf->userPointer);
673 
674  file_ptr.p->m_bytes_left += conf->bytes_read;
675 
676  ndbassert(file_ptr.p->m_outstanding_reads);
677  file_ptr.p->m_outstanding_reads--;
678 
679  if(file_ptr.p->m_outstanding_reads == 0)
680  {
681  ndbassert(conf->bytes_read <= GLOBAL_PAGE_SIZE);
682  if(conf->bytes_read == GLOBAL_PAGE_SIZE)
683  {
684  read_file(signal, file_ptr);
685  }
686  else
687  {
688  file_ptr.p->m_status |= File::FILE_EOF;
689  file_ptr.p->m_status &= ~(Uint32)File::FILE_THREAD_RUNNING;
690  }
691  }
692 }
693 
694 void
695 Restore::close_file(Signal* signal, FilePtr file_ptr)
696 {
697  FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
698  req->filePointer = file_ptr.p->m_fd;
699  req->userPointer = file_ptr.i;
700  req->userReference = reference();
701  req->fileFlag = 0;
702  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
703 }
704 
705 void
706 Restore::execFSCLOSEREF(Signal * signal)
707 {
708  jamEntry();
709  SimulatedBlock::execFSCLOSEREF(signal);
710  ndbrequire(false);
711 }
712 
713 void
714 Restore::execFSCLOSECONF(Signal * signal)
715 {
716  jamEntry();
717  FilePtr file_ptr;
718  FsConf* conf= (FsConf*)signal->getDataPtr();
719  m_file_pool.getPtr(file_ptr, conf->userPointer);
720 
721  file_ptr.p->m_fd = RNIL;
722 
723  if(file_ptr.p->m_outstanding_operations == 0)
724  {
725  jam();
726  restore_lcp_conf(signal, file_ptr);
727  return;
728  }
729 }
730 
731 void
732 Restore::parse_file_header(Signal* signal,
733  FilePtr file_ptr,
734  const Uint32* data, Uint32 len)
735 {
737 
738  if(memcmp(fh->Magic, "NDBBCKUP", 8) != 0)
739  {
740  parse_error(signal, file_ptr, __LINE__, *data);
741  return;
742  }
743 
744  file_ptr.p->m_lcp_version = ntohl(fh->BackupVersion);
745  if (check_file_version(signal, ntohl(fh->BackupVersion)))
746  {
747  parse_error(signal, file_ptr, __LINE__, ntohl(fh->NdbVersion));
748  return;
749  }
750  ndbassert(ntohl(fh->SectionType) == BackupFormat::FILE_HEADER);
751 
752  if(ntohl(fh->SectionLength) != len-3)
753  {
754  parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
755  return;
756  }
757 
758  if(ntohl(fh->FileType) != BackupFormat::LCP_FILE)
759  {
760  parse_error(signal, file_ptr, __LINE__, ntohl(fh->FileType));
761  return;
762  }
763 
764  if(fh->ByteOrder != 0x12345678)
765  {
766  parse_error(signal, file_ptr, __LINE__, fh->ByteOrder);
767  return;
768  }
769 }
770 
771 void
772 Restore::parse_table_list(Signal* signal, FilePtr file_ptr,
773  const Uint32 *data, Uint32 len)
774 {
777 
778  if(ntohl(fh->TableIds[0]) != file_ptr.p->m_table_id)
779  {
780  parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableIds[0]));
781  return;
782  }
783 }
784 
785 void
786 Restore::parse_table_description(Signal* signal, FilePtr file_ptr,
787  const Uint32 *data, Uint32 len)
788 {
789  bool lcp = file_ptr.p->is_lcp();
790  Uint32 disk= 0;
793 
794  LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
795 
796  SimplePropertiesLinearReader it(fh->DictTabInfo, len);
797  it.first();
798 
799  DictTabInfo::Table tmpTab; tmpTab.init();
801  stat = SimpleProperties::unpack(it, &tmpTab,
802  DictTabInfo::TableMapping,
803  DictTabInfo::TableMappingSize,
804  true, true);
805  ndbrequire(stat == SimpleProperties::Break);
806 
807  if(tmpTab.TableId != file_ptr.p->m_table_id)
808  {
809  parse_error(signal, file_ptr, __LINE__, tmpTab.TableId);
810  return;
811  }
812 
813  Column c;
814  Uint32 colstore[sizeof(Column)/sizeof(Uint32)];
815 
816  for(Uint32 i = 0; i<tmpTab.NoOfAttributes; i++) {
817  jam();
818  DictTabInfo::Attribute tmp; tmp.init();
819  stat = SimpleProperties::unpack(it, &tmp,
820  DictTabInfo::AttributeMapping,
821  DictTabInfo::AttributeMappingSize,
822  true, true);
823 
824  ndbrequire(stat == SimpleProperties::Break);
825  it.next(); // Move Past EndOfAttribute
826 
827  const Uint32 arr = tmp.AttributeArraySize;
828  const Uint32 sz = 1 << tmp.AttributeSize;
829  const Uint32 sz32 = (sz * arr + 31) >> 5;
830  const bool varsize = tmp.AttributeArrayType != NDB_ARRAYTYPE_FIXED;
831 
832  c.m_id = tmp.AttributeId;
833  c.m_size = sz32;
834  c.m_flags = (tmp.AttributeKeyFlag ? Column::COL_KEY : 0);
835  c.m_flags |= (tmp.AttributeStorageType == NDB_STORAGETYPE_DISK ?
836  Column::COL_DISK : 0);
837 
838  if(lcp && (c.m_flags & Column::COL_DISK))
839  {
844  disk++;
845  continue;
846  }
847 
848  if(!tmp.AttributeNullableFlag && !varsize)
849  {
850  }
851  else if (true) // null mask dropped in 5.1
852  {
853  c.m_flags |= (varsize ? Column::COL_VAR : 0);
854  c.m_flags |= (tmp.AttributeNullableFlag ? Column::COL_NULL : 0);
855  }
856 
857  memcpy(colstore, &c, sizeof(Column));
858  if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
859  {
860  parse_error(signal, file_ptr, __LINE__, i);
861  return;
862  }
863  }
864 
865  if(lcp)
866  {
867  if (disk)
868  {
869  c.m_id = AttributeHeader::DISK_REF;
870  c.m_size = 2;
871  c.m_flags = 0;
872  memcpy(colstore, &c, sizeof(Column));
873  if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
874  {
875  parse_error(signal, file_ptr, __LINE__, 0);
876  return;
877  }
878  }
879 
880  {
881  c.m_id = AttributeHeader::ROWID;
882  c.m_size = 2;
883  c.m_flags = 0;
884  memcpy(colstore, &c, sizeof(Column));
885  if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
886  {
887  parse_error(signal, file_ptr, __LINE__, 0);
888  return;
889  }
890  }
891 
892  if (tmpTab.RowGCIFlag)
893  {
894  c.m_id = AttributeHeader::ROW_GCI;
895  c.m_size = 2;
896  c.m_flags = 0;
897  memcpy(colstore, &c, sizeof(Column));
898  if(!columns.append(colstore, sizeof(Column)/sizeof(Uint32)))
899  {
900  parse_error(signal, file_ptr, __LINE__, 0);
901  return;
902  }
903  }
904  }
905 
906  file_ptr.p->m_table_version = tmpTab.TableVersion;
907 }
908 
909 void
910 Restore::parse_fragment_header(Signal* signal, FilePtr file_ptr,
911  const Uint32 *data, Uint32 len)
912 {
915  if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
916  {
917  parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
918  return;
919  }
920 
921  if(ntohl(fh->ChecksumType) != 0)
922  {
923  parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
924  return;
925  }
926 
927  file_ptr.p->m_fragment_id = ntohl(fh->FragmentNo);
928 
929  if(file_ptr.p->is_lcp())
930  {
934  c_tup->start_restore_lcp(file_ptr.p->m_table_id,
935  file_ptr.p->m_fragment_id);
936  }
937 }
938 
939 void
940 Restore::parse_record(Signal* signal, FilePtr file_ptr,
941  const Uint32 *data, Uint32 len)
942 {
943  List::Iterator it;
944  LocalDataBuffer<15> columns(m_databuffer_pool, file_ptr.p->m_columns);
945 
946  Uint32 * const key_start = signal->getDataPtrSend()+24;
947  Uint32 * const attr_start = key_start + MAX_KEY_SIZE_IN_WORDS;
948 
949  data += 1;
950  const Uint32* const dataStart = data;
951 
952  bool disk = false;
953  bool rowid = false;
954  bool gci = false;
955  Uint32 keyLen;
956  Uint32 attrLen;
957  Local_key rowid_val;
958  Uint64 gci_val;
959  Uint32 tableId = file_ptr.p->m_table_id;
960  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
961 
962  if (likely(file_ptr.p->m_lcp_version >= NDBD_RAW_LCP))
963  {
964  rowid = true;
965  rowid_val.m_page_no = data[0];
966  rowid_val.m_page_idx = data[1];
967  keyLen = c_tup->read_lcp_keys(tableId, data+2, len - 3, key_start);
968 
969  AttributeHeader::init(attr_start, AttributeHeader::READ_LCP, 4*(len - 3));
970  memcpy(attr_start + 1, data + 2, 4 * (len - 3));
971  attrLen = 1 + len - 3;
972  }
973  else
974  {
975  Uint32 *keyData = key_start;
976  Uint32 *attrData = attr_start;
977  union {
978  Column c;
979  Uint32 _align[sizeof(Column)/sizeof(Uint32)];
980  };
981 
982  columns.first(it);
983  while(!it.isNull())
984  {
985  _align[0] = *it.data; ndbrequire(columns.next(it));
986  _align[1] = *it.data; columns.next(it);
987 
988  if (c.m_id == AttributeHeader::ROWID)
989  {
990  rowid_val.m_page_no = data[0];
991  rowid_val.m_page_idx = data[1];
992  data += 2;
993  rowid = true;
994  continue;
995  }
996 
997  if (c.m_id == AttributeHeader::ROW_GCI)
998  {
999  memcpy(&gci_val, data, 8);
1000  data += 2;
1001  gci = true;
1002  continue;
1003  }
1004 
1005  if (! (c.m_flags & (Column::COL_VAR | Column::COL_NULL)))
1006  {
1007  ndbrequire(data < dataStart + len);
1008 
1009  if(c.m_flags & Column::COL_KEY)
1010  {
1011  memcpy(keyData, data, 4*c.m_size);
1012  keyData += c.m_size;
1013  }
1014 
1015  AttributeHeader::init(attrData++, c.m_id, c.m_size << 2);
1016  memcpy(attrData, data, 4*c.m_size);
1017  attrData += c.m_size;
1018  data += c.m_size;
1019  }
1020 
1021  if(c.m_flags & Column::COL_DISK)
1022  disk= true;
1023  }
1024 
1025  // second part is data driven
1026  while (data + 2 < dataStart + len) {
1027  Uint32 sz= ntohl(*data); data++;
1028  Uint32 id= ntohl(*data); data++; // column_no
1029 
1030  ndbrequire(columns.position(it, 2 * id));
1031 
1032  _align[0] = *it.data; ndbrequire(columns.next(it));
1033  _align[1] = *it.data;
1034 
1035  Uint32 sz32 = (sz + 3) >> 2;
1036  ndbassert(c.m_flags & (Column::COL_VAR | Column::COL_NULL));
1037  if (c.m_flags & Column::COL_KEY)
1038  {
1039  memcpy(keyData, data, 4 * sz32);
1040  keyData += sz32;
1041  }
1042 
1043  AttributeHeader::init(attrData++, c.m_id, sz);
1044  memcpy(attrData, data, sz);
1045 
1046  attrData += sz32;
1047  data += sz32;
1048  }
1049 
1050  ndbrequire(data == dataStart + len - 1);
1051 
1052  ndbrequire(disk == false); // Not supported...
1053  ndbrequire(rowid == true);
1054  keyLen = Uint32(keyData - key_start);
1055  attrLen = Uint32(attrData - attr_start);
1056  if (desc->noOfKeyAttr != desc->noOfVarKeys)
1057  {
1058  reorder_key(desc, key_start, keyLen);
1059  }
1060  }
1061 
1062  LqhKeyReq * req = (LqhKeyReq *)signal->getDataPtrSend();
1063 
1064  Uint32 hashValue;
1065  if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
1066  hashValue = calulate_hash(tableId, key_start);
1067  else
1068  hashValue = md5_hash((Uint64*)key_start, keyLen);
1069 
1070  Uint32 tmp= 0;
1071  LqhKeyReq::setAttrLen(tmp, attrLen);
1072  req->attrLen = tmp;
1073 
1074  tmp= 0;
1075  LqhKeyReq::setKeyLen(tmp, keyLen);
1076  LqhKeyReq::setLastReplicaNo(tmp, 0);
1077  /* ---------------------------------------------------------------------- */
1078  // Indicate Application Reference is present in bit 15
1079  /* ---------------------------------------------------------------------- */
1080  LqhKeyReq::setApplicationAddressFlag(tmp, 0);
1081  LqhKeyReq::setDirtyFlag(tmp, 1);
1082  LqhKeyReq::setSimpleFlag(tmp, 1);
1083  LqhKeyReq::setOperation(tmp, ZINSERT);
1084  LqhKeyReq::setSameClientAndTcFlag(tmp, 0);
1085  LqhKeyReq::setAIInLqhKeyReq(tmp, 0);
1086  LqhKeyReq::setNoDiskFlag(tmp, disk ? 0 : 1);
1087  LqhKeyReq::setRowidFlag(tmp, 1);
1088  LqhKeyReq::setGCIFlag(tmp, gci);
1089  req->clientConnectPtr = file_ptr.i;
1090  req->hashValue = hashValue;
1091  req->requestInfo = tmp;
1092  req->tcBlockref = reference();
1093  req->savePointId = 0;
1094  req->tableSchemaVersion = file_ptr.p->m_table_id +
1095  (file_ptr.p->m_table_version << 16);
1096  req->fragmentData = file_ptr.p->m_fragment_id;
1097  req->transId1 = 0;
1098  req->transId2 = 0;
1099  req->scanInfo = 0;
1100  memcpy(req->variableData, key_start, 16);
1101  Uint32 pos = keyLen > 4 ? 4 : keyLen;
1102  req->variableData[pos++] = rowid_val.m_page_no;
1103  req->variableData[pos++] = rowid_val.m_page_idx;
1104  if (gci)
1105  req->variableData[pos++] = (Uint32)gci_val;
1106  file_ptr.p->m_outstanding_operations++;
1107  EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal,
1108  LqhKeyReq::FixedSignalLength+pos);
1109 
1110  if(keyLen > 4)
1111  {
1112  c_lqh->receive_keyinfo(signal,
1113  key_start + 4,
1114  keyLen - 4);
1115  }
1116 
1117  c_lqh->receive_attrinfo(signal, attr_start, attrLen);
1118 }
1119 
1120 void
1121 Restore::reorder_key(const KeyDescriptor* desc,
1122  Uint32 *data, Uint32 len)
1123 {
1124  Uint32 i;
1125  Uint32 *var= data;
1126  Uint32 Tmp[MAX_KEY_SIZE_IN_WORDS];
1127  for(i = 0; i<desc->noOfKeyAttr; i++)
1128  {
1129  Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1130  switch(AttributeDescriptor::getArrayType(attr)){
1131  case NDB_ARRAYTYPE_FIXED:
1132  var += AttributeDescriptor::getSizeInWords(attr);
1133  }
1134  }
1135 
1136  Uint32 *dst = Tmp;
1137  Uint32 *src = data;
1138  for(i = 0; i<desc->noOfKeyAttr; i++)
1139  {
1140  Uint32 sz;
1141  Uint32 attr = desc->keyAttr[i].attributeDescriptor;
1142  switch(AttributeDescriptor::getArrayType(attr)){
1143  case NDB_ARRAYTYPE_FIXED:
1144  sz = AttributeDescriptor::getSizeInWords(attr);
1145  memcpy(dst, src, 4 * sz);
1146  src += sz;
1147  break;
1148  case NDB_ARRAYTYPE_SHORT_VAR:
1149  sz = (1 + ((Uint8*)var)[0] + 3) >> 2;
1150  memcpy(dst, var, 4 * sz);
1151  var += sz;
1152  break;
1153  case NDB_ARRAYTYPE_MEDIUM_VAR:
1154  sz = (2 + ((Uint8*)var)[0] + 256*((Uint8*)var)[1] + 3) >> 2;
1155  memcpy(dst, var, 4 * sz);
1156  var += sz;
1157  break;
1158  default:
1159  ndbrequire(false);
1160  sz = 0;
1161  }
1162  dst += sz;
1163  }
1164  ndbassert((Uint32) (dst - Tmp) == len);
1165  memcpy(data, Tmp, 4*len);
1166 }
1167 
1168 Uint32
1169 Restore::calulate_hash(Uint32 tableId, const Uint32 *src)
1170 {
1171  jam();
1172  Uint64 Tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
1173  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
1174  Uint32 keyLen = xfrm_key(tableId, src, (Uint32*)Tmp, sizeof(Tmp) >> 2,
1175  keyPartLen);
1176  ndbrequire(keyLen);
1177 
1178  return md5_hash(Tmp, keyLen);
1179 }
1180 
1181 void
1182 Restore::execLQHKEYREF(Signal* signal)
1183 {
1184  FilePtr file_ptr;
1185  LqhKeyRef* ref = (LqhKeyRef*)signal->getDataPtr();
1186  m_file_pool.getPtr(file_ptr, ref->connectPtr);
1187 
1188  crash_during_restore(file_ptr, __LINE__, ref->errorCode);
1189  ndbrequire(false);
1190 }
1191 
1192 void
1193 Restore::crash_during_restore(FilePtr file_ptr, Uint32 line, Uint32 errCode)
1194 {
1195  char buf[255], name[100];
1196  BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1197  file_ptr.p->m_lcp_no,
1198  file_ptr.p->m_table_id,
1199  file_ptr.p->m_fragment_id);
1200 
1201  if (errCode)
1202  {
1203  BaseString::snprintf(buf, sizeof(buf),
1204  "Error %d (line: %u) during restore of %s",
1205  errCode, line, name);
1206  }
1207  else
1208  {
1209  BaseString::snprintf(buf, sizeof(buf),
1210  "Error (line %u) during restore of %s",
1211  line, name);
1212  }
1213  progError(__LINE__, NDBD_EXIT_INVALID_LCP_FILE, buf);
1214 }
1215 
1216 void
1217 Restore::execLQHKEYCONF(Signal* signal)
1218 {
1219  FilePtr file_ptr;
1220  LqhKeyConf * conf = (LqhKeyConf *)signal->getDataPtr();
1221  m_file_pool.getPtr(file_ptr, conf->opPtr);
1222 
1223  ndbassert(file_ptr.p->m_outstanding_operations);
1224  file_ptr.p->m_outstanding_operations--;
1225  file_ptr.p->m_rows_restored++;
1226  if(file_ptr.p->m_outstanding_operations == 0 && file_ptr.p->m_fd == RNIL)
1227  {
1228  jam();
1229  restore_lcp_conf(signal, file_ptr);
1230  return;
1231  }
1232 }
1233 
1234 void
1235 Restore::restore_lcp_conf(Signal* signal, FilePtr file_ptr)
1236 {
1237  RestoreLcpConf* rep= (RestoreLcpConf*)signal->getDataPtrSend();
1238  rep->senderData= file_ptr.p->m_sender_data;
1239  if(file_ptr.p->is_lcp())
1240  {
1246  c_tup->complete_restore_lcp(signal,
1247  file_ptr.p->m_sender_ref,
1248  file_ptr.p->m_sender_data,
1249  file_ptr.p->m_table_id,
1250  file_ptr.p->m_fragment_id);
1251  }
1252  else
1253  {
1254  sendSignal(file_ptr.p->m_sender_ref,
1255  GSN_RESTORE_LCP_CONF, signal,
1256  RestoreLcpConf::SignalLength, JBB);
1257  }
1258 
1259  signal->theData[0] = NDB_LE_ReadLCPComplete;
1260  signal->theData[1] = file_ptr.p->m_table_id;
1261  signal->theData[2] = file_ptr.p->m_fragment_id;
1262  signal->theData[3] = Uint32(file_ptr.p->m_rows_restored >> 32);
1263  signal->theData[4] = Uint32(file_ptr.p->m_rows_restored);
1264  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
1265 
1266  release_file(file_ptr);
1267 }
1268 
1269 void
1270 Restore::parse_fragment_footer(Signal* signal, FilePtr file_ptr,
1271  const Uint32 *data, Uint32 len)
1272 {
1275  if(ntohl(fh->TableId) != file_ptr.p->m_table_id)
1276  {
1277  parse_error(signal, file_ptr, __LINE__, ntohl(fh->TableId));
1278  return;
1279  }
1280 
1281  if(ntohl(fh->Checksum) != 0)
1282  {
1283  parse_error(signal, file_ptr, __LINE__, ntohl(fh->SectionLength));
1284  return;
1285  }
1286 }
1287 
1288 void
1289 Restore::parse_gcp_entry(Signal* signal, FilePtr file_ptr,
1290  const Uint32 *data, Uint32 len)
1291 {
1292 
1293 }
1294 
1295 void
1296 Restore::parse_error(Signal* signal,
1297  FilePtr file_ptr, Uint32 line, Uint32 extra)
1298 {
1299  char buf[255], name[100];
1300  BaseString::snprintf(name, sizeof(name), "%u/T%dF%d",
1301  file_ptr.p->m_lcp_no,
1302  file_ptr.p->m_table_id,
1303  file_ptr.p->m_fragment_id);
1304 
1305  BaseString::snprintf(buf, sizeof(buf),
1306  "Parse error in file: %s, extra: %d",
1307  name, extra);
1308 
1309  progError(line, NDBD_EXIT_INVALID_LCP_FILE, buf);
1310  ndbrequire(false);
1311 }
1312 
1313 NdbOut&
1314 operator << (NdbOut& ndbout, const Restore::Column& col)
1315 {
1316  ndbout << "[ Col: id: " << col.m_id
1317  << " size: " << col.m_size
1318  << " key: " << (Uint32)(col.m_flags & Restore::Column::COL_KEY)
1319  << " variable: " << (Uint32)(col.m_flags & Restore::Column::COL_VAR)
1320  << " null: " << (Uint32)(col.m_flags & Restore::Column::COL_NULL)
1321  << " disk: " << (Uint32)(col.m_flags & Restore::Column::COL_DISK)
1322  << "]";
1323 
1324  return ndbout;
1325 }
1326 
1327 int
1328 Restore::check_file_version(Signal* signal, Uint32 file_version)
1329 {
1330  if (file_version < MAKE_VERSION(5,1,6))
1331  {
1332  char buf[255];
1333  char verbuf[255];
1334  ndbGetVersionString(file_version, 0, 0, verbuf, sizeof(verbuf));
1335  BaseString::snprintf(buf, sizeof(buf),
1336  "Unsupported version of LCP files found on disk, "
1337  " found: %s", verbuf);
1338 
1339  progError(__LINE__,
1340  NDBD_EXIT_SR_RESTARTCONFLICT,
1341  buf);
1342  return -1;
1343  }
1344  return 0;
1345 }