Line data Source code
1 :
2 : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "barrier.h"
23 :
24 : #include "iCommand.h"
25 : #include "connection.h"
26 : #include "dataIStream.h"
27 : #include "dataOStream.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "objectICommand.h"
31 : #include "objectOCommand.h"
32 : #include "barrierCommand.h"
33 : #include "exception.h"
34 :
35 : #include <lunchbox/monitor.h>
36 : #include <lunchbox/stdExt.h>
37 :
38 : #include <unordered_map>
39 :
40 : namespace co
41 : {
42 : namespace
43 : {
44 109 : struct Request
45 : {
46 109 : Request()
47 109 : : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
48 : uint64_t time;
49 : uint32_t timeout;
50 : uint32_t incarnation;
51 : Nodes nodes;
52 : };
53 :
54 : typedef std::unordered_map< uint128_t, Request > RequestMap;
55 : typedef RequestMap::iterator RequestMapIter;
56 : }
57 :
58 : namespace detail
59 : {
60 31 : class Barrier
61 : {
62 : public:
63 26 : Barrier() : height( 0 ) {}
64 5 : Barrier( const uint128_t& masterID_, const uint32_t height_ )
65 : : masterID( masterID_ )
66 5 : , height( height_ )
67 5 : {}
68 :
69 : /** The master barrier node. */
70 : NodeID masterID;
71 :
72 : /** The height of the barrier, only set on the master. */
73 : uint32_t height;
74 :
75 : /** The local, connected instantiation of the master node. */
76 : NodePtr master;
77 :
78 : /** Slave nodes which have entered the barrier, index per version. */
79 : RequestMap enteredNodes;
80 :
81 : /** The monitor used for barrier leave notification. */
82 : lunchbox::Monitor< uint32_t > incarnation;
83 : };
84 : }
85 :
86 : typedef CommandFunc<Barrier> CmdFunc;
87 :
88 5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
89 : const uint32_t height )
90 5 : : _impl( new detail::Barrier( masterNodeID.isUUID() ?
91 0 : masterNodeID : localNode->getNodeID(),
92 10 : height ))
93 : {
94 5 : localNode->registerObject( this );
95 5 : }
96 :
97 26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
98 26 : : _impl( new detail::Barrier )
99 : {
100 26 : localNode->mapObject( this, id );
101 26 : }
102 :
103 90 : Barrier::~Barrier()
104 : {
105 31 : LocalNodePtr localNode = getLocalNode();
106 31 : if( localNode )
107 1 : localNode->releaseObject( this );
108 :
109 31 : delete _impl;
110 59 : }
111 :
112 : //---------------------------------------------------------------------------
113 : // Serialization
114 : //---------------------------------------------------------------------------
115 106 : void Barrier::getInstanceData( DataOStream& os )
116 : {
117 106 : LBASSERT( _impl->masterID != NodeID( ));
118 106 : os << _impl->height << _impl->masterID;
119 106 : }
120 :
121 26 : void Barrier::applyInstanceData( DataIStream& is )
122 : {
123 26 : is >> _impl->height >> _impl->masterID;
124 26 : }
125 :
126 101 : void Barrier::pack( DataOStream& os )
127 : {
128 101 : os << _impl->height;
129 101 : }
130 :
131 101 : void Barrier::unpack( DataIStream& is )
132 : {
133 101 : is >> _impl->height;
134 101 : }
135 : //---------------------------------------------------------------------------
136 :
137 1 : void Barrier::setHeight( const uint32_t height )
138 : {
139 1 : _impl->height = height;
140 1 : }
141 :
142 0 : void Barrier::increase()
143 : {
144 0 : ++_impl->height;
145 0 : }
146 :
147 28 : uint32_t Barrier::getHeight() const
148 : {
149 28 : return _impl->height;
150 : }
151 :
152 31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
153 : {
154 31 : Object::attach( id, instanceID );
155 :
156 31 : LocalNodePtr node = getLocalNode();
157 31 : CommandQueue* queue = node->getCommandThreadQueue();
158 :
159 : registerCommand( CMD_BARRIER_ENTER,
160 31 : CmdFunc( this, &Barrier::_cmdEnter ), queue );
161 : registerCommand( CMD_BARRIER_ENTER_REPLY,
162 31 : CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
163 :
164 : #ifdef COLLAGE_V1_API
165 31 : if( _impl->masterID == NodeID( ))
166 26 : _impl->masterID = node->getNodeID();
167 : #else
168 : LBASSERT( _impl->masterID == NodeID( ));
169 : #endif
170 31 : }
171 :
172 349 : bool Barrier::enter( const uint32_t timeout )
173 : {
174 349 : LBASSERT( _impl->height > 0 );
175 352 : LBASSERT( _impl->masterID != NodeID( ));
176 :
177 352 : if( _impl->height == 1 ) // trivial ;)
178 0 : return true;
179 :
180 352 : if( !_impl->master )
181 : {
182 31 : LocalNodePtr localNode = getLocalNode();
183 33 : _impl->master = localNode->connect( _impl->masterID );
184 : }
185 :
186 354 : LBASSERT( _impl->master );
187 352 : LBASSERT( _impl->master->isReachable( ));
188 352 : if( !_impl->master || !_impl->master->isReachable( ))
189 : {
190 0 : LBWARN << "Can't connect barrier master node " << _impl->masterID
191 0 : << std::endl;
192 0 : return false;
193 : }
194 :
195 704 : LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
196 1056 : << ", height " << _impl->height << std::endl;
197 :
198 352 : const uint32_t leaveVal = _impl->incarnation.get() + 1;
199 :
200 : send( _impl->master, CMD_BARRIER_ENTER )
201 352 : << getVersion() << leaveVal - 1 << timeout;
202 :
203 352 : if( timeout == LB_TIMEOUT_INDEFINITE )
204 305 : _impl->incarnation.waitEQ( leaveVal );
205 47 : else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
206 23 : return false;
207 :
208 658 : LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
209 987 : << ", height " << _impl->height << std::endl;
210 329 : return true;
211 : }
212 :
213 352 : bool Barrier::_cmdEnter( ICommand& cmd )
214 : {
215 352 : LB_TS_THREAD( _thread );
216 352 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
217 : _impl->master );
218 :
219 352 : ObjectICommand command( cmd );
220 352 : const uint128_t version = command.get< uint128_t >();
221 352 : const uint32_t incarnation = command.get< uint32_t >();
222 352 : const uint32_t timeout = command.get< uint32_t >();
223 :
224 352 : LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
225 0 : << " v" << version
226 704 : << " barrier v" << getVersion() << std::endl;
227 :
228 352 : Request& request = _impl->enteredNodes[ version ];
229 :
230 352 : LBLOG( LOG_BARRIER ) << "enter barrier v" << version
231 0 : << ", has " << request.nodes.size() << " of "
232 352 : << _impl->height << std::endl;
233 :
234 352 : request.time = getLocalNode()->getTime64();
235 :
236 : // It's the first call to enter barrier
237 352 : if( request.nodes.empty( ))
238 : {
239 109 : request.incarnation = incarnation;
240 109 : request.timeout = timeout;
241 : }
242 243 : else if( request.timeout != LB_TIMEOUT_INDEFINITE )
243 : {
244 : // the incarnation belongs to an older barrier
245 40 : if( request.incarnation < incarnation )
246 : {
247 : // send directly the reply command to unblock the caller
248 0 : _sendNotify( version, command.getNode( ));
249 0 : return true;
250 : }
251 : // the previous enter had a timeout, start a new synchronization
252 : // (same version means same group -> no member can run ahead)
253 40 : else if( request.incarnation > incarnation )
254 : {
255 0 : request.nodes.clear();
256 0 : request.incarnation = incarnation;
257 0 : request.timeout = timeout;
258 : }
259 : }
260 352 : request.nodes.push_back( command.getNode( ));
261 :
262 : // clean older data which was not removed during older synchronization
263 352 : if( request.timeout != LB_TIMEOUT_INDEFINITE )
264 47 : _cleanup( request.time );
265 :
266 : // If we got early entry requests for this barrier, just note their
267 : // appearance. This requires that another request for the later version
268 : // arrives once the barrier reaches this version. The only case when this is
269 : // not the case is when no contributor to the current version contributes to
270 : // the later version, in which case deadlocks might happen because the later
271 : // version never leaves the barrier. We simply assume this is not the case.
272 352 : if( version > getVersion( ))
273 0 : return true;
274 :
275 : // If it's an older version a timeout has been handled.
276 : // For performance, send directly the order to unblock the caller.
277 352 : if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
278 : {
279 0 : LBASSERT( incarnation == 0 );
280 0 : _sendNotify( version, command.getNode( ) );
281 0 : return true;
282 : }
283 :
284 352 : LBASSERTINFO( version == getVersion(),
285 : "Barrier master updated to new version while in barrier " <<
286 : getID() << " (" << version << " != " << getVersion() << ")" );
287 :
288 352 : Nodes& nodes = request.nodes;
289 352 : if( nodes.size() < _impl->height )
290 245 : return true;
291 :
292 107 : LBASSERT( nodes.size() == _impl->height );
293 107 : LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
294 107 : << std::endl;
295 :
296 107 : stde::usort( nodes );
297 :
298 242 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
299 135 : _sendNotify( version, *i );
300 :
301 : // delete node vector for version
302 107 : _impl->enteredNodes.erase( version );
303 107 : return true;
304 : }
305 :
306 135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
307 : {
308 135 : LB_TS_THREAD( _thread );
309 135 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
310 : _impl->master );
311 :
312 135 : if( node->isLocal( )) // OPT
313 : {
314 104 : LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
315 : // the case where we receive a different version of the barrier meant
316 : // that previosly we have detect a timeout true negative
317 104 : if( version == getVersion() )
318 104 : ++_impl->incarnation;
319 : }
320 : else
321 : {
322 31 : LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
323 31 : send( node, CMD_BARRIER_ENTER_REPLY ) << version;
324 : }
325 135 : }
326 :
327 47 : void Barrier::_cleanup( const uint64_t time )
328 : {
329 47 : LB_TS_THREAD( _thread );
330 47 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
331 : _impl->master );
332 :
333 47 : if( _impl->enteredNodes.size() < 2 )
334 47 : return;
335 :
336 0 : for( RequestMapIter i = _impl->enteredNodes.begin();
337 0 : i != _impl->enteredNodes.end(); ++i )
338 : {
339 0 : Request& cleanNodes = i->second;
340 :
341 0 : if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
342 0 : continue;
343 :
344 0 : const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
345 : cleanNodes.timeout :
346 0 : Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
347 :
348 0 : if( time > cleanNodes.time + timeout )
349 : {
350 0 : _impl->enteredNodes.erase( i );
351 0 : return;
352 : }
353 : }
354 : }
355 :
356 31 : bool Barrier::_cmdEnterReply( ICommand& cmd )
357 : {
358 31 : ObjectICommand command( cmd );
359 31 : LB_TS_THREAD( _thread );
360 31 : LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
361 31 : const uint128_t version = command.get< uint128_t >();
362 :
363 31 : if( version == getVersion( ))
364 31 : ++_impl->incarnation;
365 :
366 31 : return true;
367 : }
368 :
369 66 : }
|