MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MemoryChannel.hpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef MemoryChannel_H
19 #define MemoryChannel_H
20 
21 //===========================================================================
22 //
23 // .DESCRIPTION
24 // Pointer based communication channel for communication between two
25 // thread. It does not copy any data in or out the channel so the
26 // item that is put in can not be used untill the other thread has
27 // given it back. There is no support for detecting the return of a
28 // item. The channel is half-duplex.
29 // For comminication between 1 writer and 1 reader use the MemoryChannel
30 // class, for comminication between multiple writer and 1 reader use the
31 // MemoryChannelMultipleWriter. There is no support for multiple readers.
32 //
33 // .TYPICAL USE:
34 // to communicate between threads.
35 //
36 // .EXAMPLE:
37 // See AsyncFile.C
38 //===========================================================================
39 //
40 //
41 // MemoryChannel( int size= 256);
42 // Constuctor
43 // Parameters:
44 // size : amount of pointer it can hold
45 //
46 // void operator ++ ();
47 // increments the index with one, if size is reached it is set to zero
48 //
49 // virtual void write( T *t);
50 // Puts the item in the channel if the channel is full an error is reported.
51 // Parameters:
52 // t: pointer to item to put in the channel, after this the item
53 // is shared with the other thread.
54 // errors
55 // AFS_ERROR_CHANNALFULL, channel is full
56 //
57 // T* read();
58 // Reads a itemn from the channel, if channel is empty it blocks untill
59 // an item can be read.
60 // return
61 // T : item from the channel
62 //
63 // T* tryRead();
64 // Reads a item from the channel, if channel is empty it returns zero.
65 // return
66 // T : item from the channel or zero if channel is empty.
67 //
68 
69 #include "NdbMutex.h"
70 #include "NdbCondition.h"
71 #include <NdbOut.hpp>
72 
73 
74 template <class T>
76 {
77 public:
78  MemoryChannel();
79  virtual ~MemoryChannel();
80 
81  void writeChannel(T *t);
82  void writeChannelNoSignal(T *t);
83  T* readChannel();
84  T* tryReadChannel();
85 
89  struct ListMember
90  {
91  T* m_next;
92  };
93 
94 private:
95  Uint32 m_occupancy;
96  T* m_head; // First element in list (e.g will be read by readChannel)
97  T* m_tail;
98  NdbMutex* theMutexPtr;
99  NdbCondition* theConditionPtr;
100 
101  template<class U>
102  friend NdbOut& operator<<(NdbOut& out, const MemoryChannel<U> & chn);
103 };
104 
105 template <class T>
106 NdbOut& operator<<(NdbOut& out, const MemoryChannel<T> & chn)
107 {
108  NdbMutex_Lock(chn.theMutexPtr);
109  out << "[ occupancy: " << chn.m_occupancy
110  << " ]";
111  NdbMutex_Unlock(chn.theMutexPtr);
112  return out;
113 }
114 
115 template <class T> MemoryChannel<T>::MemoryChannel() :
116  m_occupancy(0), m_head(0), m_tail(0)
117 {
118  theMutexPtr = NdbMutex_Create();
119  theConditionPtr = NdbCondition_Create();
120 }
121 
122 template <class T> MemoryChannel<T>::~MemoryChannel( )
123 {
124  NdbMutex_Destroy(theMutexPtr);
125  NdbCondition_Destroy(theConditionPtr);
126 }
127 
128 template <class T> void MemoryChannel<T>::writeChannel( T *t)
129 {
130  writeChannelNoSignal(t);
131  NdbCondition_Signal(theConditionPtr);
132 }
133 
134 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
135 {
136  NdbMutex_Lock(theMutexPtr);
137  if (m_head == 0)
138  {
139  assert(m_occupancy == 0);
140  m_head = m_tail = t;
141  }
142  else
143  {
144  assert(m_tail != 0);
145  m_tail->m_mem_channel.m_next = t;
146  m_tail = t;
147  }
148  t->m_mem_channel.m_next = 0;
149  m_occupancy++;
150  NdbMutex_Unlock(theMutexPtr);
151 }
152 
153 template <class T> T* MemoryChannel<T>::readChannel()
154 {
155  NdbMutex_Lock(theMutexPtr);
156  while (m_head == 0)
157  {
158  assert(m_occupancy == 0);
159  NdbCondition_Wait(theConditionPtr,
160  theMutexPtr);
161  }
162  assert(m_occupancy > 0);
163  T* tmp = m_head;
164  if (m_head == m_tail)
165  {
166  assert(m_occupancy == 1);
167  m_head = m_tail = 0;
168  }
169  else
170  {
171  m_head = m_head->m_mem_channel.m_next;
172  }
173  m_occupancy--;
174  NdbMutex_Unlock(theMutexPtr);
175  return tmp;
176 }
177 
178 template <class T> T* MemoryChannel<T>::tryReadChannel()
179 {
180  NdbMutex_Lock(theMutexPtr);
181  T* tmp = m_head;
182  if (m_head != 0)
183  {
184  assert(m_occupancy > 0);
185  if (m_head == m_tail)
186  {
187  assert(m_occupancy == 1);
188  m_head = m_tail = 0;
189  }
190  else
191  {
192  m_head = m_head->m_mem_channel.m_next;
193  }
194  m_occupancy--;
195  }
196  else
197  {
198  assert(m_occupancy == 0);
199  }
200  NdbMutex_Unlock(theMutexPtr);
201  return tmp;
202 }
203 
204 #endif // MemoryChannel_H
205