MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbOpenJPAStoreManager.java
1 /*
2  Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 package com.mysql.clusterj.openjpa;
19 
20 import java.sql.SQLException;
21 import java.util.ArrayList;
22 import java.util.BitSet;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26 
27 import org.apache.openjpa.jdbc.kernel.ConnectionInfo;
28 import org.apache.openjpa.jdbc.kernel.JDBCFetchConfiguration;
29 import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
30 import org.apache.openjpa.jdbc.meta.ClassMapping;
31 import org.apache.openjpa.jdbc.meta.ValueMapping;
32 import org.apache.openjpa.jdbc.schema.Table;
33 import org.apache.openjpa.jdbc.sql.Result;
34 import org.apache.openjpa.kernel.FetchConfiguration;
35 import org.apache.openjpa.kernel.OpenJPAStateManager;
36 import org.apache.openjpa.kernel.PCState;
37 import org.apache.openjpa.kernel.QueryLanguages;
38 import org.apache.openjpa.kernel.StoreContext;
39 import org.apache.openjpa.kernel.StoreQuery;
40 import org.apache.openjpa.kernel.exps.ExpressionParser;
41 import org.apache.openjpa.meta.ClassMetaData;
42 import org.apache.openjpa.meta.FieldMetaData;
43 import org.apache.openjpa.util.OpenJPAId;
44 
45 import com.mysql.clusterj.ClusterJDatastoreException;
46 import com.mysql.clusterj.ClusterJException;
47 import com.mysql.clusterj.ClusterJFatalInternalException;
48 import com.mysql.clusterj.ClusterJFatalUserException;
49 import com.mysql.clusterj.ClusterJUserException;
50 import com.mysql.clusterj.SessionFactory;
51 import com.mysql.clusterj.Transaction;
52 import com.mysql.clusterj.core.query.QueryExecutionContextImpl;
53 import com.mysql.clusterj.core.spi.DomainTypeHandler;
54 import com.mysql.clusterj.core.spi.SessionSPI;
55 import com.mysql.clusterj.core.spi.ValueHandler;
56 import com.mysql.clusterj.core.store.Dictionary;
57 import com.mysql.clusterj.core.store.Operation;
58 import com.mysql.clusterj.core.store.ResultData;
59 import com.mysql.clusterj.core.util.I18NHelper;
60 import com.mysql.clusterj.core.util.Logger;
61 import com.mysql.clusterj.core.util.LoggerFactoryService;
62 import com.mysql.clusterj.query.QueryDomainType;
63 
67 public class NdbOpenJPAStoreManager extends JDBCStoreManager {
68 
70  static final I18NHelper local = I18NHelper.getInstance(NdbOpenJPAStoreManager.class);
71 
74 
75  @SuppressWarnings("unused")
76  private StoreContext storeContext;
77  private NdbOpenJPAConfiguration ndbConfiguration;
78  private SessionFactory sessionFactory;
79 
80  // TODO This really belongs in store context.
81  private SessionSPI session;
82  private Transaction tx;
83  private Dictionary dictionary;
84 
85  public NdbOpenJPAStoreManager() {
86  super();
87  }
88 
89  @Override
90  public void setContext(StoreContext ctx) {
91  super.setContext(ctx);
92  setContext(ctx, (NdbOpenJPAConfiguration) ctx.getConfiguration());
93  }
94 
95  public void setContext(StoreContext ctx, NdbOpenJPAConfiguration conf) {
96  storeContext = ctx;
97  ndbConfiguration = conf;
98  sessionFactory = conf.getSessionFactory();
99  getSession();
100  }
101 
102  protected NdbOpenJPADomainTypeHandlerImpl<?> getDomainTypeHandler(OpenJPAStateManager sm) {
103  // get DomainTypeHandler from StateManager
104  ClassMapping cmp = (ClassMapping) sm.getMetaData();
105  return getDomainTypeHandler(cmp);
106  }
107 
108  protected NdbOpenJPADomainTypeHandlerImpl<?> getDomainTypeHandler(ClassMapping cmp) {
109  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler =
110  ndbConfiguration.getDomainTypeHandler(cmp, dictionary);
111  return domainTypeHandler;
112  }
113 
114  protected int deleteAll(DomainTypeHandler<?> base) {
115  // used by NdbOpenJPAStoreQuery to delete all instances of a class
116  int result = session.deletePersistentAll(base);
117  return result;
118  }
119 
120  protected SessionSPI getSession() {
121  if (session == null) {
122  session = (SessionSPI) sessionFactory.getSession();
123  dictionary = session.getDictionary();
124  }
125  return session;
126  }
130  @Override
131  public Object find(Object oid, ValueMapping vm,
132  JDBCFetchConfiguration fetch) {
133  if (logger.isDebugEnabled()) {
134  logger.debug("NdbStoreManager.find(Object oid, ValueMapping vm, "
135  + "JDBCFetchConfiguration fetch) delegated to super with oid " + oid + ".");
136  }
137  // return null if the oid is null (this will be the case if a foreign key element is null)
138  ClassMapping cls = vm.getDeclaredTypeMapping();
139  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(cls);
140  Object handler = domainTypeHandler.createKeyValueHandler(oid);
141  if (handler == null) {
142  return null;
143  }
144  return super.find(oid, vm, fetch);
145  }
146 
155  @Override
156  public boolean load(OpenJPAStateManager sm, BitSet fields,
157  FetchConfiguration fetch, int lockLevel, Object context) {
158  if (logger.isDebugEnabled()) {
159  logger.debug("NdbStoreManager.load(OpenJPAStateManager sm, BitSet fields, "
160  + "FetchConfiguration fetch, int lockLevel, Object context) "
161  + "Id: " + sm.getId() + " requested fields: "
162  + NdbOpenJPAUtility.printBitSet(sm, fields));
163  }
164  if (context != null && ((ConnectionInfo) context).result != null) {
165  // there is already a result set to process
166  return super.load(sm, fields, fetch, lockLevel, context);
167  } else {
168  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
169  if (!isSupportedType(domainTypeHandler, "NdbOpenJPAStoreManager.load")) {
170  return super.load(sm, fields, fetch, lockLevel, context);
171  } else {
172  try {
173  return domainTypeHandler.load(sm, this, fields, (JDBCFetchConfiguration) fetch, context);
174  } catch (SQLException sQLException) {
175  logger.error("Fatal error from NdbOpenJPAStoreManager.load " + sQLException);
176  return false;
177  }
178  }
179  }
180  }
181 
182  @Override
183  public Object load(ClassMapping mapping, JDBCFetchConfiguration fetch,
184  BitSet exclude, Result result) throws SQLException {
185  if (logger.isDebugEnabled()) {
186  logger.debug("NdbStoreManager.load(ClassMapping mapping, JDBCFetchConfiguration fetch, "
187  + "BitSet exclude, Result result) for " + mapping.getDescribedType().getName()
188  + " delegated to super.");
189  }
190  return super.load(mapping, fetch, exclude, result);
191  }
192 
193  @SuppressWarnings("unchecked")
194  @Override
195  public Collection loadAll(Collection sms, PCState state, int load,
196  FetchConfiguration fetch, Object context) {
197  if (logger.isDebugEnabled()) {
198  logger.debug("NdbStoreManager.loadAll(Collection sms, PCState state, int load, "
199  + "FetchConfiguration fetch, Object context) delegated to super.");
200  }
201  return super.loadAll(sms, state, load, fetch, context);
202  }
203 
204  @Override
205  public boolean initialize(OpenJPAStateManager sm, PCState state,
206  FetchConfiguration fetch, Object context) {
207  if (logger.isDebugEnabled()) {
208  logger.debug("NdbStoreManager.initialize(OpenJPAStateManager sm, PCState state, "
209  + "FetchConfiguration fetch, Object context)");
210  }
211  // if context already contains a result, use the result to initialize
212  if (context != null) {
213  ConnectionInfo info = (ConnectionInfo)context;
214  ClassMapping mapping = info.mapping;
215  Result result = info.result;
216  logger.info("info mapping: " + mapping.getDescribedType().getName() + " result: " + result);
217  try {
218  return initializeState(sm, state, (JDBCFetchConfiguration)fetch, info);
219  } catch (ClassNotFoundException e) {
220  throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"), e);
221  } catch (SQLException e) {
222  throw new ClusterJDatastoreException(local.message("ERR_Datastore_Exception"), e);
223  }
224  }
225  // otherwise, load from the datastore
226  // TODO: support user-defined oid types
227  OpenJPAId id = (OpenJPAId)sm.getId();
228  if (logger.isTraceEnabled()) {
229  logger.trace("Id: " + id.getClass() + " " + id);
230  }
231  // get domain type handler for StateManager
232  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
233 
234  if (!isSupportedType(domainTypeHandler, "NdbOpenJPAStoreManager.initialize")) {
235  // if not supported, go the jdbc route
236  boolean result = super.initialize(sm, state, fetch, context);
237  if (logger.isDebugEnabled()) logger.debug(
238  "NdbOpenJPAStoreManager.initialize delegated to super: returned " + result);
239  return result;
240  }
241  try {
242  // get session from session factory
243  getSession();
244  session.startAutoTransaction();
245  // get domain type handler for StateManager
246 // NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler =
247 // getDomainTypeHandler(sm);
248 // Object instance = session.initializeFromDatabase(
249 // domainTypeHandler, null,
250 // domainTypeHandler.getValueHandler(sm),
251 // domainTypeHandler.createKeyValueHandler(id.getIdObject()));
252  // initialize via OpenJPA protocol
253  // select all columns from table
254  ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(id.getIdObject());
255  ResultData resultData = session.selectUnique(domainTypeHandler,
256  keyValueHandler,
257  null);
258  // create an OpenJPA Result from the ndb result data
259  NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, null);
260  if (result.next()) {
261  // we have an instance; create the PC instance
262  domainTypeHandler.newInstance(sm);
263  // for each field, call its handler to initialize the field
264  // TODO: should compare using this technique against
265  // using clusterj directly (see above) since
266  // there is a lot more overhead using the openjpa technique
267  domainTypeHandler.load(sm, this, (JDBCFetchConfiguration)fetch, result);
268 // NdbOpenJPADomainFieldHandlerImpl[] fieldHandlers = domainTypeHandler.getDomainFieldHandlers();
269 // for (NdbOpenJPADomainFieldHandlerImpl fmd:fieldHandlers) {
270 // if (true) {
271 // // if (fmd.isToOne()) {
272 // FieldMapping fm = fmd.getFieldMapping();
273 // fm.load(sm, this, (JDBCFetchConfiguration)fetch, result);
274 // }
275 // // fmd.load(sm, fetch, result);
276 // }
277  }
278  if (logger.isDetailEnabled()) {
279  logger.detail("After initializing PCState: " +
280  sm.getPCState().getClass().getSimpleName() + " " +
281  printLoaded(sm));
282  }
283  session.endAutoTransaction();
284  return true;
285 
286  } catch (ClusterJException e) {
287  session.failAutoTransaction();
288  throw e;
289  } catch (Exception e) {
290  session.failAutoTransaction();
291  throw new ClusterJFatalInternalException("Unexpected exception.", e);
292  // if any problem, fall back
293  // return super.initialize(sm, state, fetch, context);
294  }
295  }
296 
297  @Override
298  protected boolean initializeState(OpenJPAStateManager sm, PCState state,
299  JDBCFetchConfiguration fetch, ConnectionInfo info)
300  throws ClassNotFoundException, SQLException {
301  if (logger.isDebugEnabled()) {
302  logger.debug("NdbStoreManager.initializeState(" +
303  "OpenJPAStateManager, PCState, JDBCFetchConfiguration, " +
304  "ConnectionInfo) delegated to super.");
305  }
306  return super.initializeState(sm, state, fetch, info);
307  }
308 
325  @SuppressWarnings("unchecked")
326  @Override
327  public Collection<Exception> flush(Collection sms) {
328  Collection<OpenJPAStateManager> stateManagers =
329  (Collection<OpenJPAStateManager>)sms;
330  StringBuffer buffer = null;
331  if (logger.isTraceEnabled()) {
332  buffer = new StringBuffer();
333  }
334  // make sure all instances are OK to insert/update/delete
335  boolean allSupportedTypes = true;
336  for (OpenJPAStateManager sm: stateManagers) {
337  DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(sm);
338  if (!domainTypeHandler.isSupportedType()) {
339  if (logger.isDetailEnabled()) logger.detail("Found unsupported class "
340  + domainTypeHandler.getName());
341  if (ndbConfiguration.getFailOnJDBCPath()) {
342  throw new ClusterJFatalUserException(
343  local.message("ERR_JDBC_Path", domainTypeHandler.getName()));
344  }
345  allSupportedTypes = false;
346  }
347  if (logger.isTraceEnabled()) {
348  buffer.append(printState(sm));
349  }
350  }
351  if (logger.isTraceEnabled()) {
352  logger.trace(buffer.toString());
353  }
354  if (!allSupportedTypes) {
355  // not all instances are of supported types; delegate to super
356  Collection<Exception> exceptions = super.flush(sms);
357  if (logger.isDetailEnabled()) logger.detail("Found unsupported class(es); "
358  + "super resulted in exceptions: " + exceptions);
359  return exceptions;
360  }
361  // now flush changes to the cluster back end
362  getSession();
363  Collection exceptions = new ArrayList<Exception>();
364  for (OpenJPAStateManager sm:stateManagers) {
365  // get DomainTypeHandler from StateManager
366  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
367  // get the value handler for the StateManager
368  ValueHandler valueHandler = domainTypeHandler.getValueHandler(sm, this);
369  // now flush based on current PCState
370  PCState pcState = sm.getPCState();
371  try {
372  if (pcState == PCState.PNEW) {
373  // flush new instance
374  session.insert(domainTypeHandler, valueHandler);
375  } else if (pcState == PCState.PDELETED) {
376  // flush deleted instance
377  session.delete(domainTypeHandler, valueHandler);
378  } else if (pcState == PCState.PDIRTY) {
379  // flush dirty instance
380  session.update(domainTypeHandler, valueHandler);
381  } else if (pcState == PCState.PNEWFLUSHEDDELETED) {
382  // flush new flushed deleted instance
383  session.delete(domainTypeHandler, valueHandler);
384  } else if (pcState == PCState.PNEWFLUSHEDDELETEDFLUSHED) {
385  // nothing to do
386  } else {
387  throw new ClusterJUserException(
388  local.message("ERR_Unsupported_Flush_Operation",
389  pcState.toString()));
390  }
391  } catch (Exception ex) {
392  if (logger.isDebugEnabled()) {
393  logger.debug("Exception caught: " + ex.toString());
394  }
395  exceptions.add(ex);
396  }
397  }
398  // after all instances are flushed, send to the back end
399  session.flush();
400 
401  return exceptions;
402  }
403 
409  private boolean isSupportedType(NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler,
410  String where) {
411  boolean result = domainTypeHandler.isSupportedType();
412  if (!result) {
413  if (logger.isDebugEnabled()) logger.debug(where
414  + " found unsupported class " + domainTypeHandler.getName());
415  if (ndbConfiguration.getFailOnJDBCPath()) {
416  throw new ClusterJFatalUserException(
417  local.message("ERR_JDBC_Path", domainTypeHandler.getName()));
418  }
419  }
420  return result;
421  }
422 
423  @Override
424  public void beforeStateChange(OpenJPAStateManager sm, PCState fromState,
425  PCState toState) {
426  if (logger.isDetailEnabled()) {
427  logger.detail(
428  printState("from ", fromState) +
429  printState(" to ", toState));
430  }
431  super.beforeStateChange(sm, fromState, toState);
432  }
433 
434  @Override
435  public StoreQuery newQuery(String language) {
436  ExpressionParser ep = QueryLanguages.parserForLanguage(language);
437  return new NdbOpenJPAStoreQuery(this, ep);
438  }
439 
440  @Override
441  public void beginOptimistic() {
442  if (logger.isTraceEnabled()) {
443  logger.trace(" Transaction " + hashCode() + printIsActive(tx));
444  }
445  super.beginOptimistic();
446  try {
447  getSession();
448  tx = session.currentTransaction();
449  if (tx.isActive()) {
450  tx.commit();
451  }
452  tx.begin();
453  } catch (Exception e) {
454  logger.detail("NdbOpenJPAStoreManager.beginOptimistic():" +
455  "caught exception in session.currentTransaction.begin().");
456  throw new ClusterJDatastoreException(
457  local.message("ERR_Datastore_Exception"), e);
458  }
459  }
460 
461  @Override
462  public void begin() {
463  if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
464  getSession();
465  try {
466  // end ndb transaction if active
467  tx = session.currentTransaction();
468  if (tx.isActive()) {
469  tx.commit();
470  }
471  tx.begin();
472  } catch (Exception e) {
473  logger.detail("Caught exception in session.currentTransaction.commit()." +
474  e.getMessage());
475  }
476  // TODO: handle JDBC connection for queries
477  super.begin();
478  }
479 
480  @Override
481  public void commit() {
482  if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
483  try {
484  session.commit();
485  } catch (Exception ex) {
486  logger.detail(" failed" + ex.toString());
487  throw new ClusterJException(
488  local.message("ERR_Commit_Failed", ex.toString()));
489  }
490  // TODO: handle JDBC connection for queries
491  super.commit();
492  }
493 
494  @Override
495  public void rollback() {
496  if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
497  session.rollback();
498  // TODO: handle JDBC connection for queries
499  super.rollback();
500  }
501 
502  @Override
503  public void close() {
504  if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
505  if (session != null && !session.isClosed()) {
506  if (session.currentTransaction().isActive()) {
507  tx.commit();
508  }
509  session.close();
510  }
511  }
512 
513  protected String printState(OpenJPAStateManager sm) {
514  StringBuffer buffer = new StringBuffer();
515  buffer.append("class: ");
516  buffer.append(sm.getPersistenceCapable().getClass().getName());
517  buffer.append(" objectId: ");
518  buffer.append(sm.getObjectId());
519  buffer.append(" PCState: ");
520  buffer.append(sm.getPCState());
521  buffer.append("\n");
522  return buffer.toString();
523  }
524 
525  protected String printState(String header, PCState state) {
526  StringBuffer buffer = new StringBuffer(header);
527  buffer.append(state.getClass().getSimpleName());
528  return buffer.toString();
529  }
530 
531  protected String printLoaded(OpenJPAStateManager sm) {
532  BitSet loaded = sm.getLoaded();
533  return "Loaded: " + NdbOpenJPAUtility.printBitSet(sm, loaded);
534  }
535 
536  protected String printIsActive(Transaction tx) {
537  return (tx==null?" is null.":tx.isActive()?" is active.":" is not active.");
538  }
539 // The following is not used in ClusterJ, since managed mode is not implemented
540 // LockManager lm = ctx.getLockManager();
541 // if (lm instanceof JDBCLockManager)
542 // _lm = (JDBCLockManager) lm;
543 //
544 // if (!ctx.isManaged() && _conf.isConnectionFactoryModeManaged())
545 // _ds = _conf.getDataSource2(ctx);
546 // else
547 // _ds = _conf.getDataSource(ctx);
548 //
549 // if (_conf.getUpdateManagerInstance().orderDirty())
550 // ctx.setOrderDirtyObjects(true);
551 
556  public <T> QueryDomainType<T> createQueryDomainType(Class<T> type) {
557  return session.getQueryBuilder().createQueryDefinition(type);
558  }
559 
566  public NdbOpenJPAResult executeQuery(DomainTypeHandler<?> domainTypeHandler,
567  QueryDomainType<?> queryDomainType, Map<String, Object> parameterMap) {
568  QueryExecutionContextImpl context = new QueryExecutionContextImpl(session, parameterMap);
569  ResultData resultData = context.getResultData(queryDomainType);
570  NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, null);
571  return result;
572  }
573 
580  public NdbOpenJPAResult lookup(OpenJPAStateManager sm,
581  NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler,
582  List<NdbOpenJPADomainFieldHandlerImpl> fieldHandlers) {
583  com.mysql.clusterj.core.store.Table storeTable = domainTypeHandler.getStoreTable();
584  session.startAutoTransaction();
585  try {
586  Operation op = session.getSelectOperation(storeTable);
587  int[] keyFields = domainTypeHandler.getKeyFieldNumbers();
588  BitSet fieldsInResult = new BitSet();
589  for (int i : keyFields) {
590  fieldsInResult.set(i);
591  }
592  ValueHandler handler = domainTypeHandler.getValueHandler(sm, this);
593  domainTypeHandler.operationSetKeys(handler, op);
594  // include the key columns in the results
595  domainTypeHandler.operationGetKeys(op);
596  for (NdbOpenJPADomainFieldHandlerImpl fieldHandler : fieldHandlers) {
597  fieldHandler.operationGetValue(op);
598  fieldsInResult.set(fieldHandler.getFieldNumber());
599  }
600  ResultData resultData = op.resultData();
601  NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, fieldsInResult);
602  session.endAutoTransaction();
603  return result;
604  } catch (RuntimeException ex) {
605  session.failAutoTransaction();
606  throw ex;
607  }
608  }
609 
610  public Dictionary getDictionary() {
611  return dictionary;
612  }
613 
614 }