18 package com.mysql.clusterj.tie;
20 import java.util.ArrayList;
21 import java.util.List;
23 import com.mysql.clusterj.ClusterJDatastoreException;
24 import com.mysql.clusterj.ClusterJFatalInternalException;
25 import com.mysql.clusterj.LockMode;
27 import com.mysql.clusterj.core.store.ClusterTransaction;
28 import com.mysql.clusterj.core.store.Index;
29 import com.mysql.clusterj.core.store.IndexOperation;
30 import com.mysql.clusterj.core.store.IndexScanOperation;
31 import com.mysql.clusterj.core.store.Operation;
32 import com.mysql.clusterj.core.store.PartitionKey;
33 import com.mysql.clusterj.core.store.ScanOperation;
34 import com.mysql.clusterj.core.store.Table;
36 import com.mysql.clusterj.core.util.I18NHelper;
37 import com.mysql.clusterj.core.util.Logger;
38 import com.mysql.clusterj.core.util.LoggerFactoryService;
39 import com.mysql.clusterj.tie.DbImpl.BufferManager;
41 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
42 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
43 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
44 import com.mysql.ndbjtie.ndbapi.NdbOperation;
45 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
46 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
47 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
48 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
49 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
50 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
51 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
/**
 * ClusterTransaction implementation that works through the NDB API objects
 * imported from com.mysql.ndbjtie (dictionary, transaction, operations).
 * NOTE(review): the declarations of the db and ndbTransaction members used by
 * the methods of this class are not visible in this view.
 */
56 class ClusterTransactionImpl
implements ClusterTransaction {
/** Source of the localized message text used when this class throws. */
59 static final I18NHelper local = I18NHelper
60 .getInstance(ClusterTransactionImpl.class);
/** Logger for the trace and detail output of this class. */
63 static final Logger logger = LoggerFactoryService.getFactory()
64 .getInstance(ClusterTransactionImpl.class);
/** Callbacks registered through postExecuteCallback; emptied by clearPostExecuteCallbacks. */
67 private List<Runnable> postExecuteCallbacks =
new ArrayList<Runnable>();
/** Partition key asked to enlist the transaction; replaced by setPartitionKey. */
73 protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance();
/** Dictionary used to look up tables and indexes by name. */
76 private Dictionary ndbDictionary;
/** Id of a transaction to join in enlist(); when null, enlist() goes through the partition key instead. */
79 private String coordinatedTransactionId = null;
/** Class-wide flag; cleared in the catch block of getCoordinatedTransactionId(DbImpl). */
82 private static boolean supportsGetCoordinatedTransactionId =
true;
/** Lock mode passed to readTuple by getSelectOperation. */
85 private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
/** Lock mode passed to readTuple by getUniqueIndexOperation. */
88 private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
/** Lock mode passed to readTuples by getIndexScanOperation and getIndexScanOperationMultiRange. */
91 private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
/** Lock mode passed to readTuples by getTableScanOperation. */
94 private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
/** Set by setAutocommit; when true, executeNoCommit may commit immediately. */
97 private boolean autocommit =
false;
/** Set to true by executeNoCommit after it has committed; reset to false by executeCommit. */
100 private boolean autocommitted =
false;
/** Value supplied to the constructor; not read anywhere in the visible code. */
103 private String joinTransactionId;
/** Buffer manager obtained from the DbImpl in the constructor. */
105 private BufferManager bufferManager;
/**
 * Creates the transaction wrapper and captures its collaborators.
 * NOTE(review): one line between the signature and the first visible
 * assignment is missing from this view; other methods read a db member, so it
 * presumably stores the db argument — confirm against the full file.
 *
 * @param db supplies the buffer manager kept in bufferManager
 * @param ndbDictionary dictionary used later for table and index lookups
 * @param joinTransactionId stored in joinTransactionId
 */
107 public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary,
String joinTransactionId) {
109 this.ndbDictionary = ndbDictionary;
110 this.joinTransactionId = joinTransactionId;
111 this.bufferManager = db.getBufferManager();
114 public void close() {
115 if (ndbTransaction != null) {
116 ndbTransaction.close();
117 ndbTransaction = null;
121 public void executeCommit() {
122 executeCommit(
true,
true);
125 public boolean isEnlisted() {
126 return ndbTransaction != null;
/**
 * Makes sure ndbTransaction is set; does nothing if it already is.
 * When coordinatedTransactionId is non-null the transaction is obtained from
 * db.joinTransaction with that id.
 * NOTE(review): the line between the joinTransaction assignment and the
 * partitionKey.enlist assignment is missing from this view. It is presumably
 * an else branch (otherwise the joined transaction would be overwritten) —
 * confirm against the full file.
 */
134 private void enlist() {
135 if (ndbTransaction == null) {
136 if (coordinatedTransactionId != null) {
137 ndbTransaction = db.joinTransaction(coordinatedTransactionId);
// Enlist through the partition key, then try to record the coordinated transaction id.
139 ndbTransaction = partitionKey.enlist(db);
140 getCoordinatedTransactionId(db);
/**
 * Commit entry point with explicit options. The commit work is attempted only
 * when a transaction is enlisted and executeNoCommit has not already committed.
 *
 * @param abort true selects AbortOption.AbortOnError, false AbortOption.AO_IgnoreError
 * @param force true yields a force option of 1, false yields 0
 */
145 public void executeCommit(
boolean abort,
boolean force) {
146 if (logger.isTraceEnabled()) logger.trace(
"");
148 if (isEnlisted() && !autocommitted) {
// Deal with callbacks that are still registered before committing.
149 handlePendingPostExecuteCallbacks();
150 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
151 int forceOption = force?1:0;
// NOTE(review): the head of the call that assigns returnCode is missing from
// this view; only its trailing arguments appear on the next line.
153 abortOption, forceOption);
154 handleError(returnCode, ndbTransaction);
// Clear the marker that executeNoCommit sets after committing.
// NOTE(review): closing braces are missing from this view, so whether this
// reset sits inside or after the if block cannot be told from here.
156 autocommitted =
false;
160 public void executeNoCommit() {
161 executeNoCommit(
true,
true);
/**
 * Execute entry point with explicit options.
 * NOTE(review): several lines of this method are missing from this view
 * (between the trace call and the autocommit check, inside the autocommit
 * branch, and the head of the call that assigns returnCode).
 *
 * @param abort true selects AbortOption.AbortOnError, false AbortOption.AO_IgnoreError
 * @param force true yields a force option of 1, false yields 0
 */
164 public void executeNoCommit(
boolean abort,
boolean force) {
165 if (logger.isTraceEnabled()) logger.trace(
"");
// With autocommit on and no callbacks registered, commit now and record that it happened.
170 if (autocommit && postExecuteCallbacks.size() == 0) {
172 executeCommit(abort, force);
173 autocommitted =
true;
176 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
177 int forceOption = force?1:0;
// NOTE(review): only the trailing arguments of the call that assigns returnCode are visible here.
179 abortOption, forceOption);
180 handleError(returnCode, ndbTransaction);
// Run the registered callbacks once the execute has been checked for errors.
181 performPostExecuteCallbacks();
/**
 * Rollback entry point; uses AbortOption.AO_IgnoreError as the abort option.
 * NOTE(review): most of this method is missing from this view, including the
 * definitions of forceOption and returnCode and the head of the call whose
 * trailing arguments appear below.
 */
184 public void executeRollback() {
189 int abortOption = AbortOption.AO_IgnoreError;
192 abortOption, forceOption);
193 handleError(returnCode, ndbTransaction);
/**
 * Builds an Operation around a new NdbOperation for the named table.
 * NOTE(review): the statement that assigns returnCode is missing from this
 * view (presumably the NDB delete request — confirm), as is one line before
 * the dictionary lookup.
 *
 * @param storeTable the table; only its name is used here
 * @return an OperationImpl wrapping the new NdbOperation
 */
196 public Operation getDeleteOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
198 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
199 handleError(ndbTable, ndbDictionary);
200 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
201 handleError(ndbOperation, ndbTransaction);
203 handleError(returnCode, ndbTransaction);
// NOTE(review): the doubled semicolon after the trace call leaves an empty statement; harmless, likely a typo.
204 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());;
205 return new OperationImpl(ndbOperation,
this);
/**
 * Builds an Operation around a new NdbOperation for the named table.
 * NOTE(review): the statement that assigns returnCode is missing from this
 * view (presumably the NDB insert request — confirm), as is one line before
 * the dictionary lookup.
 *
 * @param storeTable the table; only its name is used here
 * @return an OperationImpl wrapping the new NdbOperation
 */
208 public Operation getInsertOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
210 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
211 handleError(ndbTable, ndbDictionary);
212 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
213 handleError(ndbOperation, ndbTransaction);
215 handleError(returnCode, ndbTransaction);
216 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
217 return new OperationImpl(ndbOperation,
this);
/**
 * Builds an index scan over the given index, reading tuples with the
 * configured indexScanLockMode.
 * NOTE(review): the declaration of ndbOperation and the definitions of
 * scanFlags, parallel and batch are missing from this view.
 *
 * @param storeIndex the index; its internal name is used for the lookup
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the scan
 */
220 public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
// The index is resolved by internal name plus table name.
222 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
223 handleError(ndbIndex, ndbDictionary);
225 handleError(ndbOperation, ndbTransaction);
226 int lockMode = indexScanLockMode;
230 int returnCode = ndbOperation.
readTuples(lockMode, scanFlags, parallel, batch);
231 handleError(returnCode, ndbTransaction);
232 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName() +
" index: " + storeIndex.getName());
233 return new IndexScanOperationImpl(storeTable, ndbOperation,
this);
/**
 * Builds an index scan over the given index with the ScanFlag.SF_MultiRange
 * flag, reading tuples with the configured indexScanLockMode.
 * NOTE(review): the declaration of ndbOperation and the definitions of
 * parallel and batch are missing from this view.
 *
 * @param storeIndex the index; its internal name is used for the lookup
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the scan
 */
236 public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
// The index is resolved by internal name plus table name.
238 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
239 handleError(ndbIndex, ndbDictionary);
241 handleError(ndbOperation, ndbTransaction);
242 int lockMode = indexScanLockMode;
243 int scanFlags = ScanFlag.SF_MultiRange;
246 int returnCode = ndbOperation.
readTuples(lockMode, scanFlags, parallel, batch);
247 handleError(returnCode, ndbTransaction);
248 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName() +
" index: " + storeIndex.getName());
249 return new IndexScanOperationImpl(storeTable, ndbOperation,
this);
/**
 * Builds an index scan that always uses LockMode.LM_Exclusive and
 * ScanFlag.SF_KeyInfo, regardless of the configured lock modes.
 * NOTE(review): the declaration of ndbOperation and the definitions of
 * parallel and batch are missing from this view.
 *
 * @param storeIndex the index; its internal name is used for the lookup
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the scan
 */
252 public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) {
// The index is resolved by internal name plus table name.
254 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
255 handleError(ndbIndex, ndbDictionary);
257 handleError(ndbOperation, ndbTransaction);
258 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
259 int scanFlags = ScanFlag.SF_KeyInfo;
262 int returnCode = ndbOperation.
readTuples(lockMode, scanFlags, parallel, batch);
263 handleError(returnCode, ndbTransaction);
264 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName() +
" index: " + storeIndex.getName());
265 return new IndexScanOperationImpl(storeTable, ndbOperation,
this);
/**
 * Builds a read Operation for the named table, calling readTuple with the
 * configured findLockMode.
 * NOTE(review): one line before the dictionary lookup is missing from this view.
 *
 * @param storeTable the table; its name is used for the dictionary lookup
 * @return an OperationImpl wrapping the new NdbOperation
 */
268 public Operation getSelectOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
270 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
271 handleError(ndbTable, ndbDictionary);
272 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
273 handleError(ndbOperation, ndbTransaction);
274 int lockMode = findLockMode;
275 int returnCode = ndbOperation.
readTuple(lockMode);
276 handleError(returnCode, ndbTransaction);
277 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
278 return new OperationImpl(storeTable, ndbOperation,
this);
/**
 * Builds a table scan for the named table, reading tuples with the configured
 * tableScanLockMode.
 * NOTE(review): the definitions of scanFlags, parallel and batch are missing
 * from this view, as is one line before the dictionary lookup.
 *
 * @param storeTable the table; its name is used for the dictionary lookup
 * @return a ScanOperationImpl wrapping the new NdbScanOperation
 */
281 public ScanOperation getTableScanOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
283 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
284 handleError(ndbTable, ndbDictionary);
285 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
286 handleError(ndbScanOperation, ndbTransaction);
287 int lockMode = tableScanLockMode;
291 int returnCode = ndbScanOperation.
readTuples(lockMode, scanFlags, parallel, batch);
292 handleError(returnCode, ndbTransaction);
293 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
294 return new ScanOperationImpl(storeTable, ndbScanOperation,
this);
/**
 * Builds a table scan that always uses LockMode.LM_Exclusive and
 * ScanFlag.SF_KeyInfo, regardless of the configured lock modes.
 * NOTE(review): the definitions of parallel and batch are missing from this
 * view, as is one line before the dictionary lookup.
 *
 * @param storeTable the table; its name is used for the dictionary lookup
 * @return a ScanOperationImpl wrapping the new NdbScanOperation
 */
297 public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
299 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
300 handleError(ndbTable, ndbDictionary);
301 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
302 handleError(ndbScanOperation, ndbTransaction);
303 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
304 int scanFlags = ScanFlag.SF_KeyInfo;
307 int returnCode = ndbScanOperation.
readTuples(lockMode, scanFlags, parallel, batch);
308 handleError(returnCode, ndbTransaction);
309 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
310 return new ScanOperationImpl(storeTable, ndbScanOperation,
this);
/**
 * Builds a read operation on the given index, calling readTuple with the
 * configured lookupLockMode.
 * NOTE(review): one line before the dictionary lookup is missing from this view.
 *
 * @param storeIndex the index; its internal name is used for the lookup
 * @param storeTable the table that owns the index
 * @return an IndexOperationImpl wrapping the new NdbIndexOperation
 */
313 public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
// The index is resolved by internal name plus table name.
315 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
316 handleError(ndbIndex, ndbDictionary);
317 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
318 handleError(ndbIndexOperation, ndbTransaction);
319 int lockMode = lookupLockMode;
320 int returnCode = ndbIndexOperation.
readTuple(lockMode);
321 handleError(returnCode, ndbTransaction);
322 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName() +
" index: " + storeIndex.getName());
323 return new IndexOperationImpl(storeTable, ndbIndexOperation,
this);
/**
 * Builds an operation on the given index.
 * NOTE(review): the statement that assigns returnCode is missing from this
 * view (presumably the NDB delete request on the index operation — confirm),
 * as is one line before the dictionary lookup.
 *
 * @param storeIndex the index; its internal name is used for the lookup
 * @param storeTable the table that owns the index
 * @return an IndexOperationImpl wrapping the new NdbIndexOperation
 */
326 public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
// The index is resolved by internal name plus table name.
328 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
329 handleError(ndbIndex, ndbDictionary);
330 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
331 handleError(ndbIndexOperation, ndbTransaction);
333 handleError(returnCode, ndbTransaction);
334 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName() +
" index: " + storeIndex.getName());
335 return new IndexOperationImpl(storeTable, ndbIndexOperation,
this);
/**
 * Builds an Operation around a new NdbOperation for the named table.
 * NOTE(review): the statement that assigns returnCode is missing from this
 * view (presumably the NDB update request — confirm), as is one line before
 * the dictionary lookup.
 *
 * @param storeTable the table; its name is used for the dictionary lookup
 * @return an OperationImpl wrapping the new NdbOperation
 */
338 public Operation getUpdateOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
340 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
341 handleError(ndbTable, ndbDictionary);
342 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
343 handleError(ndbOperation, ndbTransaction);
345 handleError(returnCode, ndbTransaction);
346 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
347 return new OperationImpl(storeTable, ndbOperation,
this);
/**
 * Builds an Operation around a new NdbOperation for the named table.
 * NOTE(review): the statement that assigns returnCode is missing from this
 * view (presumably the NDB write request — confirm), as is one line before
 * the dictionary lookup.
 *
 * @param storeTable the table; its name is used for the dictionary lookup
 * @return an OperationImpl wrapping the new NdbOperation
 */
350 public Operation getWriteOperation(Table storeTable) {
// Look the table up by name; each handleError call checks the preceding result.
352 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
353 handleError(ndbTable, ndbDictionary);
354 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
355 handleError(ndbOperation, ndbTransaction);
357 handleError(returnCode, ndbTransaction);
358 if (logger.isTraceEnabled()) logger.trace(
"Table: " + storeTable.getName());
359 return new OperationImpl(storeTable, ndbOperation,
this);
362 public void postExecuteCallback(Runnable callback) {
363 postExecuteCallbacks.add(callback);
366 private void clearPostExecuteCallbacks() {
367 postExecuteCallbacks.clear();
/**
 * Acts only when at least one post-execute callback is still registered.
 * NOTE(review): the body of the guard is missing from this view, so what is
 * done with the pending callbacks cannot be stated from here.
 */
370 private void handlePendingPostExecuteCallbacks() {
372 if (!postExecuteCallbacks.isEmpty()) {
/**
 * Walks the registered post-execute callbacks and finally clears the list.
 * Any Throwable caught is rethrown as a ClusterJDatastoreException that keeps
 * the original as its cause.
 * NOTE(review): the try opener and the statement inside the loop are missing
 * from this view (presumably each Runnable is run — confirm). Whether the
 * clear is reached after an exception cannot be told from here.
 */
377 private void performPostExecuteCallbacks() {
381 for (Runnable runnable: postExecuteCallbacks) {
384 }
catch (Throwable t) {
385 throw new ClusterJDatastoreException(
386 local.message(
"ERR_Datastore"), t);
390 clearPostExecuteCallbacks();
398 protected void handleError(
int returnCode) {
399 if (returnCode == -1) {
400 NdbErrorConst ndbError = ndbTransaction.getNdbError();
401 String detail = db.getNdbErrorDetail(ndbError);
402 Utility.throwError(returnCode, ndbError, detail);
/**
 * Checks a return code against the error state of the given transaction and
 * hands the error to Utility.throwError together with its detail text.
 * NOTE(review): the bodies of both guards (returnCode == 0 and
 * ndbError.code() == 0) are missing from this view; presumably each returns
 * early without throwing — confirm.
 *
 * @param returnCode the value returned by the NDB call being checked
 * @param ndbTransaction the transaction whose NdbError is consulted
 */
406 protected void handleError(
int returnCode,
NdbTransaction ndbTransaction) {
407 if (returnCode == 0) {
410 NdbErrorConst ndbError = ndbTransaction.
getNdbError();
411 if (ndbError.code() == 0) {
414 String detail = db.getNdbErrorDetail(ndbError);
415 Utility.throwError(returnCode, ndbError, detail);
/**
 * Checks an object returned by a transaction call and hands the transaction's
 * NdbError to Utility.throwError together with its detail text.
 * NOTE(review): the body of the object != null guard is missing from this
 * view; presumably it returns early so that only a null result throws — confirm.
 *
 * @param object the result to test for null
 * @param ndbTransaction the transaction whose NdbError is consulted
 */
419 protected void handleError(Object
object,
NdbTransaction ndbTransaction) {
420 if (
object != null) {
423 NdbErrorConst ndbError = ndbTransaction.
getNdbError();
424 String detail = db.getNdbErrorDetail(ndbError);
425 Utility.throwError(null, ndbError, detail);
/**
 * Checks an object returned by a dictionary lookup and hands the dictionary's
 * NdbError to Utility.throwError together with its detail text.
 * NOTE(review): the body of the object != null guard is missing from this
 * view; presumably it returns early so that only a null result throws — confirm.
 *
 * @param object the lookup result to test for null
 * @param ndbDictionary the dictionary whose NdbError is consulted
 */
429 protected void handleError(Object
object, Dictionary ndbDictionary) {
430 if (
object != null) {
433 NdbErrorConst ndbError = ndbDictionary.getNdbError();
434 String detail = db.getNdbErrorDetail(ndbError);
435 Utility.throwError(null, ndbError, detail);
439 public void setPartitionKey(PartitionKey partitionKey) {
440 if (partitionKey == null) {
441 throw new ClusterJFatalInternalException(
442 local.message(
"ERR_Partition_Key_Null"));
444 this.partitionKey = (PartitionKeyImpl)partitionKey;
447 public String getCoordinatedTransactionId() {
448 return coordinatedTransactionId;
/**
 * Attempts to work out the coordinated transaction id, but only while the
 * static supportsGetCoordinatedTransactionId flag is still true.
 * NOTE(review): the try opener and the statements before the detail log are
 * missing from this view. As far as it is visible, the guarded section ends by
 * throwing "Not Implemented", which the catch below would turn into clearing
 * the flag — confirm against the full file.
 *
 * @param db the database handle; not used on any visible line
 */
460 private void getCoordinatedTransactionId(DbImpl db) {
462 if (supportsGetCoordinatedTransactionId) {
467 if (logger.isDetailEnabled()) logger.detail(
"CoordinatedTransactionId: "
468 + coordinatedTransactionId);
469 throw new ClusterJFatalInternalException(
"Not Implemented");
471 }
catch (Throwable t) {
// The flag is static, so a failure here disables further attempts for every instance.
473 supportsGetCoordinatedTransactionId =
false;
477 public void setCoordinatedTransactionId(
String coordinatedTransactionId) {
478 this.coordinatedTransactionId = coordinatedTransactionId;
481 public void setLockMode(LockMode lockmode) {
482 findLockMode = translateLockMode(lockmode);
483 lookupLockMode = findLockMode;
484 indexScanLockMode = findLockMode;
485 tableScanLockMode = findLockMode;
/**
 * Converts a ClusterJ LockMode into one of the ndbjtie lock mode constants
 * LM_CommittedRead, LM_Read or LM_Exclusive.
 * NOTE(review): the branching construct that selects among the three returns
 * is missing from this view, so which LockMode value maps to which constant
 * cannot be stated from here.
 *
 * @param lockmode the ClusterJ lock mode to convert
 * @return the matching ndbjtie lock mode constant
 * @throws ClusterJFatalInternalException with ERR_Unknown_Lock_Mode when no branch returns
 */
488 private int translateLockMode(LockMode lockmode) {
491 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
493 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read;
495 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
497 throw new ClusterJFatalInternalException(local.message(
"ERR_Unknown_Lock_Mode", lockmode));
501 public void setAutocommit(
boolean autocommit) {
502 this.autocommit = autocommit;
/**
 * Returns the buffer manager that the constructor obtained from the DbImpl.
 *
 * @return the stored BufferManager
 */
505 public BufferManager getBufferManager() {
506 return bufferManager;