MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SessionFactoryImpl.java
1 /*
2  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 package com.mysql.clusterj.core;
19 
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalException;
22 import com.mysql.clusterj.ClusterJFatalInternalException;
23 import com.mysql.clusterj.ClusterJFatalUserException;
24 import com.mysql.clusterj.ClusterJHelper;
25 import com.mysql.clusterj.ClusterJUserException;
26 import com.mysql.clusterj.Constants;
27 import com.mysql.clusterj.Session;
28 import com.mysql.clusterj.SessionFactory;
29 
30 import com.mysql.clusterj.core.spi.DomainTypeHandler;
31 import com.mysql.clusterj.core.spi.DomainTypeHandlerFactory;
32 import com.mysql.clusterj.core.metadata.DomainTypeHandlerFactoryImpl;
33 
34 import com.mysql.clusterj.core.store.Db;
35 import com.mysql.clusterj.core.store.ClusterConnection;
36 import com.mysql.clusterj.core.store.ClusterConnectionService;
37 import com.mysql.clusterj.core.store.Dictionary;
38 import com.mysql.clusterj.core.store.Table;
39 
40 import com.mysql.clusterj.core.util.I18NHelper;
41 import com.mysql.clusterj.core.util.Logger;
42 import com.mysql.clusterj.core.util.LoggerFactoryService;
43 
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 
49 public class SessionFactoryImpl implements SessionFactory, Constants {
50 
52  static final I18NHelper local = I18NHelper.getInstance(SessionFactoryImpl.class);
53 
56 
58  protected Map<?, ?> props;
59 
61  String CLUSTER_CONNECTION_SERVICE;
62  String CLUSTER_CONNECT_STRING;
63  int CLUSTER_CONNECT_RETRIES;
64  int CLUSTER_CONNECT_DELAY;
65  int CLUSTER_CONNECT_VERBOSE;
66  int CLUSTER_CONNECT_TIMEOUT_BEFORE;
67  int CLUSTER_CONNECT_TIMEOUT_AFTER;
68  String CLUSTER_DATABASE;
69  int CLUSTER_MAX_TRANSACTIONS;
70 
72  List<Integer> nodeIds = new ArrayList<Integer>();
73 
75  int connectionPoolSize;
76 
78  // TODO make this non-static
79  static private Map<Class<?>, Class<?>> proxyClassToDomainClass = new HashMap<Class<?>, Class<?>>();
80 
82  // TODO make this non-static
83  static final protected Map<Class<?>, DomainTypeHandler<?>> typeToHandlerMap =
84  new HashMap<Class<?>, DomainTypeHandler<?>>();
85 
87  DomainTypeHandlerFactory domainTypeHandlerFactory = new DomainTypeHandlerFactoryImpl();
88 
90  // TODO make this non-static
91 // static final protected Map<String,Table> Tables = new HashMap<String,Table>();
92 
94  static final protected Map<String, SessionFactoryImpl> sessionFactoryMap =
95  new HashMap<String, SessionFactoryImpl>();
96 
98  final String key;
99 
101  private List<ClusterConnection> pooledConnections = new ArrayList<ClusterConnection>();
102 
108  CLUSTER_CONNECTION_SERVICE);
109  }
110 
118  static public SessionFactoryImpl getSessionFactory(Map<?, ?> props) {
119  int connectionPoolSize = getIntProperty(props,
121  String sessionFactoryKey = getSessionFactoryKey(props);
122  SessionFactoryImpl result = null;
123  if (connectionPoolSize != 0) {
124  // if using connection pooling, see if already a session factory created
125  synchronized(sessionFactoryMap) {
126  result = sessionFactoryMap.get(sessionFactoryKey);
127  if (result == null) {
128  result = new SessionFactoryImpl(props);
129  sessionFactoryMap.put(sessionFactoryKey, result);
130  }
131  }
132  } else {
133  // if not using connection pooling, create a new session factory
134  result = new SessionFactoryImpl(props);
135  }
136  return result;
137  }
138 
139  private static String getSessionFactoryKey(Map<?, ?> props) {
140  String clusterConnectString =
142  String clusterDatabase = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
144  return clusterConnectString + "+" + clusterDatabase;
145  }
146 
152  protected SessionFactoryImpl(Map<?, ?> props) {
153  this.props = props;
154  this.key = getSessionFactoryKey(props);
155  this.connectionPoolSize = getIntProperty(props,
157  CLUSTER_CONNECT_STRING = getRequiredStringProperty(props, PROPERTY_CLUSTER_CONNECTSTRING);
158  CLUSTER_CONNECT_RETRIES = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_RETRIES,
160  CLUSTER_CONNECT_DELAY = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_DELAY,
162  CLUSTER_CONNECT_VERBOSE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_VERBOSE,
164  CLUSTER_CONNECT_TIMEOUT_BEFORE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_BEFORE,
166  CLUSTER_CONNECT_TIMEOUT_AFTER = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_AFTER,
168  CLUSTER_DATABASE = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
170  CLUSTER_MAX_TRANSACTIONS = getIntProperty(props, PROPERTY_CLUSTER_MAX_TRANSACTIONS,
172  CLUSTER_CONNECTION_SERVICE = getStringProperty(props, PROPERTY_CLUSTER_CONNECTION_SERVICE);
173  createClusterConnectionPool();
174  // now get a Session and complete a transaction to make sure that the cluster is ready
175  try {
176  Session session = getSession(null);
177  session.currentTransaction().begin();
178  session.currentTransaction().commit();
179  session.close();
180  } catch (Exception e) {
181  if (e instanceof ClusterJException) {
182  logger.warn(local.message("ERR_Session_Factory_Impl_Failed_To_Complete_Transaction"));
183  throw (ClusterJException)e;
184  }
185  }
186  }
187 
188  protected void createClusterConnectionPool() {
190  if (nodeIdsProperty != null) {
191  // separators are any combination of white space, commas, and semicolons
192  String[] nodeIdsStringArray = nodeIdsProperty.split("[,; \t\n\r]+", 48);
193  for (String nodeIdString : nodeIdsStringArray) {
194  try {
195  int nodeId = Integer.parseInt(nodeIdString);
196  nodeIds.add(nodeId);
197  } catch (NumberFormatException ex) {
198  throw new ClusterJFatalUserException(local.message("ERR_Node_Ids_Format", nodeIdsProperty), ex);
199  }
200  }
201  // validate the size of the node ids with the connection pool size
202  if (connectionPoolSize != DEFAULT_PROPERTY_CONNECTION_POOL_SIZE) {
203  // both are specified; they must match or nodeIds size must be 1
204  if (nodeIds.size() ==1) {
205  // add new nodeIds to fill out array
206  for (int i = 1; i < connectionPoolSize; ++i) {
207  nodeIds.add(nodeIds.get(i - 1) + 1);
208  }
209  }
210  if (connectionPoolSize != nodeIds.size()) {
211  throw new ClusterJFatalUserException(
212  local.message("ERR_Node_Ids_Must_Match_Connection_Pool_Size",
213  nodeIdsProperty, connectionPoolSize));
214 
215  }
216  } else {
217  // only node ids are specified; make pool size match number of node ids
218  connectionPoolSize = nodeIds.size();
219  }
220  }
221  ClusterConnectionService service = getClusterConnectionService();
222  if (nodeIds.size() == 0) {
223  // node ids were not specified
224  for (int i = 0; i < connectionPoolSize; ++i) {
225  createClusterConnection(service, props, 0);
226  }
227  } else {
228  for (int i = 0; i < connectionPoolSize; ++i) {
229  createClusterConnection(service, props, nodeIds.get(i));
230  }
231  }
232  }
233 
234  protected ClusterConnection createClusterConnection(
235  ClusterConnectionService service, Map<?, ?> props, int nodeId) {
236  ClusterConnection result = null;
237  try {
238  result = service.create(CLUSTER_CONNECT_STRING, nodeId);
239  result.connect(CLUSTER_CONNECT_RETRIES, CLUSTER_CONNECT_DELAY,true);
240  result.waitUntilReady(CLUSTER_CONNECT_TIMEOUT_BEFORE,CLUSTER_CONNECT_TIMEOUT_AFTER);
241  } catch (Exception ex) {
242  // need to clean up if some connections succeeded
243  for (ClusterConnection connection: pooledConnections) {
244  connection.close();
245  }
246  pooledConnections.clear();
247  throw new ClusterJFatalUserException(
248  local.message("ERR_Connecting", props), ex);
249  }
250  this.pooledConnections.add(result);
251  return result;
252  }
253 
258  public Session getSession() {
259  return getSession(null);
260  }
261 
268  public Session getSession(Map properties) {
269  ClusterConnection clusterConnection = getClusterConnectionFromPool();
270  try {
271  Db db = null;
272  synchronized(this) {
273  checkConnection(clusterConnection);
274  db = clusterConnection.createDb(CLUSTER_DATABASE, CLUSTER_MAX_TRANSACTIONS);
275  }
276  Dictionary dictionary = db.getDictionary();
277  return new SessionImpl(this, properties, db, dictionary);
278  } catch (ClusterJException ex) {
279  throw ex;
280  } catch (Exception ex) {
281  throw new ClusterJFatalException(
282  local.message("ERR_Create_Ndb"), ex);
283  }
284  }
285 
286  private ClusterConnection getClusterConnectionFromPool() {
287  if (connectionPoolSize <= 1) {
288  return pooledConnections.get(0);
289  }
290  // find the best pooled connection (the connection with the least active sessions)
291  // this is not perfect without synchronization since a connection might close sessions
292  // after getting the dbCount but we don't care about perfection here.
293  ClusterConnection result = null;
294  int bestCount = Integer.MAX_VALUE;
295  for (ClusterConnection pooledConnection: pooledConnections ) {
296  int count = pooledConnection.dbCount();
297  if (count < bestCount) {
298  bestCount = count;
299  result = pooledConnection;
300  }
301  }
302  return result;
303  }
304 
305  private void checkConnection(ClusterConnection clusterConnection) {
306  if (clusterConnection == null) {
307  throw new ClusterJUserException(local.message("ERR_Session_Factory_Closed"));
308  }
309  }
310 
316  public static <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
317  // synchronize here because the map is not synchronized
318  synchronized(typeToHandlerMap) {
319  @SuppressWarnings( "unchecked" )
320  DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
321  return domainTypeHandler;
322  }
323  }
324 
332  public <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls,
333  Dictionary dictionary) {
334  // synchronize here because the map is not synchronized
335  synchronized(typeToHandlerMap) {
336  @SuppressWarnings("unchecked")
337  DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
338  if (logger.isDetailEnabled()) logger.detail("DomainTypeToHandler for "
339  + cls.getName() + "(" + cls
340  + ") returned " + domainTypeHandler);
341  if (domainTypeHandler == null) {
342  domainTypeHandler = domainTypeHandlerFactory.createDomainTypeHandler(cls,
343  dictionary);
344  if (logger.isDetailEnabled()) logger.detail("createDomainTypeHandler for "
345  + cls.getName() + "(" + cls
346  + ") returned " + domainTypeHandler);
347  typeToHandlerMap.put(cls, domainTypeHandler);
348  Class<?> proxyClass = domainTypeHandler.getProxyClass();
349  if (proxyClass != null) {
350  proxyClassToDomainClass.put(proxyClass, cls);
351  }
352  }
353  return domainTypeHandler;
354  }
355  }
356 
363  public <T> DomainTypeHandler<T> getDomainTypeHandler(T object, Dictionary dictionary) {
364  Class<T> cls = getClassForProxy(object);
365  DomainTypeHandler<T> result = getDomainTypeHandler(cls);
366  if (result != null) {
367  return result;
368  } else {
369  return getDomainTypeHandler(cls, dictionary);
370  }
371  }
372 
373  @SuppressWarnings("unchecked")
374  protected static <T> Class<T> getClassForProxy(T object) {
375  Class cls = object.getClass();
376  if (cls.getName().startsWith("$Proxy")) {
377  cls = proxyClassToDomainClass.get(cls);
378  }
379  return cls;
380  }
381 
382  public <T> T newInstance(Class<T> cls, Dictionary dictionary) {
383  DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls, dictionary);
384  return domainTypeHandler.newInstance();
385  }
386 
387  public Table getTable(String tableName, Dictionary dictionary) {
388  Table result;
389  try {
390  result = dictionary.getTable(tableName);
391  } catch(Exception ex) {
392  throw new ClusterJFatalInternalException(
393  local.message("ERR_Get_Table"), ex);
394  }
395  return result;
396  }
397 
403  protected static String getStringProperty(Map<?, ?> props, String propertyName) {
404  return (String)props.get(propertyName);
405  }
406 
414  protected static String getStringProperty(Map<?, ?> props, String propertyName, String defaultValue) {
415  String result = (String)props.get(propertyName);
416  if (result == null) {
417  result = defaultValue;
418  }
419  return result;
420  }
421 
428  protected static String getRequiredStringProperty(Map<?, ?> props, String propertyName) {
429  String result = (String)props.get(propertyName);
430  if (result == null) {
431  throw new ClusterJFatalUserException(
432  local.message("ERR_NullProperty", propertyName));
433  }
434  return result;
435  }
436 
444  protected static int getIntProperty(Map<?, ?> props, String propertyName, int defaultValue) {
445  Object property = props.get(propertyName);
446  if (property == null) {
447  return defaultValue;
448  }
449  if (Number.class.isAssignableFrom(property.getClass())) {
450  return ((Number)property).intValue();
451  }
452  if (property instanceof String) {
453  try {
454  int result = Integer.parseInt((String)property);
455  return result;
456  } catch (NumberFormatException ex) {
457  throw new ClusterJFatalUserException(
458  local.message("ERR_NumericFormat", propertyName, property));
459  }
460  }
461  throw new ClusterJUserException(local.message("ERR_NumericFormat", propertyName, property));
462  }
463 
464  public synchronized void close() {
465  // we have to close all of the cluster connections
466  for (ClusterConnection clusterConnection: pooledConnections) {
467  clusterConnection.close();
468  }
469  pooledConnections.clear();
470  synchronized(sessionFactoryMap) {
471  // now remove this from the map
472  sessionFactoryMap.remove(key);
473  }
474  }
475 
476  public void setDomainTypeHandlerFactory(DomainTypeHandlerFactory domainTypeHandlerFactory) {
477  this.domainTypeHandlerFactory = domainTypeHandlerFactory;
478  }
479 
480  public DomainTypeHandlerFactory getDomainTypeHandlerFactory() {
481  return domainTypeHandlerFactory;
482  }
483 
484  public List<Integer> getConnectionPoolSessionCounts() {
485  List<Integer> result = new ArrayList<Integer>();
486  for (ClusterConnection connection: pooledConnections) {
487  result.add(connection.dbCount());
488  }
489  return result;
490  }
491 
492 }