Line data Source code
1 :
2 : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "barrier.h"
23 :
24 : #include "iCommand.h"
25 : #include "connection.h"
26 : #include "dataIStream.h"
27 : #include "dataOStream.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "objectICommand.h"
31 : #include "objectOCommand.h"
32 : #include "barrierCommand.h"
33 : #include "exception.h"
34 :
35 : #include <lunchbox/monitor.h>
36 : #include <lunchbox/stdExt.h>
37 :
38 : namespace co
39 : {
40 : namespace
41 : {
42 109 : struct Request
43 : {
44 109 : Request()
45 109 : : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
46 : uint64_t time;
47 : uint32_t timeout;
48 : uint32_t incarnation;
49 : Nodes nodes;
50 : };
51 :
52 : typedef stde::hash_map< uint128_t, Request > RequestMap;
53 : typedef RequestMap::iterator RequestMapIter;
54 : }
55 :
56 : namespace detail
57 : {
58 31 : class Barrier
59 : {
60 : public:
61 26 : Barrier() : height( 0 ) {}
62 5 : Barrier( const uint128_t& masterID_, const uint32_t height_ )
63 : : masterID( masterID_ )
64 5 : , height( height_ )
65 5 : {}
66 :
67 : /** The master barrier node. */
68 : NodeID masterID;
69 :
70 : /** The height of the barrier, only set on the master. */
71 : uint32_t height;
72 :
73 : /** The local, connected instantiation of the master node. */
74 : NodePtr master;
75 :
76 : /** Slave nodes which have entered the barrier, index per version. */
77 : RequestMap enteredNodes;
78 :
79 : /** The monitor used for barrier leave notification. */
80 : lunchbox::Monitor< uint32_t > incarnation;
81 : };
82 : }
83 :
84 : typedef CommandFunc<Barrier> CmdFunc;
85 :
86 5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
87 : const uint32_t height )
88 5 : : _impl( new detail::Barrier( masterNodeID.isUUID() ?
89 0 : masterNodeID : localNode->getNodeID(),
90 10 : height ))
91 : {
92 5 : localNode->registerObject( this );
93 5 : }
94 :
95 26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
96 26 : : _impl( new detail::Barrier )
97 : {
98 26 : localNode->mapObject( this, id );
99 26 : }
100 :
101 90 : Barrier::~Barrier()
102 : {
103 31 : LocalNodePtr localNode = getLocalNode();
104 31 : if( localNode )
105 1 : localNode->releaseObject( this );
106 :
107 31 : delete _impl;
108 59 : }
109 :
110 : //---------------------------------------------------------------------------
111 : // Serialization
112 : //---------------------------------------------------------------------------
113 109 : void Barrier::getInstanceData( DataOStream& os )
114 : {
115 109 : LBASSERT( _impl->masterID != NodeID( ));
116 109 : os << _impl->height << _impl->masterID;
117 109 : }
118 :
119 26 : void Barrier::applyInstanceData( DataIStream& is )
120 : {
121 26 : is >> _impl->height >> _impl->masterID;
122 26 : }
123 :
124 104 : void Barrier::pack( DataOStream& os )
125 : {
126 104 : os << _impl->height;
127 104 : }
128 :
129 101 : void Barrier::unpack( DataIStream& is )
130 : {
131 101 : is >> _impl->height;
132 101 : }
133 : //---------------------------------------------------------------------------
134 :
135 1 : void Barrier::setHeight( const uint32_t height )
136 : {
137 1 : _impl->height = height;
138 1 : }
139 :
140 0 : void Barrier::increase()
141 : {
142 0 : ++_impl->height;
143 0 : }
144 :
145 28 : uint32_t Barrier::getHeight() const
146 : {
147 28 : return _impl->height;
148 : }
149 :
150 31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
151 : {
152 31 : Object::attach( id, instanceID );
153 :
154 31 : LocalNodePtr node = getLocalNode();
155 31 : CommandQueue* queue = node->getCommandThreadQueue();
156 :
157 : registerCommand( CMD_BARRIER_ENTER,
158 31 : CmdFunc( this, &Barrier::_cmdEnter ), queue );
159 : registerCommand( CMD_BARRIER_ENTER_REPLY,
160 31 : CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
161 :
162 : #ifdef COLLAGE_V1_API
163 31 : if( _impl->masterID == NodeID( ))
164 26 : _impl->masterID = node->getNodeID();
165 : #else
166 : LBASSERT( _impl->masterID == NodeID( ));
167 : #endif
168 31 : }
169 :
170 352 : bool Barrier::enter( const uint32_t timeout )
171 : {
172 352 : LBASSERT( _impl->height > 0 );
173 352 : LBASSERT( _impl->masterID != NodeID( ));
174 :
175 352 : if( _impl->height == 1 ) // trivial ;)
176 0 : return true;
177 :
178 352 : if( !_impl->master )
179 : {
180 33 : LocalNodePtr localNode = getLocalNode();
181 33 : _impl->master = localNode->connect( _impl->masterID );
182 : }
183 :
184 352 : LBASSERT( _impl->master );
185 352 : LBASSERT( _impl->master->isReachable( ));
186 352 : if( !_impl->master || !_impl->master->isReachable( ))
187 : {
188 0 : LBWARN << "Can't connect barrier master node " << _impl->masterID
189 0 : << std::endl;
190 0 : return false;
191 : }
192 :
193 704 : LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
194 1056 : << ", height " << _impl->height << std::endl;
195 :
196 352 : const uint32_t leaveVal = _impl->incarnation.get() + 1;
197 :
198 : send( _impl->master, CMD_BARRIER_ENTER )
199 352 : << getVersion() << leaveVal - 1 << timeout;
200 :
201 352 : if( timeout == LB_TIMEOUT_INDEFINITE )
202 305 : _impl->incarnation.waitEQ( leaveVal );
203 47 : else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
204 23 : return false;
205 :
206 658 : LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
207 987 : << ", height " << _impl->height << std::endl;
208 329 : return true;
209 : }
210 :
/**
 * Master-side handler for CMD_BARRIER_ENTER.
 *
 * Reads (version, incarnation, timeout) from the command and records the
 * sending node in the Request for that version. Once height nodes have
 * entered the current version, every recorded node is notified via
 * _sendNotify() and the Request is removed.
 *
 * @param cmd the incoming enter command.
 * @return always true.
 */
bool Barrier::_cmdEnter( ICommand& cmd )
{
    LB_TS_THREAD( _thread );
    // Enter requests may only be handled on the master's node.
    LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
                  _impl->master );

    ObjectICommand command( cmd );
    const uint128_t version = command.get< uint128_t >();
    const uint32_t incarnation = command.get< uint32_t >();
    const uint32_t timeout = command.get< uint32_t >();

    LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
                         << " v" << version
                         << " barrier v" << getVersion() << std::endl;

    // Creates a default Request if this version has not been seen yet.
    Request& request = _impl->enteredNodes[ version ];

    LBLOG( LOG_BARRIER ) << "enter barrier v" << version
                         << ", has " << request.nodes.size() << " of "
                         << _impl->height << std::endl;

    // Refreshed on every enter; used by _cleanup() to detect stale requests.
    request.time = getLocalNode()->getTime64();

    // It's the first call to enter barrier: this request defines the
    // incarnation and timeout of the synchronization.
    if( request.nodes.empty( ))
    {
        request.incarnation = incarnation;
        request.timeout = timeout;
    }
    else if( request.timeout != LB_TIMEOUT_INDEFINITE )
    {
        // The stored synchronization has an older incarnation than the
        // sender's.
        if( request.incarnation < incarnation )
        {
            // send directly the reply command to unblock the caller, without
            // recording it in this request
            _sendNotify( version, command.getNode( ));
            return true;
        }
        // the previous enter had a timeout, start a new synchronization
        // (same version means same group -> no member can run ahead)
        else if( request.incarnation > incarnation )
        {
            request.nodes.clear();
            request.incarnation = incarnation;
            request.timeout = timeout;
        }
    }
    request.nodes.push_back( command.getNode( ));

    // clean older data which was not removed during older synchronization.
    // This request's time equals the argument, so a strict "time > time +
    // timeout" expiry test cannot remove it.
    if( request.timeout != LB_TIMEOUT_INDEFINITE )
        _cleanup( request.time );

    // If we got early entry requests for this barrier, just note their
    // appearance. This requires that another request for the later version
    // arrives once the barrier reaches this version. The only case when this is
    // not the case is when no contributor to the current version contributes to
    // the later version, in which case deadlocks might happen because the later
    // version never leaves the barrier. We simply assume this is not the case.
    if( version > getVersion( ))
        return true;

    // If it's an older version a timeout has been handled.
    // For performance, send directly the order to unblock the caller.
    if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
    {
        LBASSERT( incarnation == 0 );
        _sendNotify( version, command.getNode( ) );
        return true;
    }

    LBASSERTINFO( version == getVersion(),
                  "Barrier master updated to new version while in barrier " <<
                  getID() << " (" << version << " != " << getVersion() << ")" );

    // Not everybody is here yet: keep waiting for more enter requests.
    Nodes& nodes = request.nodes;
    if( nodes.size() < _impl->height )
        return true;

    LBASSERT( nodes.size() == _impl->height );
    LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
                         << std::endl;

    // Sort and drop duplicates so each node is notified only once.
    stde::usort( nodes );

    for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
        _sendNotify( version, *i );

    // delete node vector for version (after the loop above, since 'nodes'
    // refers into the map entry)
    RequestMapIter i = _impl->enteredNodes.find( version );
    LBASSERT( i != _impl->enteredNodes.end( ));
    _impl->enteredNodes.erase( i );
    return true;
}
305 :
306 135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
307 : {
308 135 : LB_TS_THREAD( _thread );
309 135 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
310 : _impl->master );
311 :
312 135 : if( node->isLocal( )) // OPT
313 : {
314 104 : LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
315 : // the case where we receive a different version of the barrier meant
316 : // that previosly we have detect a timeout true negative
317 104 : if( version == getVersion() )
318 104 : ++_impl->incarnation;
319 : }
320 : else
321 : {
322 31 : LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
323 31 : send( node, CMD_BARRIER_ENTER_REPLY ) << version;
324 : }
325 135 : }
326 :
327 47 : void Barrier::_cleanup( const uint64_t time )
328 : {
329 47 : LB_TS_THREAD( _thread );
330 47 : LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
331 : _impl->master );
332 :
333 47 : if( _impl->enteredNodes.size() < 2 )
334 47 : return;
335 :
336 0 : for( RequestMapIter i = _impl->enteredNodes.begin();
337 0 : i != _impl->enteredNodes.end(); ++i )
338 : {
339 0 : Request& cleanNodes = i->second;
340 :
341 0 : if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
342 0 : continue;
343 :
344 0 : const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
345 : cleanNodes.timeout :
346 0 : Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
347 :
348 0 : if( time > cleanNodes.time + timeout )
349 : {
350 0 : _impl->enteredNodes.erase( i );
351 0 : return;
352 : }
353 : }
354 : }
355 :
356 31 : bool Barrier::_cmdEnterReply( ICommand& cmd )
357 : {
358 31 : ObjectICommand command( cmd );
359 31 : LB_TS_THREAD( _thread );
360 31 : LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
361 31 : const uint128_t version = command.get< uint128_t >();
362 :
363 31 : if( version == getVersion( ))
364 31 : ++_impl->incarnation;
365 :
366 31 : return true;
367 : }
368 :
369 63 : }
|