Line data Source code
1 :
2 : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "barrier.h"
23 :
24 : #include "iCommand.h"
25 : #include "connection.h"
26 : #include "dataIStream.h"
27 : #include "dataOStream.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "objectICommand.h"
31 : #include "objectOCommand.h"
32 : #include "barrierCommand.h"
33 : #include "exception.h"
34 :
35 : #include <lunchbox/monitor.h>
36 : #include <lunchbox/stdExt.h>
37 :
38 : namespace co
39 : {
40 : namespace
41 : {
42 109 : struct Request
43 : {
44 109 : Request()
45 109 : : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
46 : uint64_t time;
47 : uint32_t timeout;
48 : uint32_t incarnation;
49 : Nodes nodes;
50 : };
51 :
52 : typedef stde::hash_map< uint128_t, Request > RequestMap;
53 : typedef RequestMap::iterator RequestMapIter;
54 : }
55 :
56 : namespace detail
57 : {
58 31 : class Barrier
59 : {
60 : public:
61 26 : Barrier() : height( 0 ) {}
62 5 : Barrier( const uint128_t& masterID_, const uint32_t height_ )
63 : : masterID( masterID_ )
64 5 : , height( height_ )
65 5 : {}
66 :
67 : /** The master barrier node. */
68 : NodeID masterID;
69 :
70 : /** The height of the barrier, only set on the master. */
71 : uint32_t height;
72 :
73 : /** The local, connected instantiation of the master node. */
74 : NodePtr master;
75 :
76 : /** Slave nodes which have entered the barrier, index per version. */
77 : RequestMap enteredNodes;
78 :
79 : /** The monitor used for barrier leave notification. */
80 : lunchbox::Monitor< uint32_t > incarnation;
81 : };
82 : }
83 :
84 : typedef CommandFunc<Barrier> CmdFunc;
85 :
86 : #ifdef COLLAGE_V1_API
87 0 : Barrier::Barrier( NodePtr master, const uint32_t height )
88 0 : : _impl( new detail::Barrier( master ? master->getNodeID() : NodeID(),
89 0 : height ))
90 0 : {}
91 : #endif
92 :
93 5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
94 : const uint32_t height )
95 5 : : _impl( new detail::Barrier( masterNodeID.isUUID() ?
96 0 : masterNodeID : localNode->getNodeID(),
97 10 : height ))
98 : {
99 5 : localNode->registerObject( this );
100 5 : }
101 :
102 26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
103 26 : : _impl( new detail::Barrier )
104 : {
105 26 : localNode->mapObject( this, id );
106 26 : }
107 :
108 90 : Barrier::~Barrier()
109 : {
110 31 : LocalNodePtr localNode = getLocalNode();
111 31 : if( localNode )
112 1 : localNode->releaseObject( this );
113 :
114 31 : delete _impl;
115 59 : }
116 :
117 : //---------------------------------------------------------------------------
118 : // Serialization
119 : //---------------------------------------------------------------------------
120 109 : void Barrier::getInstanceData( DataOStream& os )
121 : {
122 109 : LBASSERT( _impl->masterID != NodeID( ));
123 109 : os << _impl->height << _impl->masterID;
124 109 : }
125 :
126 26 : void Barrier::applyInstanceData( DataIStream& is )
127 : {
128 26 : is >> _impl->height >> _impl->masterID;
129 26 : }
130 :
131 104 : void Barrier::pack( DataOStream& os )
132 : {
133 104 : os << _impl->height;
134 104 : }
135 :
136 101 : void Barrier::unpack( DataIStream& is )
137 : {
138 101 : is >> _impl->height;
139 101 : }
140 : //---------------------------------------------------------------------------
141 :
142 1 : void Barrier::setHeight( const uint32_t height )
143 : {
144 1 : _impl->height = height;
145 1 : }
146 :
147 0 : void Barrier::increase()
148 : {
149 0 : ++_impl->height;
150 0 : }
151 :
152 28 : uint32_t Barrier::getHeight() const
153 : {
154 28 : return _impl->height;
155 : }
156 :
157 31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
158 : {
159 31 : Object::attach( id, instanceID );
160 :
161 31 : LocalNodePtr node = getLocalNode();
162 31 : CommandQueue* queue = node->getCommandThreadQueue();
163 :
164 : registerCommand( CMD_BARRIER_ENTER,
165 31 : CmdFunc( this, &Barrier::_cmdEnter ), queue );
166 : registerCommand( CMD_BARRIER_ENTER_REPLY,
167 31 : CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
168 :
169 : #ifdef COLLAGE_V1_API
170 31 : if( _impl->masterID == NodeID( ))
171 26 : _impl->masterID = node->getNodeID();
172 : #else
173 : LBASSERT( _impl->masterID == NodeID( ));
174 : #endif
175 31 : }
176 :
177 352 : void Barrier::enter( const uint32_t timeout )
178 : {
179 352 : LBASSERT( _impl->height > 0 );
180 352 : LBASSERT( _impl->masterID != NodeID( ));
181 :
182 352 : if( _impl->height == 1 ) // trivial ;)
183 0 : return;
184 :
185 352 : if( !_impl->master )
186 : {
187 32 : LocalNodePtr localNode = getLocalNode();
188 32 : _impl->master = localNode->connect( _impl->masterID );
189 : }
190 :
191 352 : LBASSERT( _impl->master );
192 352 : LBASSERT( _impl->master->isReachable( ));
193 352 : if( !_impl->master || !_impl->master->isReachable( ))
194 : {
195 0 : LBWARN << "Can't connect barrier master node " << _impl->masterID
196 0 : << std::endl;
197 0 : return;
198 : }
199 :
200 704 : LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
201 1056 : << ", height " << _impl->height << std::endl;
202 :
203 352 : const uint32_t leaveVal = _impl->incarnation.get() + 1;
204 :
205 : send( _impl->master, CMD_BARRIER_ENTER )
206 352 : << getVersion() << leaveVal - 1 << timeout;
207 :
208 352 : if( timeout == LB_TIMEOUT_INDEFINITE )
209 305 : _impl->incarnation.waitEQ( leaveVal );
210 47 : else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
211 23 : throw Exception( Exception::TIMEOUT_BARRIER );
212 :
213 654 : LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
214 981 : << ", height " << _impl->height << std::endl;
215 : }
216 :
217 352 : bool Barrier::_cmdEnter( ICommand& cmd )
218 : {
219 352 : LB_TS_THREAD( _thread );
220 352 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
221 : _impl->master );
222 :
223 352 : ObjectICommand command( cmd );
224 352 : const uint128_t version = command.get< uint128_t >();
225 352 : const uint32_t incarnation = command.get< uint32_t >();
226 352 : const uint32_t timeout = command.get< uint32_t >();
227 :
228 352 : LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
229 0 : << " v" << version
230 704 : << " barrier v" << getVersion() << std::endl;
231 :
232 352 : Request& request = _impl->enteredNodes[ version ];
233 :
234 352 : LBLOG( LOG_BARRIER ) << "enter barrier v" << version
235 0 : << ", has " << request.nodes.size() << " of "
236 352 : << _impl->height << std::endl;
237 :
238 352 : request.time = getLocalNode()->getTime64();
239 :
240 : // It's the first call to enter barrier
241 352 : if( request.nodes.empty( ))
242 : {
243 109 : request.incarnation = incarnation;
244 109 : request.timeout = timeout;
245 : }
246 243 : else if( request.timeout != LB_TIMEOUT_INDEFINITE )
247 : {
248 : // the incarnation belongs to an older barrier
249 40 : if( request.incarnation < incarnation )
250 : {
251 : // send directly the reply command to unblock the caller
252 0 : _sendNotify( version, command.getNode( ));
253 0 : return true;
254 : }
255 : // the previous enter had a timeout, start a new synchronization
256 : // (same version means same group -> no member can run ahead)
257 40 : else if( request.incarnation > incarnation )
258 : {
259 0 : request.nodes.clear();
260 0 : request.incarnation = incarnation;
261 0 : request.timeout = timeout;
262 : }
263 : }
264 352 : request.nodes.push_back( command.getNode( ));
265 :
266 : // clean older data which was not removed during older synchronization
267 352 : if( request.timeout != LB_TIMEOUT_INDEFINITE )
268 47 : _cleanup( request.time );
269 :
270 : // If we got early entry requests for this barrier, just note their
271 : // appearance. This requires that another request for the later version
272 : // arrives once the barrier reaches this version. The only case when this is
273 : // not the case is when no contributor to the current version contributes to
274 : // the later version, in which case deadlocks might happen because the later
275 : // version never leaves the barrier. We simply assume this is not the case.
276 352 : if( version > getVersion( ))
277 0 : return true;
278 :
279 : // If it's an older version a timeout has been handled.
280 : // For performance, send directly the order to unblock the caller.
281 352 : if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
282 : {
283 0 : LBASSERT( incarnation == 0 );
284 0 : _sendNotify( version, command.getNode( ) );
285 0 : return true;
286 : }
287 :
288 352 : LBASSERTINFO( version == getVersion(),
289 : "Barrier master updated to new version while in barrier " <<
290 : getID() << " (" << version << " != " << getVersion() << ")" );
291 :
292 352 : Nodes& nodes = request.nodes;
293 352 : if( nodes.size() < _impl->height )
294 245 : return true;
295 :
296 107 : LBASSERT( nodes.size() == _impl->height );
297 107 : LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
298 107 : << std::endl;
299 :
300 107 : stde::usort( nodes );
301 :
302 242 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
303 135 : _sendNotify( version, *i );
304 :
305 : // delete node vector for version
306 107 : RequestMapIter i = _impl->enteredNodes.find( version );
307 107 : LBASSERT( i != _impl->enteredNodes.end( ));
308 107 : _impl->enteredNodes.erase( i );
309 107 : return true;
310 : }
311 :
312 135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
313 : {
314 135 : LB_TS_THREAD( _thread );
315 135 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
316 : _impl->master );
317 :
318 135 : if( node->isLocal( )) // OPT
319 : {
320 104 : LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
321 : // the case where we receive a different version of the barrier meant
322 : // that previosly we have detect a timeout true negative
323 104 : if( version == getVersion() )
324 104 : ++_impl->incarnation;
325 : }
326 : else
327 : {
328 31 : LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
329 31 : send( node, CMD_BARRIER_ENTER_REPLY ) << version;
330 : }
331 135 : }
332 :
333 47 : void Barrier::_cleanup( const uint64_t time )
334 : {
335 47 : LB_TS_THREAD( _thread );
336 47 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
337 : _impl->master );
338 :
339 47 : if( _impl->enteredNodes.size() < 2 )
340 47 : return;
341 :
342 0 : for( RequestMapIter i = _impl->enteredNodes.begin();
343 0 : i != _impl->enteredNodes.end(); ++i )
344 : {
345 0 : Request& cleanNodes = i->second;
346 :
347 0 : if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
348 0 : continue;
349 :
350 0 : const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
351 : cleanNodes.timeout :
352 0 : Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
353 :
354 0 : if( time > cleanNodes.time + timeout )
355 : {
356 0 : _impl->enteredNodes.erase( i );
357 0 : return;
358 : }
359 : }
360 : }
361 :
362 28 : bool Barrier::_cmdEnterReply( ICommand& cmd )
363 : {
364 28 : ObjectICommand command( cmd );
365 28 : LB_TS_THREAD( _thread );
366 28 : LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
367 28 : const uint128_t version = command.get< uint128_t >();
368 :
369 28 : if( version == getVersion( ))
370 29 : ++_impl->incarnation;
371 :
372 29 : return true;
373 : }
374 :
375 60 : }
|