Line data Source code
1 :
2 : /* Copyright (c) 2006-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "barrier.h"
23 :
24 : #include "barrierCommand.h"
25 : #include "connection.h"
26 : #include "dataIStream.h"
27 : #include "dataOStream.h"
28 : #include "exception.h"
29 : #include "global.h"
30 : #include "iCommand.h"
31 : #include "log.h"
32 : #include "objectICommand.h"
33 : #include "objectOCommand.h"
34 :
35 : #include <lunchbox/monitor.h>
36 :
37 : #include <unordered_map>
38 :
39 : namespace co
40 : {
41 : namespace
42 : {
43 109 : struct Request
44 : {
45 109 : Request()
46 109 : : time(0)
47 : , timeout(LB_TIMEOUT_INDEFINITE)
48 109 : , incarnation(0)
49 : {
50 109 : }
51 : uint64_t time;
52 : uint32_t timeout;
53 : uint32_t incarnation;
54 : Nodes nodes;
55 : };
56 :
57 : typedef std::unordered_map<uint128_t, Request> RequestMap;
58 : typedef RequestMap::iterator RequestMapIter;
59 : }
60 :
61 : namespace detail
62 : {
63 31 : class Barrier
64 : {
65 : public:
66 26 : Barrier()
67 26 : : height(0)
68 : {
69 26 : }
70 5 : Barrier(const uint128_t& masterID_, const uint32_t height_)
71 5 : : masterID(masterID_)
72 5 : , height(height_)
73 : {
74 5 : }
75 :
76 : /** The master barrier node. */
77 : NodeID masterID;
78 :
79 : /** The height of the barrier, only set on the master. */
80 : uint32_t height;
81 :
82 : /** The local, connected instantiation of the master node. */
83 : NodePtr master;
84 :
85 : /** Slave nodes which have entered the barrier, index per version. */
86 : RequestMap enteredNodes;
87 :
88 : /** The monitor used for barrier leave notification. */
89 : lunchbox::Monitor<uint32_t> incarnation;
90 : };
91 : }
92 :
93 : typedef CommandFunc<Barrier> CmdFunc;
94 :
95 5 : Barrier::Barrier(LocalNodePtr localNode, const uint128_t& masterNodeID,
96 5 : const uint32_t height)
97 5 : : _impl(new detail::Barrier(masterNodeID.isUUID() ? masterNodeID
98 0 : : localNode->getNodeID(),
99 10 : height))
100 : {
101 5 : localNode->registerObject(this);
102 5 : }
103 :
104 26 : Barrier::Barrier(LocalNodePtr localNode, const ObjectVersion& id)
105 26 : : _impl(new detail::Barrier)
106 : {
107 26 : localNode->mapObject(this, id);
108 26 : }
109 :
110 90 : Barrier::~Barrier()
111 : {
112 62 : LocalNodePtr localNode = getLocalNode();
113 31 : if (localNode)
114 1 : localNode->releaseObject(this);
115 :
116 31 : delete _impl;
117 59 : }
118 :
119 : //---------------------------------------------------------------------------
120 : // Serialization
121 : //---------------------------------------------------------------------------
122 106 : void Barrier::getInstanceData(DataOStream& os)
123 : {
124 106 : LBASSERT(_impl->masterID != NodeID());
125 106 : os << _impl->height << _impl->masterID;
126 106 : }
127 :
128 26 : void Barrier::applyInstanceData(DataIStream& is)
129 : {
130 26 : is >> _impl->height >> _impl->masterID;
131 26 : }
132 :
133 101 : void Barrier::pack(DataOStream& os)
134 : {
135 101 : os << _impl->height;
136 101 : }
137 :
138 101 : void Barrier::unpack(DataIStream& is)
139 : {
140 101 : is >> _impl->height;
141 101 : }
142 : //---------------------------------------------------------------------------
143 :
144 1 : void Barrier::setHeight(const uint32_t height)
145 : {
146 1 : _impl->height = height;
147 1 : }
148 :
149 0 : void Barrier::increase()
150 : {
151 0 : ++_impl->height;
152 0 : }
153 :
154 28 : uint32_t Barrier::getHeight() const
155 : {
156 28 : return _impl->height;
157 : }
158 :
159 31 : void Barrier::attach(const uint128_t& id, const uint32_t instanceID)
160 : {
161 31 : Object::attach(id, instanceID);
162 :
163 62 : LocalNodePtr node = getLocalNode();
164 31 : CommandQueue* queue = node->getCommandThreadQueue();
165 :
166 62 : registerCommand(CMD_BARRIER_ENTER, CmdFunc(this, &Barrier::_cmdEnter),
167 31 : queue);
168 31 : registerCommand(CMD_BARRIER_ENTER_REPLY,
169 62 : CmdFunc(this, &Barrier::_cmdEnterReply), queue);
170 :
171 : #ifdef COLLAGE_V1_API
172 31 : if (_impl->masterID == NodeID())
173 26 : _impl->masterID = node->getNodeID();
174 : #else
175 : LBASSERT(_impl->masterID == NodeID());
176 : #endif
177 31 : }
178 :
179 352 : bool Barrier::enter(const uint32_t timeout)
180 : {
181 352 : LBASSERT(_impl->height > 0);
182 352 : LBASSERT(_impl->masterID != NodeID());
183 :
184 352 : if (_impl->height == 1) // trivial ;)
185 0 : return true;
186 :
187 352 : if (!_impl->master)
188 : {
189 66 : LocalNodePtr localNode = getLocalNode();
190 33 : _impl->master = localNode->connect(_impl->masterID);
191 : }
192 :
193 352 : LBASSERT(_impl->master);
194 352 : LBASSERT(_impl->master->isReachable());
195 352 : if (!_impl->master || !_impl->master->isReachable())
196 : {
197 0 : LBWARN << "Can't connect barrier master node " << _impl->masterID
198 0 : << std::endl;
199 0 : return false;
200 : }
201 :
202 704 : LBLOG(LOG_BARRIER) << "enter barrier " << getID() << " v" << getVersion()
203 1056 : << ", height " << _impl->height << std::endl;
204 :
205 352 : const uint32_t leaveVal = _impl->incarnation.get() + 1;
206 :
207 703 : send(_impl->master, CMD_BARRIER_ENTER) << getVersion() << leaveVal - 1
208 349 : << timeout;
209 :
210 352 : if (timeout == LB_TIMEOUT_INDEFINITE)
211 305 : _impl->incarnation.waitEQ(leaveVal);
212 47 : else if (!_impl->incarnation.timedWaitEQ(leaveVal, timeout))
213 23 : return false;
214 :
215 658 : LBLOG(LOG_BARRIER) << "left barrier " << getID() << " v" << getVersion()
216 987 : << ", height " << _impl->height << std::endl;
217 329 : return true;
218 : }
219 :
/*
 * Handler for CMD_BARRIER_ENTER, run on the node holding the barrier master.
 *
 * Reads (version, incarnation, timeout) in the order enter() sends them and
 * records the sending node in the Request for that version. Once height nodes
 * have entered the current version, every recorded node is unlocked via
 * _sendNotify() and the Request is erased.
 *
 * @param cmd the received enter command.
 * @return always true.
 */
bool Barrier::_cmdEnter(ICommand& cmd)
{
    LB_TS_THREAD(_thread);
    LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
                 _impl->master);

    ObjectICommand command(cmd);
    const uint128_t version = command.get<uint128_t>();
    const uint32_t incarnation = command.get<uint32_t>();
    const uint32_t timeout = command.get<uint32_t>();

    LBLOG(LOG_BARRIER) << "handle barrier enter " << command << " v" << version
                       << " barrier v" << getVersion() << std::endl;

    // operator[] default-creates an empty Request on the first enter of a
    // version.
    Request& request = _impl->enteredNodes[version];

    LBLOG(LOG_BARRIER) << "enter barrier v" << version << ", has "
                       << request.nodes.size() << " of " << _impl->height
                       << std::endl;

    // Refreshed on every enter; _cleanup() expires requests based on it.
    request.time = getLocalNode()->getTime64();

    // It's the first call to enter barrier
    if (request.nodes.empty())
    {
        request.incarnation = incarnation;
        request.timeout = timeout;
    }
    else if (request.timeout != LB_TIMEOUT_INDEFINITE)
    {
        // the stored request has a lower incarnation than the sender, i.e. it
        // belongs to an older barrier
        if (request.incarnation < incarnation)
        {
            // send directly the reply command to unblock the caller
            _sendNotify(version, command.getNode());
            return true;
        }
        // the previous enter had a timeout, start a new synchronization
        // (same version means same group -> no member can run ahead)
        else if (request.incarnation > incarnation)
        {
            request.nodes.clear();
            request.incarnation = incarnation;
            request.timeout = timeout;
        }
    }
    request.nodes.push_back(command.getNode());

    // clean older data which was not removed during older synchronization
    if (request.timeout != LB_TIMEOUT_INDEFINITE)
        _cleanup(request.time);

    // If we got early entry requests for this barrier, just note their
    // appearance. This requires that another request for the later version
    // arrives once the barrier reaches this version. The only case when this is
    // not the case is when no contributor to the current version contributes to
    // the later version, in which case deadlocks might happen because the later
    // version never leaves the barrier. We simply assume this is not the case.
    if (version > getVersion())
        return true;

    // If it's an older version a timeout has been handled.
    // For performance, send directly the order to unblock the caller.
    if (timeout != LB_TIMEOUT_INDEFINITE && version < getVersion())
    {
        LBASSERT(incarnation == 0);
        _sendNotify(version, command.getNode());
        return true;
    }

    LBASSERTINFO(version == getVersion(),
                 "Barrier master updated to new version while in barrier "
                     << getID() << " (" << version << " != " << getVersion()
                     << ")");

    // Not all participants have arrived yet: keep waiting.
    Nodes& nodes = request.nodes;
    if (nodes.size() < _impl->height)
        return true;

    // NOTE(review): the size is checked before usort(); if usort() removes
    // duplicates, a node that entered twice is counted twice here - confirm.
    LBASSERT(nodes.size() == _impl->height);
    LBLOG(LOG_BARRIER) << "Barrier reached " << getID() << " v" << version
                       << std::endl;

    lunchbox::usort(nodes);

    // Release every participant of this version.
    for (NodesIter i = nodes.begin(); i != nodes.end(); ++i)
        _sendNotify(version, *i);

    // delete node vector for version
    _impl->enteredNodes.erase(version);
    return true;
}
312 :
313 135 : void Barrier::_sendNotify(const uint128_t& version, NodePtr node)
314 : {
315 135 : LB_TS_THREAD(_thread);
316 135 : LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
317 : _impl->master);
318 :
319 135 : if (node->isLocal()) // OPT
320 : {
321 104 : LBLOG(LOG_BARRIER) << "Unlock local user(s)" << std::endl;
322 : // the case where we receive a different version of the barrier meant
323 : // that previosly we have detect a timeout true negative
324 104 : if (version == getVersion())
325 104 : ++_impl->incarnation;
326 : }
327 : else
328 : {
329 31 : LBLOG(LOG_BARRIER) << "Unlock " << node << std::endl;
330 31 : send(node, CMD_BARRIER_ENTER_REPLY) << version;
331 : }
332 135 : }
333 :
334 47 : void Barrier::_cleanup(const uint64_t time)
335 : {
336 47 : LB_TS_THREAD(_thread);
337 47 : LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
338 : _impl->master);
339 :
340 47 : if (_impl->enteredNodes.size() < 2)
341 47 : return;
342 :
343 0 : for (RequestMapIter i = _impl->enteredNodes.begin();
344 0 : i != _impl->enteredNodes.end(); ++i)
345 : {
346 0 : Request& cleanNodes = i->second;
347 :
348 0 : if (cleanNodes.timeout == LB_TIMEOUT_INDEFINITE)
349 0 : continue;
350 :
351 : const uint32_t timeout =
352 0 : cleanNodes.timeout != LB_TIMEOUT_DEFAULT
353 0 : ? cleanNodes.timeout
354 0 : : Global::getIAttribute(Global::IATTR_TIMEOUT_DEFAULT);
355 :
356 0 : if (time > cleanNodes.time + timeout)
357 : {
358 0 : _impl->enteredNodes.erase(i);
359 0 : return;
360 : }
361 : }
362 : }
363 :
364 31 : bool Barrier::_cmdEnterReply(ICommand& cmd)
365 : {
366 62 : ObjectICommand command(cmd);
367 31 : LB_TS_THREAD(_thread);
368 31 : LBLOG(LOG_BARRIER) << "Got ok, unlock local user(s)" << std::endl;
369 31 : const uint128_t version = command.get<uint128_t>();
370 :
371 31 : if (version == getVersion())
372 31 : ++_impl->incarnation;
373 :
374 62 : return true;
375 : }
376 63 : }
|