MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
trp_client.cpp
1 /*
2  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "trp_client.hpp"
19 #include "TransporterFacade.hpp"
20 
21 trp_client::trp_client()
22  : m_blockNo(~Uint32(0)), m_facade(0)
23 {
24  m_poll.m_waiting = false;
25  m_poll.m_locked = false;
26  m_poll.m_poll_owner = false;
27  m_poll.m_next = 0;
28  m_poll.m_prev = 0;
29  m_poll.m_condition = NdbCondition_Create();
30 }
31 
33 {
38  assert(m_poll.m_locked == 0);
39  assert(m_poll.m_poll_owner == false);
40  assert(m_poll.m_next == 0);
41  assert(m_poll.m_prev == 0);
42 
43  close();
44  NdbCondition_Destroy(m_poll.m_condition);
45 }
46 
47 Uint32
48 trp_client::open(TransporterFacade* tf, int blockNo)
49 {
50  Uint32 res = 0;
51  assert(m_facade == 0);
52  if (m_facade == 0)
53  {
54  m_facade = tf;
55  res = tf->open_clnt(this, blockNo);
56  if (res != 0)
57  {
58  m_blockNo = refToBlock(res);
59  }
60  else
61  {
62  m_facade = 0;
63  }
64  }
65  return res;
66 }
67 
68 Uint32
69 trp_client::getOwnNodeId() const
70 {
71  return m_facade->theOwnId;
72 }
73 
74 void
75 trp_client::close()
76 {
77  if (m_facade)
78  {
79  m_facade->close_clnt(this);
80 
81  m_facade = 0;
82  m_blockNo = ~Uint32(0);
83  }
84 }
85 
86 void
87 trp_client::start_poll()
88 {
89  m_facade->start_poll(this);
90 }
91 
92 void
93 trp_client::do_poll(Uint32 to)
94 {
95  m_facade->do_poll(this, to);
96 }
97 
98 void
99 trp_client::complete_poll()
100 {
101  m_facade->complete_poll(this);
102 }
103 
104 void
105 trp_client::do_forceSend(int val)
106 {
107  if (val == 0)
108  {
109  m_facade->checkForceSend(m_blockNo);
110  }
111  else if (val == 1)
112  {
113  m_facade->forceSend(m_blockNo);
114  }
115 }
116 
117 int
118 trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
119 {
120  return m_facade->m_poll_owner->raw_sendSignal(signal, nodeId);
121 }
122 
123 #include "NdbImpl.hpp"
124 
125 PollGuard::PollGuard(NdbImpl& impl)
126 {
127  m_clnt = &impl;
128  m_waiter= &impl.theWaiter;
129  m_clnt->start_poll();
130 }
131 
132 /*
133  This is a common routine for possibly forcing the send of buffered signals
134  and receiving response the thread is waiting for. It is designed to be
135  useful from:
136  1) PK, UK lookups using the asynchronous interface
137  This routine uses the wait_for_input routine instead since it has
138  special end conditions due to the asynchronous nature of its usage.
139  2) Scans
140  3) dictSignal
141  It uses a NdbWaiter object to wait on the events and this object is
142  linked into the conditional wait queue. Thus this object contains
143  a reference to its place in the queue.
144 
145  It replaces the method receiveResponse previously used on the Ndb object
146 */
147 int PollGuard::wait_n_unlock(int wait_time, Uint32 nodeId, Uint32 state,
148  bool forceSend)
149 {
150  int ret_val;
151  m_waiter->set_node(nodeId);
152  m_waiter->set_state(state);
153  ret_val= wait_for_input_in_loop(wait_time, forceSend);
154  unlock_and_signal();
155  return ret_val;
156 }
157 
158 int PollGuard::wait_scan(int wait_time, Uint32 nodeId, bool forceSend)
159 {
160  m_waiter->set_node(nodeId);
161  m_waiter->set_state(WAIT_SCAN);
162  return wait_for_input_in_loop(wait_time, forceSend);
163 }
164 
165 int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend)
166 {
167  int ret_val;
168  m_clnt->do_forceSend(forceSend ? 1 : 0);
169 
170  NDB_TICKS curr_time = NdbTick_CurrentNanosecond();
171  /* Use nanosecond wait_time for max_time calculation */
172  NDB_TICKS max_time = curr_time + ((NDB_TICKS)wait_time * 1000000);
173  const int maxsleep = (wait_time == -1 || wait_time > 10) ? 10 : wait_time;
174  do
175  {
176  wait_for_input(maxsleep);
177  NDB_TICKS start_time_nanos = curr_time;
178  curr_time = NdbTick_CurrentNanosecond();
179  m_clnt->recordWaitTimeNanos(curr_time - start_time_nanos);
180  Uint32 state= m_waiter->get_state();
181  if (state == NO_WAIT)
182  {
183  return 0;
184  }
185  else if (state == WAIT_NODE_FAILURE)
186  {
187  ret_val= -2;
188  break;
189  }
190  if (wait_time == -1)
191  {
192 #ifdef NOT_USED
193  ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl;
194 #endif
195  continue;
196  }
197  if (curr_time >= max_time)
198  {
199 #ifdef VM_TRACE
200  ndbout << "Time-out state is " << m_waiter->get_state() << endl;
201 #endif
202  m_waiter->set_state(WST_WAIT_TIMEOUT);
203  ret_val= -1;
204  break;
205  }
206  } while (1);
207 #ifdef VM_TRACE
208  ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
209  ndbout << m_waiter->get_state() << endl;
210 #endif
211  m_waiter->set_state(NO_WAIT);
212  return ret_val;
213 }
214 
215 void PollGuard::wait_for_input(int wait_time)
216 {
217  m_clnt->do_poll(wait_time);
218 }
219 
220 void PollGuard::unlock_and_signal()
221 {
222  m_clnt->complete_poll();
223 }