MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PgmanProxy.cpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include "PgmanProxy.hpp"
17 #include "pgman.hpp"
18 #include <signaldata/DataFileOrd.hpp>
19 
20 PgmanProxy::PgmanProxy(Block_context& ctx) :
21  LocalProxy(PGMAN, ctx)
22 {
23  c_extraWorkers = 1;
24 
25  // GSN_LCP_FRAG_ORD
26  addRecSignal(GSN_LCP_FRAG_ORD, &PgmanProxy::execLCP_FRAG_ORD);
27 
28  // GSN_END_LCP_REQ
29  addRecSignal(GSN_END_LCP_REQ, &PgmanProxy::execEND_LCP_REQ);
30  addRecSignal(GSN_END_LCP_CONF, &PgmanProxy::execEND_LCP_CONF);
31  addRecSignal(GSN_RELEASE_PAGES_CONF, &PgmanProxy::execRELEASE_PAGES_CONF);
32 }
33 
34 PgmanProxy::~PgmanProxy()
35 {
36 }
37 
39 PgmanProxy::newWorker(Uint32 instanceNo)
40 {
41  return new Pgman(m_ctx, instanceNo);
42 }
43 
44 // GSN_LCP_FRAG_ORD
45 
46 void
47 PgmanProxy::execLCP_FRAG_ORD(Signal* signal)
48 {
49  const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
50  Uint32 ssId = getSsId(req);
51  Ss_LCP_FRAG_ORD& ss = ssSeize<Ss_LCP_FRAG_ORD>(ssId);
52  ss.m_req = *req;
53  sendREQ(signal, ss);
54  ssRelease<Ss_LCP_FRAG_ORD>(ssId);
55 }
56 
57 void
58 PgmanProxy::sendLCP_FRAG_ORD(Signal* signal, Uint32 ssId, SectionHandle* handle)
59 {
60  Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
61  LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
62  *req = ss.m_req;
63  sendSignalNoRelease(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
64  signal, LcpFragOrd::SignalLength, JBB, handle);
65 }
66 
67 // GSN_END_LCP_REQ
68 
69 void
70 PgmanProxy::execEND_LCP_REQ(Signal* signal)
71 {
72  const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
73  Uint32 ssId = getSsId(req);
74  Ss_END_LCP_REQ& ss = ssSeize<Ss_END_LCP_REQ>(ssId);
75  ss.m_req = *req;
76 
77  const Uint32 sb = refToBlock(ss.m_req.senderRef);
78  ndbrequire(sb == DBLQH || sb == LGMAN);
79 
80  if (sb == LGMAN) {
81  jam();
82  /*
83  * At end of UNDO execution. Extra PGMAN worker was used to
84  * read up TUP pages. Release these pages now.
85  */
86  ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
87  req->senderData = ssId;
88  req->senderRef = reference();
89  req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
90  req->requestData = 0;
91  sendSignal(extraWorkerRef(), GSN_RELEASE_PAGES_REQ,
92  signal, ReleasePagesReq::SignalLength, JBB);
93  return;
94  }
95  sendREQ(signal, ss);
96 }
97 
98 void
99 PgmanProxy::execRELEASE_PAGES_CONF(Signal* signal)
100 {
101  const ReleasePagesConf* conf = (const ReleasePagesConf*)signal->getDataPtr();
102  Uint32 ssId = getSsId(conf);
103  Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
104  sendREQ(signal, ss);
105 }
106 
107 void
108 PgmanProxy::sendEND_LCP_REQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
109 {
110  Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
111 
112  EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
113  *req = ss.m_req;
114  req->senderData = ssId;
115  req->senderRef = reference();
116  sendSignalNoRelease(workerRef(ss.m_worker), GSN_END_LCP_REQ,
117  signal, EndLcpReq::SignalLength, JBB, handle);
118 }
119 
120 void
121 PgmanProxy::execEND_LCP_CONF(Signal* signal)
122 {
123  const EndLcpConf* conf = (EndLcpConf*)signal->getDataPtr();
124  Uint32 ssId = conf->senderData;
125  Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
126  recvCONF(signal, ss);
127 }
128 
129 void
130 PgmanProxy::sendEND_LCP_CONF(Signal* signal, Uint32 ssId)
131 {
132  Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
133  BlockReference senderRef = ss.m_req.senderRef;
134 
135  if (!lastReply(ss)) {
136  jam();
137  return;
138  }
139 
140  if (!lastExtra(signal, ss)) {
141  jam();
142  return;
143  }
144 
145  if (ss.m_error == 0) {
146  jam();
147  EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
148  conf->senderData = ss.m_req.senderData;
149  conf->senderRef = reference();
150  sendSignal(senderRef, GSN_END_LCP_CONF,
151  signal, EndLcpConf::SignalLength, JBB);
152  } else {
153  ndbrequire(false);
154  }
155 
156  ssRelease<Ss_END_LCP_REQ>(ssId);
157 }
158 
159 // client methods
160 
/*
 * For the methods below the caller must be instance 0. The request is
 * served by the extra worker, which runs in our thread. These are
 * extent pages.
 */
165 
166 int
168  Signal* signal,
169  Page_cache_client::Request& req, Uint32 flags)
170 {
171  ndbrequire(blockToInstance(caller.m_block) == 0);
172  SimulatedBlock* block = globalData.getBlock(caller.m_block);
173  Pgman* worker = (Pgman*)extraWorkerBlock();
174  Page_cache_client pgman(block, worker);
175  int ret = pgman.get_page(signal, req, flags);
176  caller.m_ptr = pgman.m_ptr;
177  return ret;
178 }
179 
180 void
181 PgmanProxy::update_lsn(Page_cache_client& caller,
182  Local_key key, Uint64 lsn)
183 {
184  ndbrequire(blockToInstance(caller.m_block) == 0);
185  SimulatedBlock* block = globalData.getBlock(caller.m_block);
186  Pgman* worker = (Pgman*)extraWorkerBlock();
187  Page_cache_client pgman(block, worker);
188  pgman.update_lsn(key, lsn);
189 }
190 
191 int
193  Local_key key, Uint32 page_id)
194 {
195  ndbrequire(blockToInstance(caller.m_block) == 0);
196  SimulatedBlock* block = globalData.getBlock(caller.m_block);
197  Pgman* worker = (Pgman*)extraWorkerBlock();
198  Page_cache_client pgman(block, worker);
199  int ret = pgman.drop_page(key, page_id);
200  return ret;
201 }
202 
/*
 * The following methods contact all workers. First the method is
 * called on the extra worker. Then DATA_FILE_ORD is sent to the LQH
 * workers. The result must be the same since the configurations are
 * identical.
 */
208 
209 Uint32
211 {
212  Pgman* worker = (Pgman*)extraWorkerBlock();
213  Uint32 ret = worker->create_data_file();
214  Uint32 i;
215  for (i = 0; i < c_lqhWorkers; i++) {
216  jam();
217  send_data_file_ord(signal, i, ret,
218  DataFileOrd::CreateDataFile);
219  }
220  return ret;
221 }
222 
223 Uint32
224 PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no)
225 {
226  Pgman* worker = (Pgman*)extraWorkerBlock();
227  Uint32 ret = worker->alloc_data_file(file_no);
228  Uint32 i;
229  for (i = 0; i < c_lqhWorkers; i++) {
230  jam();
231  send_data_file_ord(signal, i, ret,
232  DataFileOrd::AllocDataFile, file_no);
233  }
234  return ret;
235 }
236 
237 void
238 PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
239 {
240  Pgman* worker = (Pgman*)extraWorkerBlock();
241  worker->map_file_no(file_no, fd);
242  Uint32 i;
243  for (i = 0; i < c_lqhWorkers; i++) {
244  jam();
245  send_data_file_ord(signal, i, ~(Uint32)0,
246  DataFileOrd::MapFileNo, file_no, fd);
247  }
248 }
249 
250 void
251 PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
252 {
253  Pgman* worker = (Pgman*)extraWorkerBlock();
254  worker->free_data_file(file_no, fd);
255  Uint32 i;
256  for (i = 0; i < c_lqhWorkers; i++) {
257  jam();
258  send_data_file_ord(signal, i, ~(Uint32)0,
259  DataFileOrd::FreeDataFile, file_no, fd);
260  }
261 }
262 
263 void
264 PgmanProxy::send_data_file_ord(Signal* signal, Uint32 i, Uint32 ret,
265  Uint32 cmd, Uint32 file_no, Uint32 fd)
266 {
267  DataFileOrd* ord = (DataFileOrd*)signal->getDataPtrSend();
268  ord->ret = ret;
269  ord->cmd = cmd;
270  ord->file_no = file_no;
271  ord->fd = fd;
272  sendSignal(workerRef(i), GSN_DATA_FILE_ORD,
273  signal, DataFileOrd::SignalLength, JBB);
274 }
275 
// NOTE(review): project macro applied to the block class; presumably it
// emits the per-block function boilerplate for PgmanProxy — confirm
// against the macro's definition.
BLOCK_FUNCTIONS(PgmanProxy)