MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SessionImpl.java
1 /*
2  Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 package com.mysql.clusterj.core;
19 
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalInternalException;
22 import com.mysql.clusterj.ClusterJUserException;
23 import com.mysql.clusterj.DynamicObject;
24 import com.mysql.clusterj.LockMode;
25 import com.mysql.clusterj.Query;
26 import com.mysql.clusterj.Transaction;
27 
28 import com.mysql.clusterj.core.spi.DomainTypeHandler;
29 import com.mysql.clusterj.core.spi.ValueHandler;
30 
31 import com.mysql.clusterj.core.query.QueryDomainTypeImpl;
32 import com.mysql.clusterj.core.query.QueryBuilderImpl;
33 import com.mysql.clusterj.core.query.QueryImpl;
34 
35 import com.mysql.clusterj.core.spi.SessionSPI;
36 
37 import com.mysql.clusterj.core.store.ClusterTransaction;
38 import com.mysql.clusterj.core.store.Db;
39 import com.mysql.clusterj.core.store.Dictionary;
40 import com.mysql.clusterj.core.store.Index;
41 import com.mysql.clusterj.core.store.IndexOperation;
42 import com.mysql.clusterj.core.store.IndexScanOperation;
43 import com.mysql.clusterj.core.store.Operation;
44 import com.mysql.clusterj.core.store.PartitionKey;
45 import com.mysql.clusterj.core.store.ResultData;
46 import com.mysql.clusterj.core.store.ScanOperation;
47 import com.mysql.clusterj.core.store.Table;
48 
49 import com.mysql.clusterj.core.util.I18NHelper;
50 import com.mysql.clusterj.core.util.Logger;
51 import com.mysql.clusterj.core.util.LoggerFactoryService;
52 
53 import com.mysql.clusterj.query.QueryBuilder;
54 import com.mysql.clusterj.query.QueryDefinition;
55 import com.mysql.clusterj.query.QueryDomainType;
56 
57 import java.util.ArrayList;
58 import java.util.BitSet;
59 import java.util.Collections;
60 import java.util.Iterator;
61 import java.util.List;
62 import java.util.Map;
63 
68 public class SessionImpl implements SessionSPI, CacheManager, StoreManager {
69 
71  static final I18NHelper local = I18NHelper.getInstance(SessionImpl.class);
72 
74  static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionImpl.class);
75 
78 
80  protected Db db;
81 
84 
87 
89  protected PartitionKey partitionKey = null;
90 
92  protected boolean rollbackOnly = false;
93 
96 
98  protected String joinTransactionId = null;
99 
101  protected Map properties;
102 
104  protected final int RESULT_READY = 0;
105  protected final int SCAN_FINISHED = 1;
106  protected final int CACHE_EMPTY = 2;
107 
109  protected List<StateManager> changeList = new ArrayList<StateManager>();
110 
114  protected List<Runnable> postExecuteOperations = new ArrayList<Runnable>();
115 
118 
120  private ClusterJException transactionException;
121 
123  protected int nestedAutoTransactionCounter = 0;
124 
126  // TODO get this from properties
127  protected int numberOfRetries = 5;
128 
130  private LockMode lockmode = LockMode.READ_COMMITTED;
131 
133  private Runnable postExecuteCallbackHandler = new Runnable() {
134  public void run() {
135  for (Runnable postExecuteCallback: postExecuteOperations) {
136  postExecuteCallback.run();
137  }
138  postExecuteOperations.clear();
139  }
140  };
141 
146  this.factory = factory;
147  this.db = db;
148  this.dictionary = dictionary;
149  this.properties = properties;
150  transactionImpl = new TransactionImpl(this);
152  }
153 
159  public <T> Query<T> createQuery(QueryDefinition<T> qd) {
160  if (!(qd instanceof QueryDomainTypeImpl)) {
161  throw new ClusterJUserException(
162  local.message("ERR_Exception_On_Method", "createQuery"));
163  }
164  return new QueryImpl<T>(this, (QueryDomainTypeImpl<T>)qd);
165  }
166 
175  public <T> T find(Class<T> cls, Object key) {
176  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
177  T instance = (T) factory.newInstance(cls, dictionary);
178  ValueHandler keyHandler = domainTypeHandler.createKeyValueHandler(key);
179  ValueHandler instanceHandler = domainTypeHandler.getValueHandler(instance);
180  // initialize from the database using the key
181  return (T) initializeFromDatabase(
182  domainTypeHandler, instance, instanceHandler, keyHandler);
183  }
184 
200  public <T> T initializeFromDatabase(DomainTypeHandler<T> domainTypeHandler,
201  T instance,
202  ValueHandler instanceHandler, ValueHandler keyHandler) {
204  try {
205  ResultData rs = selectUnique(domainTypeHandler, keyHandler, null);
206  if (rs.next()) {
207  // we have a result; initialize the instance
208  if (instanceHandler == null) {
209  if (logger.isDetailEnabled()) logger.detail("Creating instanceHandler for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
210  // we need both a new instance and its handler
211  instance = domainTypeHandler.newInstance();
212  instanceHandler = domainTypeHandler.getValueHandler(instance);
213  } else if (instance == null) {
214  if (logger.isDetailEnabled()) logger.detail("Creating instance for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
215  // we have a handler but no instance
216  instance = domainTypeHandler.getInstance(instanceHandler);
217  }
218  // found the instance in the datastore
219  instanceHandler.found(Boolean.TRUE);
220  // put the results into the instance
221  domainTypeHandler.objectSetValues(rs, instanceHandler);
222  // set the cache manager to track updates
223  domainTypeHandler.objectSetCacheManager(this, instanceHandler);
224  // reset modified bits in instance
225  domainTypeHandler.objectResetModified(instanceHandler);
226  } else {
227  if (logger.isDetailEnabled()) logger.detail("No instance found in database for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
228  // no instance found in database
229  if (instanceHandler != null) {
230  // mark the handler as not found
231  instanceHandler.found(Boolean.FALSE);
232  }
234  return null;
235  }
236  } catch (ClusterJException ex) {
238  throw ex;
239  }
241  return instance;
242  }
243 
249  private void setPartitionKey(DomainTypeHandler<?> domainTypeHandler,
250  ValueHandler keyHandler) {
251  if (!isEnlisted()) {
252  // there is still time to set the partition key
253  PartitionKey partitionKey =
254  domainTypeHandler.createPartitionKey(keyHandler);
255  clusterTransaction.setPartitionKey(partitionKey);
256  }
257  }
258 
264  public <T> T newInstance(Class<T> cls) {
265  return factory.newInstance(cls, dictionary);
266  }
267 
274  public <T> T newInstance(Class<T> cls, Object key) {
275  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
276  T instance = factory.newInstance(cls, dictionary);
277  domainTypeHandler.objectSetKeys(key, instance);
278  return instance;
279  }
280 
290  public <T> T load(final T object) {
291  if (object == null) {
292  return null;
293  }
294  if (Iterable.class.isAssignableFrom(object.getClass())) {
295  Iterable<?> instances = (Iterable<?>)object;
296  for (Object instance:instances) {
297  load(instance);
298  }
299  return object;
300  }
301  if (object.getClass().isArray()) {
302  Object[] instances = (Object[])object;
303  for (Object instance:instances) {
304  load(instance);
305  }
306  return object;
307  }
308  // a transaction must already be active (autocommit is not supported)
309  assertActive();
310  final DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object);
311  final ValueHandler instanceHandler = domainTypeHandler.getValueHandler(object);
312  setPartitionKey(domainTypeHandler, instanceHandler);
313  Table storeTable = domainTypeHandler.getStoreTable();
314  // perform a primary key operation
315  final Operation op = clusterTransaction.getSelectOperation(storeTable);
316  // set the keys into the operation
317  domainTypeHandler.operationSetKeys(instanceHandler, op);
318  // set the expected columns into the operation
319  domainTypeHandler.operationGetValues(op);
320  final ResultData rs = op.resultData(false);
321  final SessionImpl cacheManager = this;
322  // defer execution of the key operation until the next find, flush, or query
323  Runnable postExecuteOperation = new Runnable() {
324  public void run() {
325  if (rs.next()) {
326  // found row in database
327  instanceHandler.found(Boolean.TRUE);
328  // put the results into the instance
329  domainTypeHandler.objectSetValues(rs, instanceHandler);
330  // set the cache manager to track updates
331  domainTypeHandler.objectSetCacheManager(cacheManager, instanceHandler);
332  // reset modified bits in instance
333  domainTypeHandler.objectResetModified(instanceHandler);
334  } else {
335  // mark instance as not found
336  instanceHandler.found(Boolean.FALSE);
337  }
338 
339  }
340  };
341  postExecuteOperations.add(postExecuteOperation);
342  return object;
343  }
344 
353  public Boolean found(Object instance) {
354  if (instance == null) {
355  return null;
356  }
357  if (instance instanceof DynamicObject) {
358  return ((DynamicObject)instance).found();
359  }
360  // make sure the instance is a persistent type
361  getDomainTypeHandler(instance);
362  return true;
363  }
364 
370  public <T> T makePersistent(T object) {
371  if (object == null) {
372  return null;
373  }
374  if (Iterable.class.isAssignableFrom(object.getClass())) {
376  Iterable<?> instances = (Iterable<?>)object;
377  for (Object instance:instances) {
378  makePersistent(instance);
379  }
381  return object;
382  }
383  if (object.getClass().isArray()) {
385  Object[] instances = (Object[])object;
386  for (Object instance:instances) {
387  makePersistent(instance);
388  }
390  return object;
391  }
392  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(object);
393  ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
394  insert(domainTypeHandler, valueHandler);
395  return object;
396  }
397 
398  public Operation insert(
399  DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) {
401  setPartitionKey(domainTypeHandler, valueHandler);
402  Operation op = null;
403  Table storeTable = null;
404  try {
405  storeTable = domainTypeHandler.getStoreTable();
406  op = clusterTransaction.getInsertOperation(storeTable);
407  // set all values in the operation, keys first
408  domainTypeHandler.operationSetKeys(valueHandler, op);
409  domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
410  // reset modified bits in instance
411  domainTypeHandler.objectResetModified(valueHandler);
412  } catch (ClusterJUserException cjuex) {
414  throw cjuex;
415  } catch (ClusterJException cjex) {
417  logger.error(local.message("ERR_Insert", storeTable.getName()));
418  throw new ClusterJException(
419  local.message("ERR_Insert", storeTable.getName()), cjex);
420  } catch (RuntimeException rtex) {
422  logger.error(local.message("ERR_Insert", storeTable.getName()));
423  throw new ClusterJException(
424  local.message("ERR_Insert", storeTable.getName()), rtex);
425  }
427  return op;
428  }
429 
435  public Iterable makePersistentAll(Iterable instances) {
437  List<Object> result = new ArrayList<Object>();
438  for (Object instance:instances) {
439  result.add(makePersistent(instance));
440  }
442  return result;
443  }
444 
452  public <T> void deletePersistent(Class<T> cls, Object key) {
453  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
454  ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(key);
455  delete(domainTypeHandler, keyValueHandler);
456  }
457 
463  public void deletePersistent(Object object) {
464  if (object == null) {
465  return;
466  }
467  DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
468  ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
469  delete(domainTypeHandler, valueHandler);
470  }
471 
472  public Operation delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
474  Table storeTable = domainTypeHandler.getStoreTable();
475  setPartitionKey(domainTypeHandler, valueHandler);
476  Operation op = null;
477  try {
478  op = clusterTransaction.getDeleteOperation(storeTable);
479  domainTypeHandler.operationSetKeys(valueHandler, op);
480  } catch (ClusterJException ex) {
482  throw new ClusterJException(
483  local.message("ERR_Delete", storeTable.getName()), ex);
484  }
486  return op;
487  }
488 
492  public void deletePersistentAll(Iterable objects) {
494  for (Iterator it = objects.iterator(); it.hasNext();) {
495  deletePersistent(it.next());
496  }
498  }
499 
503  public <T> int deletePersistentAll(Class<T> cls) {
504  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
505  return deletePersistentAll(domainTypeHandler);
506  }
507 
512  public int deletePersistentAll(DomainTypeHandler<?> domainTypeHandler) {
514  Table storeTable = domainTypeHandler.getStoreTable();
515  String tableName = storeTable.getName();
516  ScanOperation op = null;
517  int count = 0;
518  try {
519  op = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
520  count = deletePersistentAll(op, true);
521  } catch (ClusterJException ex) {
523  // TODO add table name to the error message
524  throw new ClusterJException(
525  local.message("ERR_Delete_All", tableName), ex);
526  }
528  return count;
529  }
530 
536  public int deletePersistentAll(ScanOperation op, boolean abort) {
537  int cacheCount = 0;
538  int count = 0;
539  boolean done = false;
540  boolean fetch = true;
541  // cannot use early autocommit optimization here
542  clusterTransaction.setAutocommit(false);
543  // execute the operation
544  clusterTransaction.executeNoCommit(true, true);
545  while (!done ) {
546  int result = op.nextResult(fetch);
547  switch (result) {
548  case RESULT_READY:
549  op.deleteCurrentTuple();
550  ++count;
551  ++cacheCount;
552  fetch = false;
553  break;
554  case SCAN_FINISHED:
555  done = true;
556  if (cacheCount != 0) {
557  clusterTransaction.executeNoCommit(abort, true);
558  }
559  op.close();
560  break;
561  case CACHE_EMPTY:
562  clusterTransaction.executeNoCommit(abort, true);
563  cacheCount = 0;
564  fetch = true;
565  break;
566  default:
567  throw new ClusterJException(
568  local.message("ERR_Next_Result_Illegal", result));
569  }
570  }
571  return count;
572  }
573 
583  public ResultData selectUnique(DomainTypeHandler domainTypeHandler,
584  ValueHandler keyHandler, BitSet fields) {
585  assertActive();
586  setPartitionKey(domainTypeHandler, keyHandler);
587  Table storeTable = domainTypeHandler.getStoreTable();
588  // perform a single select by key operation
589  Operation op = clusterTransaction.getSelectOperation(storeTable);
590  // set the keys into the operation
591  domainTypeHandler.operationSetKeys(keyHandler, op);
592  // set the expected columns into the operation
593  domainTypeHandler.operationGetValues(op);
594  // execute the select and get results
595  ResultData rs = op.resultData();
596  return rs;
597  }
598 
605  public void updatePersistent(Object object) {
606  if (object == null) {
607  return;
608  }
609  DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
610  if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + object);
611  ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
612  update(domainTypeHandler, valueHandler);
613  }
614 
615  public Operation update(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
617  setPartitionKey(domainTypeHandler, valueHandler);
618  Table storeTable = null;
619  Operation op = null;
620  try {
621  storeTable = domainTypeHandler.getStoreTable();
622  op = clusterTransaction.getUpdateOperation(storeTable);
623  domainTypeHandler.operationSetKeys(valueHandler, op);
624  domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
625  if (logger.isDetailEnabled()) logger.detail("Updated object " +
626  valueHandler);
627  } catch (ClusterJException ex) {
629  throw new ClusterJException(
630  local.message("ERR_Update", storeTable.getName()) ,ex);
631  }
633  return op;
634  }
635 
639  public void updatePersistentAll(Iterable objects) {
641  for (Iterator it = objects.iterator(); it.hasNext();) {
642  updatePersistent(it.next());
643  }
645  }
646 
650  public <T> T savePersistent(T instance) {
651  DomainTypeHandler domainTypeHandler = getDomainTypeHandler(instance);
652  if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + instance);
653  ValueHandler valueHandler = domainTypeHandler.getValueHandler(instance);
655  setPartitionKey(domainTypeHandler, valueHandler);
656  Table storeTable = null;
657  try {
658  storeTable = domainTypeHandler.getStoreTable();
659  Operation op = null;
660  op = clusterTransaction.getWriteOperation(storeTable);
661  domainTypeHandler.operationSetKeys(valueHandler, op);
662  domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
663  if (logger.isDetailEnabled()) logger.detail("Wrote object " +
664  valueHandler);
665  } catch (ClusterJException ex) {
667  throw new ClusterJException(
668  local.message("ERR_Write", storeTable.getName()) ,ex);
669  }
671  return instance;
672  }
673 
677  public Iterable savePersistentAll(Iterable instances) {
678  List<Object> result = new ArrayList<Object>();
680  for (Iterator it = instances.iterator(); it.hasNext();) {
681  result.add(savePersistent(it.next()));
682  }
684  return result;
685  }
686 
692  return transactionImpl;
693  }
694 
698  public void close() {
699  if (clusterTransaction != null) {
700  clusterTransaction.close();
701  clusterTransaction = null;
702  }
703  if (db != null) {
704  db.close();
705  db = null;
706  }
707  }
708 
709  public boolean isClosed() {
710  return db==null;
711  }
712 
714  protected void assertNotClosed() {
715  if (isClosed()) {
716  throw new ClusterJUserException(
717  local.message("ERR_Session_Closed"));
718  }
719  }
720 
724  public void begin() {
725  if (logger.isDebugEnabled()) logger.debug("begin transaction.");
727  handleTransactionException();
728  }
729 
733  protected void internalBegin() {
734  try {
735  clusterTransaction = db.startTransaction(joinTransactionId);
736  clusterTransaction.setLockMode(lockmode);
737  // if a transaction has already begun, tell the cluster transaction about the key
738  if (partitionKey != null) {
739  clusterTransaction.setPartitionKey(partitionKey);
740  }
741  // register our post-execute callback
742  clusterTransaction.postExecuteCallback(postExecuteCallbackHandler);
743  } catch (ClusterJException ex) {
744  throw new ClusterJException(
745  local.message("ERR_Ndb_Start"), ex);
746  }
747  }
748 
752  public void commit() {
753  if (logger.isDebugEnabled()) logger.debug("commit transaction.");
755  handleTransactionException();
756  }
757 
761  protected void internalCommit() {
762  if (rollbackOnly) {
763  try {
765  throw new ClusterJException(
766  local.message("ERR_Transaction_Rollback_Only"));
767  } catch (ClusterJException ex) {
768  throw new ClusterJException(
769  local.message("ERR_Transaction_Rollback_Only"), ex);
770  }
771  }
772  try {
773  clusterTransaction.executeCommit(true, true);
774  } finally {
775  // always close the transaction
776  clusterTransaction.close();
777  clusterTransaction = null;
778  partitionKey = null;
779  }
780  }
781 
785  public void rollback() {
786  if (logger.isDebugEnabled()) logger.debug("roll back transaction.");
787  transactionState = transactionState.rollback();
788  handleTransactionException();
789  }
790 
796  protected void internalRollback() {
797  try {
798  clusterTransaction.executeRollback();
799  } catch (ClusterJException ex) {
800  throw new ClusterJException(
801  local.message("ERR_Transaction_Execute", "rollback"), ex);
802  } finally {
803  if (clusterTransaction != null) {
804  clusterTransaction.close();
805  }
806  clusterTransaction = null;
807  partitionKey = null;
808  }
809  }
810 
814  public void startAutoTransaction() {
815  if (logger.isDebugEnabled()) logger.debug("start AutoTransaction");
817  handleTransactionException();
818  }
819 
823  public void endAutoTransaction() {
824  if (logger.isDebugEnabled()) logger.debug("end AutoTransaction");
826  handleTransactionException();
827  }
828 
832  public void failAutoTransaction() {
833  if (logger.isDebugEnabled()) logger.debug("fail AutoTransaction");
835  }
836 
837  protected void handleTransactionException() {
838  if (transactionException == null) {
839  return;
840  } else {
841  ClusterJException ex = transactionException;
842  transactionException = null;
843  throw ex;
844  }
845  }
846 
850  public void setRollbackOnly() {
851  rollbackOnly = true;
852  }
853 
857  public boolean getRollbackOnly() {
858  return rollbackOnly;
859  }
860 
864  protected interface TransactionState {
865  boolean isActive();
866 
867  TransactionState begin();
868  TransactionState commit();
869  TransactionState rollback();
870 
871  TransactionState start();
872  TransactionState end();
873  TransactionState fail();
874  }
875 
878  new TransactionState() {
879 
880  public boolean isActive() {
881  return false;
882  }
883 
884  public TransactionState begin() {
885  try {
886  internalBegin();
887  return transactionStateActive;
888  } catch (ClusterJException ex) {
889  transactionException = ex;
891  }
892  }
893 
894  public TransactionState commit() {
895  transactionException = new ClusterJUserException(
896  local.message("ERR_Transaction_Must_Be_Active_For_Method",
897  "commit"));
899  }
900 
901  public TransactionState rollback() {
902  transactionException = new ClusterJUserException(
903  local.message("ERR_Transaction_Must_Be_Active_For_Method",
904  "rollback"));
906  }
907 
908  public TransactionState start() {
909  try {
910  internalBegin();
911  clusterTransaction.setAutocommit(true);
913  return transactionStateAutocommit;
914  } catch (ClusterJException ex) {
915  transactionException = ex;
917  }
918  }
919 
920  public TransactionState end() {
922  local.message("ERR_Transaction_Auto_Start", "end"));
923  }
924 
925  public TransactionState fail() {
927  local.message("ERR_Transaction_Auto_Start", "end"));
928  }
929 
930  };
931 
934  new TransactionState() {
935 
936  public boolean isActive() {
937  return true;
938  }
939 
940  public TransactionState begin() {
941  transactionException = new ClusterJUserException(
942  local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
943  "begin"));
944  return transactionStateActive;
945  }
946 
947  public TransactionState commit() {
948  try {
949  // flush unwritten changes
950  flush();
951  internalCommit();
952  } catch (ClusterJException ex) {
953  transactionException = ex;
954  }
956  }
957 
958  public TransactionState rollback() {
959  try {
962  } catch (ClusterJException ex) {
963  transactionException = ex;
965  }
966  }
967 
968  public TransactionState start() {
969  // nothing to do
970  return transactionStateActive;
971  }
972 
973  public TransactionState end() {
974  // nothing to do
975  return transactionStateActive;
976  }
977 
978  public TransactionState fail() {
979  // nothing to do
980  return transactionStateActive;
981  }
982 
983  };
984 
985  protected TransactionState transactionStateAutocommit =
986  new TransactionState() {
987 
988  public boolean isActive() {
989  return true;
990  }
991 
992  public TransactionState begin() {
994  local.message("ERR_Transaction_Auto_End", "begin"));
995  }
996 
997  public TransactionState commit() {
998  throw new ClusterJFatalInternalException(
999  local.message("ERR_Transaction_Auto_End", "commit"));
1000  }
1001 
1002  public TransactionState rollback() {
1003  throw new ClusterJFatalInternalException(
1004  local.message("ERR_Transaction_Auto_End", "rollback"));
1005  }
1006 
1007  public TransactionState start() {
1008  // nested start; increment counter
1010  return transactionStateAutocommit;
1011  }
1012 
1013  public TransactionState end() {
1014  if (--nestedAutoTransactionCounter > 0) {
1015  return transactionStateAutocommit;
1016  } else if (nestedAutoTransactionCounter == 0) {
1017  try {
1018  internalCommit();
1019  } catch (ClusterJException ex) {
1020  transactionException = ex;
1021  }
1023  } else {
1024  throw new ClusterJFatalInternalException(
1025  local.message("ERR_Transaction_Auto_Start", "end"));
1026  }
1027  }
1028 
1029  public TransactionState fail() {
1030  try {
1032  internalRollback();
1034  } catch (ClusterJException ex) {
1035  // ignore failures caused by internal rollback
1037  }
1038  }
1039 
1040  };
1041 
1047  protected synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(T object) {
1048  DomainTypeHandler<T> domainTypeHandler =
1050  return domainTypeHandler;
1051  }
1052 
1058  public synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
1059  DomainTypeHandler<T> domainTypeHandler =
1061  return domainTypeHandler;
1062  }
1063 
1064  public Dictionary getDictionary() {
1065  return dictionary;
1066  }
1067 
1072  boolean isActive() {
1073  return transactionState.isActive();
1074  }
1075 
1081  public boolean isEnlisted() {
1082  return clusterTransaction==null?false:clusterTransaction.isEnlisted();
1083  }
1084 
1089  private void assertActive() {
1090  if (!transactionState.isActive()) {
1091  throw new ClusterJUserException(
1092  local.message("ERR_Transaction_Must_Be_Active"));
1093  }
1094  }
1095 
1100  private void assertNotActive(String methodName) {
1101  if (transactionState.isActive()) {
1102  throw new ClusterJUserException(
1103  local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
1104  methodName));
1105  }
1106  }
1107 
1113  public Query createQuery(Class cls) {
1114  throw new UnsupportedOperationException(
1115  local.message("ERR_NotImplemented"));
1116  }
1117 
1123  return new QueryBuilderImpl(this);
1124  }
1125 
1132  public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
1133  assertActive();
1134  try {
1135  IndexScanOperation result = clusterTransaction.getIndexScanOperation(storeIndex, storeTable);
1136  return result;
1137  } catch (ClusterJException ex) {
1138  throw new ClusterJException(
1139  local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1140  }
1141  }
1142 
1150  assertActive();
1151  try {
1152  IndexScanOperation result = clusterTransaction.getIndexScanOperationMultiRange(storeIndex, storeTable);
1153  return result;
1154  } catch (ClusterJException ex) {
1155  throw new ClusterJException(
1156  local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1157  }
1158  }
1159 
1167  assertActive();
1168  try {
1169  IndexScanOperation result = clusterTransaction.getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(storeIndex, storeTable);
1170  return result;
1171  } catch (ClusterJException ex) {
1172  throw new ClusterJException(
1173  local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1174  }
1175  }
1176 
1183  assertActive();
1184  try {
1185  ScanOperation result = clusterTransaction.getTableScanOperation(storeTable);
1186  return result;
1187  } catch (ClusterJException ex) {
1188  throw new ClusterJException(
1189  local.message("ERR_Table_Scan", storeTable.getName()), ex);
1190  }
1191  }
1192 
1199  assertActive();
1200  try {
1201  ScanOperation result = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
1202  return result;
1203  } catch (ClusterJException ex) {
1204  throw new ClusterJException(
1205  local.message("ERR_Table_Scan", storeTable.getName()), ex);
1206  }
1207  }
1208 
1215  public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
1216  assertActive();
1217  try {
1218  IndexOperation result = clusterTransaction.getUniqueIndexOperation(storeIndex, storeTable);
1219  return result;
1220  } catch (ClusterJException ex) {
1221  throw new ClusterJException(
1222  local.message("ERR_Unique_Index", storeTable.getName(), storeIndex.getName()), ex);
1223  }
1224  }
1225 
1231  public Operation getSelectOperation(Table storeTable) {
1232  assertActive();
1233  try {
1234  Operation result = clusterTransaction.getSelectOperation(storeTable);
1235  return result;
1236  } catch (ClusterJException ex) {
1237  throw new ClusterJException(
1238  local.message("ERR_Select", storeTable), ex);
1239  }
1240  }
1241 
1247  public Operation getDeleteOperation(Table storeTable) {
1248  assertActive();
1249  try {
1250  Operation result = clusterTransaction.getDeleteOperation(storeTable);
1251  return result;
1252  } catch (ClusterJException ex) {
1253  throw new ClusterJException(
1254  local.message("ERR_Delete", storeTable), ex);
1255  }
1256  }
1257 
1264  public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
1265  assertActive();
1266  try {
1267  IndexOperation result = clusterTransaction.getUniqueIndexDeleteOperation(storeIndex, storeTable);
1268  return result;
1269  } catch (ClusterJException ex) {
1270  throw new ClusterJException(
1271  local.message("ERR_Unique_Index_Delete", storeTable.getName(), storeIndex.getName()), ex);
1272  }
1273  }
1274 
1275  public void flush() {
1276  if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
1277  if (!changeList.isEmpty()) {
1278  for (StateManager sm: changeList) {
1279  sm.flush(this);
1280  }
1281  changeList.clear();
1282  }
1283  // now flush changes to the back end
1284  if (clusterTransaction != null) {
1285  executeNoCommit();
1286  }
1287  }
1288 
1289  public List getChangeList() {
1290  return Collections.unmodifiableList(changeList);
1291  }
1292 
1293  public void persist(Object instance) {
1294  makePersistent(instance);
1295  }
1296 
1297  public void remove(Object instance) {
1298  deletePersistent(instance);
1299  }
1300 
1301  public void markModified(StateManager instance) {
1302  changeList.add(instance);
1303  }
1304 
1305  public void setPartitionKey(Class<?> domainClass, Object key) {
1306  DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(domainClass);
1307  String tableName = domainTypeHandler.getTableName();
1308  // if transaction is enlisted, throw a user exception
1309  if (isEnlisted()) {
1310  throw new ClusterJUserException(
1311  local.message("ERR_Set_Partition_Key_After_Enlistment", tableName));
1312  }
1313  // if a partition key has already been set, throw a user exception
1314  if (this.partitionKey != null) {
1315  throw new ClusterJUserException(
1316  local.message("ERR_Set_Partition_Key_Twice", tableName));
1317  }
1318  ValueHandler handler = domainTypeHandler.createKeyValueHandler(key);
1319  this.partitionKey= domainTypeHandler.createPartitionKey(handler);
1320  // if a transaction has already begun, tell the cluster transaction about the key
1321  if (clusterTransaction != null) {
1322  clusterTransaction.setPartitionKey(partitionKey);
1323  }
1324  }
1325 
1331  public void markModified(Object instance, String fieldName) {
1332  DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(instance);
1333  ValueHandler handler = domainTypeHandler.getValueHandler(instance);
1334  domainTypeHandler.objectMarkModified(handler, fieldName);
1335  }
1336 
1343  public void executeNoCommit(boolean abort, boolean force) {
1344  if (clusterTransaction != null) {
1345  clusterTransaction.executeNoCommit(abort, force);
1346  }
1347  }
1348 
1354  public void executeNoCommit() {
1355  executeNoCommit(true, true);
1356  }
1357 
1358  public <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> domainTypeHandler) {
1360  return builder.createQueryDefinition(domainTypeHandler);
1361  }
1362 
1368  return clusterTransaction.getCoordinatedTransactionId();
1369  }
1370 
1375  public void setCoordinatedTransactionId(String coordinatedTransactionId) {
1376  clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
1377  }
1378 
1383  public void setLockMode(LockMode lockmode) {
1384  this.lockmode = lockmode;
1385  if (clusterTransaction != null) {
1386  clusterTransaction.setLockMode(lockmode);
1387  }
1388  }
1389 
1390 }