MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ClusterTransactionImpl.java
1 /*
2  * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; version 2 of the License.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16  */
17 
18 package com.mysql.clusterj.tie;
19 
20 import java.util.ArrayList;
21 import java.util.List;
22 
23 import com.mysql.clusterj.ClusterJDatastoreException;
24 import com.mysql.clusterj.ClusterJFatalInternalException;
25 import com.mysql.clusterj.LockMode;
26 
27 import com.mysql.clusterj.core.store.ClusterTransaction;
28 import com.mysql.clusterj.core.store.Index;
29 import com.mysql.clusterj.core.store.IndexOperation;
30 import com.mysql.clusterj.core.store.IndexScanOperation;
31 import com.mysql.clusterj.core.store.Operation;
32 import com.mysql.clusterj.core.store.PartitionKey;
33 import com.mysql.clusterj.core.store.ScanOperation;
34 import com.mysql.clusterj.core.store.Table;
35 
36 import com.mysql.clusterj.core.util.I18NHelper;
37 import com.mysql.clusterj.core.util.Logger;
38 import com.mysql.clusterj.core.util.LoggerFactoryService;
39 import com.mysql.clusterj.tie.DbImpl.BufferManager;
40 
41 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
42 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
43 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
44 import com.mysql.ndbjtie.ndbapi.NdbOperation;
45 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
46 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
47 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
48 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
49 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
50 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
51 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
52 
56 class ClusterTransactionImpl implements ClusterTransaction {
57 
59  static final I18NHelper local = I18NHelper
60  .getInstance(ClusterTransactionImpl.class);
61 
63  static final Logger logger = LoggerFactoryService.getFactory()
64  .getInstance(ClusterTransactionImpl.class);
65 
66  protected NdbTransaction ndbTransaction;
67  private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
68 
70  protected DbImpl db;
71 
73  protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance();
74 
76  private Dictionary ndbDictionary;
77 
79  private String coordinatedTransactionId = null;
80 
82  private static boolean supportsGetCoordinatedTransactionId = true;
83 
85  private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
86 
88  private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
89 
91  private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
92 
94  private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
95 
97  private boolean autocommit = false;
98 
100  private boolean autocommitted = false;
101 
103  private String joinTransactionId;
104 
105  private BufferManager bufferManager;
106 
107  public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
108  this.db = db;
109  this.ndbDictionary = ndbDictionary;
110  this.joinTransactionId = joinTransactionId;
111  this.bufferManager = db.getBufferManager();
112  }
113 
114  public void close() {
115  if (ndbTransaction != null) {
116  ndbTransaction.close();
117  ndbTransaction = null;
118  }
119  }
120 
121  public void executeCommit() {
122  executeCommit(true, true);
123  }
124 
125  public boolean isEnlisted() {
126  return ndbTransaction != null;
127  }
128 
134  private void enlist() {
135  if (ndbTransaction == null) {
136  if (coordinatedTransactionId != null) {
137  ndbTransaction = db.joinTransaction(coordinatedTransactionId);
138  } else {
139  ndbTransaction = partitionKey.enlist(db);
140  getCoordinatedTransactionId(db);
141  }
142  }
143  }
144 
145  public void executeCommit(boolean abort, boolean force) {
146  if (logger.isTraceEnabled()) logger.trace("");
147  // nothing to do if no ndbTransaction was ever enlisted or already autocommitted
148  if (isEnlisted() && !autocommitted) {
149  handlePendingPostExecuteCallbacks();
150  int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
151  int forceOption = force?1:0;
152  int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Commit,
153  abortOption, forceOption);
154  handleError(returnCode, ndbTransaction);
155  }
156  autocommitted = false;
157  autocommit = false;
158  }
159 
160  public void executeNoCommit() {
161  executeNoCommit(true, true);
162  }
163 
164  public void executeNoCommit(boolean abort, boolean force) {
165  if (logger.isTraceEnabled()) logger.trace("");
166  if (!isEnlisted()) {
167  // nothing to do if no ndbTransaction was ever enlisted
168  return;
169  }
170  if (autocommit && postExecuteCallbacks.size() == 0) {
171  // optimization to commit now because no blob columns
172  executeCommit(abort, force);
173  autocommitted = true;
174  return;
175  }
176  int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
177  int forceOption = force?1:0;
178  int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.NoCommit,
179  abortOption, forceOption);
180  handleError(returnCode, ndbTransaction);
181  performPostExecuteCallbacks();
182  }
183 
184  public void executeRollback() {
185  if (!isEnlisted()) {
186  // nothing to do if no ndbTransaction was ever enlisted
187  return;
188  }
189  int abortOption = AbortOption.AO_IgnoreError;
190  int forceOption = 1;
191  int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Rollback,
192  abortOption, forceOption);
193  handleError(returnCode, ndbTransaction);
194  }
195 
196  public Operation getDeleteOperation(Table storeTable) {
197  enlist();
198  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
199  handleError(ndbTable, ndbDictionary);
200  NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
201  handleError(ndbOperation, ndbTransaction);
202  int returnCode = ndbOperation.deleteTuple();
203  handleError(returnCode, ndbTransaction);
204  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());;
205  return new OperationImpl(ndbOperation, this);
206  }
207 
208  public Operation getInsertOperation(Table storeTable) {
209  enlist();
210  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
211  handleError(ndbTable, ndbDictionary);
212  NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
213  handleError(ndbOperation, ndbTransaction);
214  int returnCode = ndbOperation.insertTuple();
215  handleError(returnCode, ndbTransaction);
216  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
217  return new OperationImpl(ndbOperation, this);
218  }
219 
220  public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
221  enlist();
222  IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
223  handleError(ndbIndex, ndbDictionary);
224  NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
225  handleError(ndbOperation, ndbTransaction);
226  int lockMode = indexScanLockMode;
227  int scanFlags = 0;
228  int parallel = 0;
229  int batch = 0;
230  int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
231  handleError(returnCode, ndbTransaction);
232  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
233  return new IndexScanOperationImpl(storeTable, ndbOperation, this);
234  }
235 
236  public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
237  enlist();
238  IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
239  handleError(ndbIndex, ndbDictionary);
240  NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
241  handleError(ndbOperation, ndbTransaction);
242  int lockMode = indexScanLockMode;
243  int scanFlags = ScanFlag.SF_MultiRange;
244  int parallel = 0;
245  int batch = 0;
246  int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
247  handleError(returnCode, ndbTransaction);
248  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
249  return new IndexScanOperationImpl(storeTable, ndbOperation, this);
250  }
251 
252  public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) {
253  enlist();
254  IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
255  handleError(ndbIndex, ndbDictionary);
256  NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
257  handleError(ndbOperation, ndbTransaction);
258  int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
259  int scanFlags = ScanFlag.SF_KeyInfo;
260  int parallel = 0;
261  int batch = 0;
262  int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
263  handleError(returnCode, ndbTransaction);
264  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
265  return new IndexScanOperationImpl(storeTable, ndbOperation, this);
266  }
267 
268  public Operation getSelectOperation(Table storeTable) {
269  enlist();
270  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
271  handleError(ndbTable, ndbDictionary);
272  NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
273  handleError(ndbOperation, ndbTransaction);
274  int lockMode = findLockMode;
275  int returnCode = ndbOperation.readTuple(lockMode);
276  handleError(returnCode, ndbTransaction);
277  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
278  return new OperationImpl(storeTable, ndbOperation, this);
279  }
280 
281  public ScanOperation getTableScanOperation(Table storeTable) {
282  enlist();
283  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
284  handleError(ndbTable, ndbDictionary);
285  NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
286  handleError(ndbScanOperation, ndbTransaction);
287  int lockMode = tableScanLockMode;
288  int scanFlags = 0;
289  int parallel = 0;
290  int batch = 0;
291  int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
292  handleError(returnCode, ndbTransaction);
293  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
294  return new ScanOperationImpl(storeTable, ndbScanOperation, this);
295  }
296 
297  public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) {
298  enlist();
299  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
300  handleError(ndbTable, ndbDictionary);
301  NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
302  handleError(ndbScanOperation, ndbTransaction);
303  int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
304  int scanFlags = ScanFlag.SF_KeyInfo;
305  int parallel = 0;
306  int batch = 0;
307  int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
308  handleError(returnCode, ndbTransaction);
309  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
310  return new ScanOperationImpl(storeTable, ndbScanOperation, this);
311  }
312 
313  public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
314  enlist();
315  IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
316  handleError(ndbIndex, ndbDictionary);
317  NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
318  handleError(ndbIndexOperation, ndbTransaction);
319  int lockMode = lookupLockMode;
320  int returnCode = ndbIndexOperation.readTuple(lockMode);
321  handleError(returnCode, ndbTransaction);
322  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
323  return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
324  }
325 
326  public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
327  enlist();
328  IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
329  handleError(ndbIndex, ndbDictionary);
330  NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
331  handleError(ndbIndexOperation, ndbTransaction);
332  int returnCode = ndbIndexOperation.deleteTuple();
333  handleError(returnCode, ndbTransaction);
334  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
335  return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
336  }
337 
338  public Operation getUpdateOperation(Table storeTable) {
339  enlist();
340  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
341  handleError(ndbTable, ndbDictionary);
342  NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
343  handleError(ndbOperation, ndbTransaction);
344  int returnCode = ndbOperation.updateTuple();
345  handleError(returnCode, ndbTransaction);
346  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
347  return new OperationImpl(storeTable, ndbOperation, this);
348  }
349 
350  public Operation getWriteOperation(Table storeTable) {
351  enlist();
352  TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
353  handleError(ndbTable, ndbDictionary);
354  NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
355  handleError(ndbOperation, ndbTransaction);
356  int returnCode = ndbOperation.writeTuple();
357  handleError(returnCode, ndbTransaction);
358  if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
359  return new OperationImpl(storeTable, ndbOperation, this);
360  }
361 
362  public void postExecuteCallback(Runnable callback) {
363  postExecuteCallbacks.add(callback);
364  }
365 
366  private void clearPostExecuteCallbacks() {
367  postExecuteCallbacks.clear();
368  }
369 
370  private void handlePendingPostExecuteCallbacks() {
371  // if any pending postExecuteCallbacks, flush via executeNoCommit
372  if (!postExecuteCallbacks.isEmpty()) {
373  executeNoCommit();
374  }
375  }
376 
377  private void performPostExecuteCallbacks() {
378  // TODO this will abort on the first postExecute failure
379  // TODO should this set rollback only?
380  try {
381  for (Runnable runnable: postExecuteCallbacks) {
382  try {
383  runnable.run();
384  } catch (Throwable t) {
385  throw new ClusterJDatastoreException(
386  local.message("ERR_Datastore"), t);
387  }
388  }
389  } finally {
390  clearPostExecuteCallbacks();
391  }
392  }
393 
398  protected void handleError(int returnCode) {
399  if (returnCode == -1) {
400  NdbErrorConst ndbError = ndbTransaction.getNdbError();
401  String detail = db.getNdbErrorDetail(ndbError);
402  Utility.throwError(returnCode, ndbError, detail);
403  }
404  }
405 
406  protected void handleError(int returnCode, NdbTransaction ndbTransaction) {
407  if (returnCode == 0) {
408  return;
409  } else {
410  NdbErrorConst ndbError = ndbTransaction.getNdbError();
411  if (ndbError.code() == 0) {
412  return;
413  }
414  String detail = db.getNdbErrorDetail(ndbError);
415  Utility.throwError(returnCode, ndbError, detail);
416  }
417  }
418 
419  protected void handleError(Object object, NdbTransaction ndbTransaction) {
420  if (object != null) {
421  return;
422  } else {
423  NdbErrorConst ndbError = ndbTransaction.getNdbError();
424  String detail = db.getNdbErrorDetail(ndbError);
425  Utility.throwError(null, ndbError, detail);
426  }
427  }
428 
429  protected void handleError(Object object, Dictionary ndbDictionary) {
430  if (object != null) {
431  return;
432  } else {
433  NdbErrorConst ndbError = ndbDictionary.getNdbError();
434  String detail = db.getNdbErrorDetail(ndbError);
435  Utility.throwError(null, ndbError, detail);
436  }
437  }
438 
439  public void setPartitionKey(PartitionKey partitionKey) {
440  if (partitionKey == null) {
441  throw new ClusterJFatalInternalException(
442  local.message("ERR_Partition_Key_Null"));
443  }
444  this.partitionKey = (PartitionKeyImpl)partitionKey;
445  }
446 
447  public String getCoordinatedTransactionId() {
448  return coordinatedTransactionId;
449  }
450 
460  private void getCoordinatedTransactionId(DbImpl db) {
461  try {
462  if (supportsGetCoordinatedTransactionId) {
463 // not implemented quite yet...
464 // ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
465 // coordinatedTransactionId = ndbTransaction.
466 // getCoordinatedTransactionId(buffer, buffer.capacity());
467  if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
468  + coordinatedTransactionId);
469  throw new ClusterJFatalInternalException("Not Implemented");
470  }
471  } catch (Throwable t) {
472  // oops, don't do this again
473  supportsGetCoordinatedTransactionId = false;
474  }
475  }
476 
477  public void setCoordinatedTransactionId(String coordinatedTransactionId) {
478  this.coordinatedTransactionId = coordinatedTransactionId;
479  }
480 
481  public void setLockMode(LockMode lockmode) {
482  findLockMode = translateLockMode(lockmode);
483  lookupLockMode = findLockMode;
484  indexScanLockMode = findLockMode;
485  tableScanLockMode = findLockMode;
486  }
487 
488  private int translateLockMode(LockMode lockmode) {
489  switch(lockmode) {
490  case READ_COMMITTED:
491  return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
492  case SHARED:
493  return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read;
494  case EXCLUSIVE:
495  return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
496  default:
497  throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Lock_Mode", lockmode));
498  }
499  }
500 
501  public void setAutocommit(boolean autocommit) {
502  this.autocommit = autocommit;
503  }
504 
505  public BufferManager getBufferManager() {
506  return bufferManager;
507  }
508 
509 }