MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rt_split.c
1 /* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 #include "myisamdef.h"
17 
18 #ifdef HAVE_RTREE_KEYS
19 
20 #include "rt_index.h"
21 #include "rt_key.h"
22 #include "rt_mbr.h"
23 
24 typedef struct
25 {
26  double square;
27  int n_node;
28  uchar *key;
29  double *coords;
30 } SplitStruct;
31 
/*
  Hand out the next slice of 2 * n_dim doubles from a coordinate buffer.

  d_buffer  in/out: current position in the buffer; moved past the slice
  n_dim     number of dimensions (two doubles are taken per dimension)

  Returns the start of the slice, i.e. the position *d_buffer had on entry.
  No bounds check is made here; the buffer must be large enough.
*/
inline static double *reserve_coords(double **d_buffer, int n_dim)
{
  double *slice= *d_buffer;
  *d_buffer= slice + n_dim * 2;
  return slice;
}
38 
/*
  Enlarge MBR 'a' in place so that it also covers MBR 'b'.

  a      MBR to grow: n_dim (low, high) pairs of doubles
  b      MBR to absorb, same layout
  n_dim  number of dimensions; must be at least 1 because the first
         pair is processed before the bound is tested
*/
static void mbr_join(double *a, const double *b, int n_dim)
{
  int dim= 0;
  do
  {
    double *dst= a + 2 * dim;
    const double *src= b + 2 * dim;

    /* Pull the low bound down and push the high bound up as needed. */
    if (dst[0] > src[0])
      dst[0]= src[0];
    if (dst[1] < src[1])
      dst[1]= src[1];
  } while (++dim != n_dim);
}
54 
/*
  Compute the volume ("square") of the smallest MBR covering both a and b.

  a, b   MBRs stored as n_dim (low, high) pairs of doubles
  n_dim  number of dimensions; must be at least 1 because the first
         pair is processed before the bound is tested

  Returns the product of the joined extents over all dimensions.  When that
  product is not a finite number (overflow to infinity, or NaN produced by
  infinity * 0) DBL_MAX is returned instead, so that the difference of two
  results of this function can never be NaN.
*/
static double mbr_join_square(const double *a, const double *b, int n_dim)
{
  const double *end= a + n_dim * 2;
  double square= 1.0;
  do
  {
    const double low= (a[0] > b[0]) ? b[0] : a[0];
    const double high= (a[1] < b[1]) ? b[1] : a[1];

    square*= high - low;

    a+= 2;
    b+= 2;
  } while (a != end);

  /*
    NaN fails both comparisons and +/-infinity fails one of them, so this
    catches every non-finite value without needing isnan()/isinf().
  */
  if (!(square >= -DBL_MAX && square <= DBL_MAX))
    square= DBL_MAX;

  return square;
}
73 
/*
  Compute the volume ("square") of a single MBR.

  a      MBR stored as n_dim (low, high) pairs of doubles
  n_dim  number of dimensions; must be at least 1 because the first
         pair is processed before the bound is tested

  Returns the product of (high - low) over all dimensions.
*/
static double count_square(const double *a, int n_dim)
{
  double volume= 1.0;
  int dim= 0;
  do
  {
    const double extent= a[2 * dim + 1] - a[2 * dim];
    volume*= extent;
  } while (++dim != n_dim);
  return volume;
}
85 
/*
  Copy one MBR (2 * n_dim doubles) from src to dst.
  The two areas must not overlap, since memcpy() is used.
*/
inline static void copy_coords(double *dst, const double *src, int n_dim)
{
  const size_t n_bytes= sizeof(double) * (n_dim * 2);
  memcpy(dst, src, n_bytes);
}
90 
91 /*
92 Select two nodes to collect group upon
93 */
94 static void pick_seeds(SplitStruct *node, int n_entries,
95  SplitStruct **seed_a, SplitStruct **seed_b, int n_dim)
96 {
97  SplitStruct *cur1;
98  SplitStruct *lim1 = node + (n_entries - 1);
99  SplitStruct *cur2;
100  SplitStruct *lim2 = node + n_entries;
101 
102  double max_d = -DBL_MAX;
103  double d;
104 
105  for (cur1 = node; cur1 < lim1; ++cur1)
106  {
107  for (cur2=cur1 + 1; cur2 < lim2; ++cur2)
108  {
109 
110  d = mbr_join_square(cur1->coords, cur2->coords, n_dim) - cur1->square -
111  cur2->square;
112  if (d > max_d)
113  {
114  max_d = d;
115  *seed_a = cur1;
116  *seed_b = cur2;
117  }
118  }
119  }
120 }
121 
122 /*
123 Select next node and group where to add
124 */
125 static void pick_next(SplitStruct *node, int n_entries, double *g1, double *g2,
126  SplitStruct **choice, int *n_group, int n_dim)
127 {
128  SplitStruct *cur = node;
129  SplitStruct *end = node + n_entries;
130 
131  double max_diff = -DBL_MAX;
132 
133  for (; cur<end; ++cur)
134  {
135  double diff;
136  double abs_diff;
137 
138  if (cur->n_node)
139  {
140  continue;
141  }
142 
143  diff = mbr_join_square(g1, cur->coords, n_dim) -
144  mbr_join_square(g2, cur->coords, n_dim);
145 
146  abs_diff = fabs(diff);
147  if (abs_diff > max_diff)
148  {
149  max_diff = abs_diff;
150  *n_group = 1 + (diff > 0);
151  *choice = cur;
152  }
153  }
154 }
155 
156 /*
157 Mark not-in-group entries as n_group
158 */
159 static void mark_all_entries(SplitStruct *node, int n_entries, int n_group)
160 {
161  SplitStruct *cur = node;
162  SplitStruct *end = node + n_entries;
163  for (; cur<end; ++cur)
164  {
165  if (cur->n_node)
166  {
167  continue;
168  }
169  cur->n_node = n_group;
170  }
171 }
172 
173 static int split_rtree_node(SplitStruct *node, int n_entries,
174  int all_size, /* Total key's size */
175  int key_size,
176  int min_size, /* Minimal group size */
177  int size1, int size2 /* initial group sizes */,
178  double **d_buffer, int n_dim)
179 {
180  SplitStruct *cur;
181  SplitStruct *UNINIT_VAR(a), *UNINIT_VAR(b);
182  double *g1 = reserve_coords(d_buffer, n_dim);
183  double *g2 = reserve_coords(d_buffer, n_dim);
184  SplitStruct *UNINIT_VAR(next);
185  int UNINIT_VAR(next_node);
186  int i;
187  SplitStruct *end = node + n_entries;
188 
189  if (all_size < min_size * 2)
190  {
191  return 1;
192  }
193 
194  cur = node;
195  for (; cur<end; ++cur)
196  {
197  cur->square = count_square(cur->coords, n_dim);
198  cur->n_node = 0;
199  }
200 
201  pick_seeds(node, n_entries, &a, &b, n_dim);
202  a->n_node = 1;
203  b->n_node = 2;
204 
205 
206  copy_coords(g1, a->coords, n_dim);
207  size1 += key_size;
208  copy_coords(g2, b->coords, n_dim);
209  size2 += key_size;
210 
211 
212  for (i=n_entries - 2; i>0; --i)
213  {
214  if (all_size - (size2 + key_size) < min_size) /* Can't write into group 2 */
215  {
216  mark_all_entries(node, n_entries, 1);
217  break;
218  }
219 
220  if (all_size - (size1 + key_size) < min_size) /* Can't write into group 1 */
221  {
222  mark_all_entries(node, n_entries, 2);
223  break;
224  }
225 
226  pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim);
227  if (next_node == 1)
228  {
229  size1 += key_size;
230  mbr_join(g1, next->coords, n_dim);
231  }
232  else
233  {
234  size2 += key_size;
235  mbr_join(g2, next->coords, n_dim);
236  }
237  next->n_node = next_node;
238  }
239 
240  return 0;
241 }
242 
243 int rtree_split_page(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *page, uchar *key,
244  uint key_length, my_off_t *new_page_offs)
245 {
246  int n1, n2; /* Number of items in groups */
247 
248  SplitStruct *task;
249  SplitStruct *cur;
250  SplitStruct *stop;
251  double *coord_buf;
252  double *next_coord;
253  int n_dim;
254  uchar *source_cur, *cur1, *cur2;
255  uchar *new_page= info->buff;
256  int err_code= 0;
257  uint nod_flag= mi_test_if_nod(page);
258  uint full_length= key_length + (nod_flag ? nod_flag :
259  info->s->base.rec_reflength);
260  int max_keys= (mi_getint(page)-2) / (full_length);
261  DBUG_ENTER("rtree_split_page");
262  DBUG_PRINT("rtree", ("splitting block"));
263 
264  n_dim = keyinfo->keysegs / 2;
265 
266  if (!(coord_buf= (double*) my_alloca(n_dim * 2 * sizeof(double) *
267  (max_keys + 1 + 4) +
268  sizeof(SplitStruct) * (max_keys + 1))))
269  DBUG_RETURN(-1); /* purecov: inspected */
270 
271  task= (SplitStruct *)(coord_buf + n_dim * 2 * (max_keys + 1 + 4));
272 
273  next_coord = coord_buf;
274 
275  stop = task + max_keys;
276  source_cur = rt_PAGE_FIRST_KEY(page, nod_flag);
277 
278  for (cur = task; cur < stop; ++cur, source_cur = rt_PAGE_NEXT_KEY(source_cur,
279  key_length, nod_flag))
280  {
281  cur->coords = reserve_coords(&next_coord, n_dim);
282  cur->key = source_cur;
283  rtree_d_mbr(keyinfo->seg, source_cur, key_length, cur->coords);
284  }
285 
286  cur->coords = reserve_coords(&next_coord, n_dim);
287  rtree_d_mbr(keyinfo->seg, key, key_length, cur->coords);
288  cur->key = key;
289 
290  if (split_rtree_node(task, max_keys + 1,
291  mi_getint(page) + full_length + 2, full_length,
292  rt_PAGE_MIN_SIZE(keyinfo->block_length),
293  2, 2, &next_coord, n_dim))
294  {
295  err_code = 1;
296  goto split_err;
297  }
298 
299  info->buff_used= 1;
300  stop = task + (max_keys + 1);
301  cur1 = rt_PAGE_FIRST_KEY(page, nod_flag);
302  cur2 = rt_PAGE_FIRST_KEY(new_page, nod_flag);
303 
304  n1= n2 = 0;
305  for (cur = task; cur < stop; ++cur)
306  {
307  uchar *to;
308  if (cur->n_node == 1)
309  {
310  to = cur1;
311  cur1 = rt_PAGE_NEXT_KEY(cur1, key_length, nod_flag);
312  ++n1;
313  }
314  else
315  {
316  to = cur2;
317  cur2 = rt_PAGE_NEXT_KEY(cur2, key_length, nod_flag);
318  ++n2;
319  }
320  if (to != cur->key)
321  memcpy(to - nod_flag, cur->key - nod_flag, full_length);
322  }
323 
324  mi_putint(page, 2 + n1 * full_length, nod_flag);
325  mi_putint(new_page, 2 + n2 * full_length, nod_flag);
326 
327  if ((*new_page_offs= _mi_new(info, keyinfo, DFLT_INIT_HITS)) ==
328  HA_OFFSET_ERROR)
329  err_code= -1;
330  else
331  err_code= _mi_write_keypage(info, keyinfo, *new_page_offs,
332  DFLT_INIT_HITS, new_page);
333  DBUG_PRINT("rtree", ("split new block: %lu", (ulong) *new_page_offs));
334 
335 split_err:
336  my_afree((uchar*) coord_buf);
337  DBUG_RETURN(err_code);
338 }
339 
340 #endif /*HAVE_RTREE_KEYS*/