MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SHM_Buffer.hpp
1 /*
2  Copyright (C) 2003-2006, 2008 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #ifndef SHM_BUFFER_HPP
20 #define SHM_BUFFER_HPP
21 
22 #include <ndb_global.h>
23 
24 #include <NdbSleep.h>
25 
41 class SHM_Reader {
42 public:
43  SHM_Reader(char * const _startOfBuffer,
44  Uint32 _sizeOfBuffer,
45  Uint32 _slack,
46  Uint32 * _readIndex,
47  Uint32 * _writeIndex) :
48  m_startOfBuffer(_startOfBuffer),
49  m_totalBufferSize(_sizeOfBuffer),
50  m_bufferSize(_sizeOfBuffer - _slack),
51  m_sharedReadIndex(_readIndex),
52  m_sharedWriteIndex(_writeIndex)
53  {
54  }
55 
56  void clear() {
57  m_readIndex = 0;
58  }
59 
63  inline bool empty() const;
64 
71  inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
72 
76  inline void updateReadPtr(Uint32 *ptr);
77 
78 private:
79  char * const m_startOfBuffer;
80  Uint32 m_totalBufferSize;
81  Uint32 m_bufferSize;
82  Uint32 m_readIndex;
83 
84  Uint32 * m_sharedReadIndex;
85  Uint32 * m_sharedWriteIndex;
86 };
87 
88 inline
89 bool
90 SHM_Reader::empty() const{
91  bool ret = (m_readIndex == * m_sharedWriteIndex);
92  return ret;
93 }
94 
101 inline
102 void
103 SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod)
104 {
105  Uint32 tReadIndex = m_readIndex;
106  Uint32 tWriteIndex = * m_sharedWriteIndex;
107 
108  ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
109 
110  if(tReadIndex <= tWriteIndex){
111  eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
112  } else {
113  eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
114  }
115 }
116 
120 inline
121 void
123 {
124  Uint32 tReadIndex = ((char*)ptr) - m_startOfBuffer;
125 
126  assert(tReadIndex < m_totalBufferSize);
127 
128  if(tReadIndex >= m_bufferSize){
129  tReadIndex = 0;
130  }
131 
132  m_readIndex = tReadIndex;
133  * m_sharedReadIndex = tReadIndex;
134 }
135 
/*
 * NOTE(review): WRITER_SLACK is not referenced by any code visible in this
 * file. SHM_Writer::getWritePtr() adds a literal 4 to the requested size --
 * presumably the same slack; confirm before replacing either one.
 */
136 #define WRITER_SLACK 4
137 
138 class SHM_Writer {
139 public:
140  SHM_Writer(char * const _startOfBuffer,
141  Uint32 _sizeOfBuffer,
142  Uint32 _slack,
143  Uint32 * _readIndex,
144  Uint32 * _writeIndex) :
145  m_startOfBuffer(_startOfBuffer),
146  m_totalBufferSize(_sizeOfBuffer),
147  m_bufferSize(_sizeOfBuffer - _slack),
148  m_sharedReadIndex(_readIndex),
149  m_sharedWriteIndex(_writeIndex)
150  {
151  }
152 
153  void clear() {
154  m_writeIndex = 0;
155  }
156 
157  inline char * getWritePtr(Uint32 sz);
158  inline void updateWritePtr(Uint32 sz);
159 
160  inline Uint32 getWriteIndex() const { return m_writeIndex;}
161  inline Uint32 getBufferSize() const { return m_bufferSize;}
162  inline Uint32 get_free_buffer() const;
163 
164  inline void copyIndexes(SHM_Writer * standbyWriter);
165 
166  /* Write struct iovec into buffer. */
167  inline Uint32 writev(const struct iovec *vec, int count);
168 
169 private:
170  char * const m_startOfBuffer;
171  Uint32 m_totalBufferSize;
172  Uint32 m_bufferSize;
173 
174  Uint32 m_writeIndex;
175 
176  Uint32 * m_sharedReadIndex;
177  Uint32 * m_sharedWriteIndex;
178 };
179 
180 inline
181 char *
182 SHM_Writer::getWritePtr(Uint32 sz){
183  Uint32 tReadIndex = * m_sharedReadIndex;
184  Uint32 tWriteIndex = m_writeIndex;
185 
186  char * ptr = &m_startOfBuffer[tWriteIndex];
187 
188  Uint32 free;
189  if(tReadIndex <= tWriteIndex){
190  free = m_bufferSize + tReadIndex - tWriteIndex;
191  } else {
192  free = tReadIndex - tWriteIndex;
193  }
194 
195  sz += 4;
196  if(sz < free){
197  return ptr;
198  }
199 
200  return 0;
201 }
202 
203 inline
204 void
205 SHM_Writer::updateWritePtr(Uint32 sz){
206 
207  assert(m_writeIndex == * m_sharedWriteIndex);
208 
209  Uint32 tWriteIndex = m_writeIndex;
210  tWriteIndex += sz;
211 
212  assert(tWriteIndex < m_totalBufferSize);
213 
214  if(tWriteIndex >= m_bufferSize){
215  tWriteIndex = 0;
216  }
217 
218  m_writeIndex = tWriteIndex;
219  * m_sharedWriteIndex = tWriteIndex;
220 }
221 
222 inline
223 Uint32
224 SHM_Writer::get_free_buffer() const
225 {
226  Uint32 tReadIndex = * m_sharedReadIndex;
227  Uint32 tWriteIndex = m_writeIndex;
228 
229  Uint32 free;
230  if(tReadIndex <= tWriteIndex){
231  free = m_bufferSize + tReadIndex - tWriteIndex;
232  } else {
233  free = tReadIndex - tWriteIndex;
234  }
235  return free;
236 }
237 
/**
 * Copy data from an array of iovec entries into the buffer, then store the
 * resulting write index privately and to the shared write index.
 *
 * For every entry the number of bytes actually copied is
 * 4 * TransporterRegistry::unpack_length_words(data, limit in words).
 * That helper is defined outside this file; presumably it returns how many
 * 32-bit words of complete messages fit within the limit -- TODO confirm.
 * Processing stops at the first entry that could not be copied completely.
 *
 * @param vec    array of entries to copy from (iov_base / iov_len)
 * @param count  number of entries in vec
 * @return total number of bytes copied into the buffer
 */
238 inline
239 Uint32
240 SHM_Writer::writev(const struct iovec *vec, int count)
241 {
  /* The read index is sampled once here and not re-read inside the loop. */
242  Uint32 tReadIndex = * m_sharedReadIndex;
243  Uint32 tWriteIndex = m_writeIndex;
244 
257  Uint32 total = 0;
258  for (int i = 0; i < count; i++)
259  {
260  unsigned char *ptr = (unsigned char *)vec[i].iov_base;
261  Uint32 remain = vec[i].iov_len;
262  Uint32 segment;
263  Uint32 maxBytes;
264  if (tReadIndex <= tWriteIndex)
265  {
266  /* Free buffer is split in two. */
  /*
   * First part: from the write index up to the wrap point.
   * NOTE(review): in this branch maxBytes is divided by 4 when assigned and
   * divided by 4 again in the call below, whereas the other two call sites
   * assign a byte count and divide once -- confirm whether this is intended.
   */
267  if (tWriteIndex + remain > m_bufferSize)
268  maxBytes = (m_bufferSize - tWriteIndex)/4;
269  else
270  maxBytes = remain/4;
271  segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
272  maxBytes/4);
273  if (segment > 0)
274  memcpy(m_startOfBuffer + tWriteIndex, ptr, segment);
275  remain -= segment;
276  total += segment;
277  ptr += segment;
  /*
   * NOTE(review): the write index is reset to 0 unconditionally, even when
   * the whole entry fit before the wrap point (remain == 0) -- confirm that
   * the data just copied is not meant to be covered by the final index.
   */
278  tWriteIndex = 0;
279  if (remain > 0)
280  {
  /*
   * Second part: from the start of the buffer up to the read index.
   * NOTE(review): the limit is tReadIndex itself, with no margin like the
   * one getWritePtr() applies -- confirm the write index cannot end up
   * equal to the read index here.
   */
281  if (remain > tReadIndex)
282  maxBytes = tReadIndex;
283  else
284  maxBytes = remain;
285  segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
286  maxBytes/4);
287  if (segment > 0)
288  memcpy(m_startOfBuffer, ptr, segment);
289  total += segment;
290  tWriteIndex = segment;
291  if (remain > segment)
292  break; // No more room
293  }
294  }
295  else
296  {
  /* Free space is one contiguous gap from the write index to the read index. */
297  if (tWriteIndex + remain > tReadIndex)
298  maxBytes = tReadIndex - tWriteIndex;
299  else
300  maxBytes = remain;
301  segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
302  maxBytes/4);
303  if (segment > 0)
304  memcpy(m_startOfBuffer + tWriteIndex, ptr, segment);
305  total += segment;
306  tWriteIndex += segment;
307  if (remain > segment)
308  break; // No more room
309  }
310  }
311 
  /* Store the final position privately and to the shared write index. */
312  m_writeIndex = tWriteIndex;
313  *m_sharedWriteIndex = tWriteIndex;
314 
315  return total;
316 }
317 
318 #endif