Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "node.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "customOCommand.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 :
28 : #include <lunchbox/scopedMutex.h>
29 : #include <lunchbox/spinLock.h>
30 :
31 : namespace co
32 : {
33 : namespace
34 : {
35 : /** The state of the node. */
36 : enum State
37 : {
38 : STATE_CLOSED, //!< initial state
39 : STATE_CONNECTED, //!< proxy for a remote node, connected
40 : STATE_LISTENING, //!< local node, listening
41 : STATE_CLOSING //!< listening, about to close
42 : };
43 :
44 0 : struct MCData
45 : {
46 : ConnectionPtr connection;
47 : NodePtr node;
48 : };
49 : typedef std::vector< MCData > MCDatas;
50 : }
51 :
52 : namespace detail
53 : {
54 : class Node
55 : {
56 : public:
57 : /** Globally unique node identifier. */
58 : NodeID id;
59 :
60 : const uint32_t type;
61 :
62 : /** The current state of this node. */
63 : State state;
64 :
65 : /** The connection to this node. */
66 : ConnectionPtr outgoing;
67 :
68 : /** The multicast connection to this node, can be 0. */
69 : lunchbox::Lockable< ConnectionPtr > outMulticast;
70 :
71 : /**
72 : * Yet unused multicast connections for this node.
73 : *
74 : * On the first multicast send usage, the connection is 'primed' by sending
75 : * our node identifier to the MC group, removed from this vector and set as
76 : * outMulticast.
77 : */
78 : MCDatas multicasts;
79 :
80 : /** The list of descriptions on how this node is reachable. */
81 : lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
82 : connectionDescriptions;
83 :
84 : /** Last time commands were received */
85 : int64_t lastReceive;
86 :
87 : /** Is a big endian host? */
88 : bool bigEndian;
89 :
90 116 : explicit Node( const uint32_t type_ )
91 : : id( lunchbox::make_UUID( ))
92 : , type( type_ )
93 : , state( STATE_CLOSED )
94 : , lastReceive ( 0 )
95 : #ifdef COMMON_BIGENDIAN
96 : , bigEndian( true )
97 : #else
98 116 : , bigEndian( false )
99 : #endif
100 117 : {}
101 :
102 115 : ~Node()
103 115 : {
104 115 : LBASSERT( !outgoing );
105 115 : connectionDescriptions->clear();
106 115 : }
107 : };
108 : }
109 :
110 116 : Node::Node( const uint32_t type )
111 116 : : _impl( new detail::Node( type ))
112 : {
113 117 : LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
114 117 : }
115 :
116 296 : Node::~Node()
117 : {
118 115 : LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
119 115 : delete _impl;
120 181 : }
121 :
122 0 : bool Node::operator == ( const Node* node ) const
123 : {
124 0 : LBASSERTINFO( _impl->id != node->_impl->id || this == node,
125 : "Two node instances with the same ID found "
126 : << (void*)this << " and " << (void*)node );
127 :
128 0 : return ( _impl == node->_impl );
129 : }
130 :
131 606 : ConnectionDescriptions Node::getConnectionDescriptions() const
132 : {
133 606 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
134 606 : return _impl->connectionDescriptions.data;
135 : }
136 :
137 231 : ConnectionPtr Node::getMulticast()
138 : {
139 231 : if( !isReachable( ))
140 0 : return 0;
141 :
142 231 : ConnectionPtr connection = _impl->outMulticast.data;
143 231 : if( connection && !connection->isClosed( ))
144 0 : return connection;
145 :
146 462 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
147 231 : if( _impl->multicasts.empty( ))
148 231 : return 0;
149 :
150 0 : MCData data = _impl->multicasts.back();
151 0 : _impl->multicasts.pop_back();
152 0 : NodePtr node = data.node;
153 :
154 : // prime multicast connections on peers
155 0 : LBINFO << "Announcing id " << node->getNodeID() << " to multicast group "
156 0 : << data.connection->getDescription() << std::endl;
157 :
158 : #ifdef COMMON_BIGENDIAN
159 : uint32_t cmd = CMD_NODE_ID_BE;
160 : lunchbox::byteswap( cmd );
161 : #else
162 0 : const uint32_t cmd = CMD_NODE_ID;
163 : #endif
164 : OCommand( Connections( 1, data.connection ), cmd )
165 0 : << node->getNodeID() << getType() << node->serialize();
166 :
167 0 : _impl->outMulticast.data = data.connection;
168 231 : return data.connection;
169 : }
170 :
171 77 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
172 : {
173 77 : LBASSERTINFO( isClosed(), *this );
174 77 : if( !isClosed( ))
175 77 : return;
176 77 : _addConnectionDescription( cd );
177 : }
178 :
179 77 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
180 : {
181 77 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
182 77 : _impl->connectionDescriptions->push_back( cd );
183 77 : }
184 :
185 0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
186 : {
187 0 : LBASSERTINFO( isClosed(), *this );
188 0 : if( !isClosed( ))
189 0 : return false;
190 0 : return _removeConnectionDescription( cd );
191 : }
192 :
193 0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
194 : {
195 0 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
196 :
197 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
198 0 : for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
199 0 : i != _impl->connectionDescriptions->end(); ++i )
200 : {
201 0 : if( *cd != **i )
202 0 : continue;
203 :
204 0 : _impl->connectionDescriptions->erase( i );
205 0 : return true;
206 : }
207 0 : return false;
208 : }
209 :
210 66 : std::string Node::serialize() const
211 : {
212 66 : std::ostringstream data;
213 66 : data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
214 132 : << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
215 66 : << CO_SEPARATOR;
216 : {
217 66 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
218 66 : data << co::serialize( _impl->connectionDescriptions.data );
219 : }
220 66 : return data.str();
221 : }
222 :
223 66 : bool Node::deserialize( std::string& data )
224 : {
225 66 : LBASSERT( _impl->state == STATE_CLOSED );
226 :
227 : // version check
228 66 : int32_t major = 0;
229 66 : size_t nextPos = data.find( CO_SEPARATOR );
230 66 : if( nextPos == std::string::npos || nextPos == 0 )
231 : {
232 0 : LBERROR << "Could not parse node major version data" << std::endl;
233 0 : return false;
234 : }
235 :
236 66 : std::istringstream is( data.substr( 0, nextPos ));
237 66 : data = data.substr( nextPos + 1 );
238 66 : is >> major;
239 :
240 66 : int32_t minor = 0;
241 66 : nextPos = data.find( CO_SEPARATOR );
242 66 : if( nextPos == std::string::npos || nextPos == 0 )
243 : {
244 0 : LBERROR << "Could not parse node minor version data" << std::endl;
245 0 : return false;
246 : }
247 :
248 66 : is.clear();
249 66 : is.str( data.substr( 0, nextPos ));
250 66 : data = data.substr( nextPos + 1 );
251 66 : is >> minor;
252 :
253 66 : if( major != Version::getMajor() || minor != Version::getMinor( ))
254 : {
255 0 : LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
256 0 : << minor << ", local node uses " << Version::getMajor() << '.'
257 0 : << Version::getMinor() << std::endl;
258 : }
259 :
260 : // node id
261 66 : nextPos = data.find( CO_SEPARATOR );
262 66 : if( nextPos == std::string::npos || nextPos == 0 )
263 : {
264 0 : LBERROR << "Could not parse node id data" << std::endl;
265 0 : return false;
266 : }
267 :
268 66 : _impl->id = data.substr( 0, nextPos );
269 66 : data = data.substr( nextPos + 1 );
270 :
271 : // endianness
272 66 : nextPos = data.find( CO_SEPARATOR );
273 66 : if( nextPos == std::string::npos || nextPos == 0 )
274 : {
275 0 : LBERROR << "Could not parse node endianness data" << std::endl;
276 0 : return false;
277 : }
278 :
279 66 : is.clear();
280 66 : is.str( data.substr( 0, nextPos ));
281 66 : data = data.substr( nextPos + 1 );
282 66 : is >> _impl->bigEndian;
283 :
284 : // Connections data
285 132 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
286 66 : _impl->connectionDescriptions->clear();
287 132 : return co::deserialize( data, _impl->connectionDescriptions.data );
288 : }
289 :
290 72058 : bool Node::isBigEndian() const
291 : {
292 72058 : return _impl->bigEndian;
293 : }
294 :
295 1333 : bool Node::isReachable() const
296 : {
297 1333 : return isListening() || isConnected();
298 : }
299 :
300 636 : bool Node::isConnected() const
301 : {
302 636 : return _impl->state == STATE_CONNECTED;
303 : }
304 :
305 174727 : bool Node::isClosed() const
306 : {
307 174727 : return _impl->state == STATE_CLOSED;
308 : }
309 :
310 45 : bool Node::isClosing() const
311 : {
312 45 : return _impl->state == STATE_CLOSING;
313 : }
314 :
315 74269 : bool Node::isListening() const
316 : {
317 74269 : return _impl->state == STATE_LISTENING;
318 : }
319 :
320 744 : ConnectionPtr Node::getConnection( const bool preferMulticast )
321 : {
322 744 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
323 744 : return multicast ? multicast : _impl->outgoing;
324 : }
325 :
326 71459 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
327 : {
328 71459 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
329 71459 : if( !isClosed( ))
330 71459 : return multicast ? multicast : _impl->outgoing;
331 0 : LBUNREACHABLE;
332 0 : return 0;
333 : }
334 :
335 110 : ConnectionPtr Node::_getMulticast() const
336 : {
337 110 : return _impl->outMulticast.data;
338 : }
339 :
340 71457 : OCommand Node::send( const uint32_t cmd, const bool multicast )
341 : {
342 71457 : ConnectionPtr connection = _getConnection( multicast );
343 71457 : LBASSERT( connection );
344 71457 : return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
345 : }
346 :
347 2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
348 : {
349 2 : ConnectionPtr connection = _getConnection( multicast );
350 2 : LBASSERT( connection );
351 2 : return CustomOCommand( Connections( 1, connection ), commandID );
352 : }
353 :
354 164100 : const NodeID& Node::getNodeID() const
355 : {
356 164100 : return _impl->id;
357 : }
358 :
359 0 : int64_t Node::getLastReceiveTime() const
360 : {
361 0 : return _impl->lastReceive;
362 : }
363 :
364 176 : uint32_t Node::getType() const
365 : {
366 176 : return _impl->type;
367 : }
368 :
369 0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
370 : {
371 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
372 0 : MCData data;
373 0 : data.connection = connection;
374 0 : data.node = node;
375 0 : _impl->multicasts.push_back( data );
376 0 : }
377 :
378 0 : void Node::_removeMulticast( ConnectionPtr connection )
379 : {
380 0 : LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
381 :
382 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
383 0 : if( _impl->outMulticast == connection )
384 0 : _impl->outMulticast.data = 0;
385 : else
386 : {
387 0 : for( MCDatas::iterator j = _impl->multicasts.begin();
388 0 : j != _impl->multicasts.end(); ++j )
389 : {
390 0 : if( (*j).connection != connection )
391 0 : continue;
392 :
393 0 : _impl->multicasts.erase( j );
394 0 : return;
395 : }
396 0 : }
397 : }
398 :
399 66 : void Node::_connectMulticast( NodePtr node )
400 : {
401 66 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
402 :
403 66 : if( node->_impl->outMulticast.data.isValid( ))
404 : // multicast already connected by previous _cmdID
405 66 : return;
406 :
407 : // Search if the connected node is in the same multicast group as we are
408 132 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
409 393 : for( ConnectionDescriptionsCIter i = descriptions.begin();
410 262 : i != descriptions.end(); ++i )
411 : {
412 65 : ConnectionDescriptionPtr description = *i;
413 65 : if( description->type < CONNECTIONTYPE_MULTICAST )
414 65 : continue;
415 :
416 : const ConnectionDescriptions& fromDescs =
417 0 : node->getConnectionDescriptions();
418 0 : for( ConnectionDescriptionsCIter j = fromDescs.begin();
419 0 : j != fromDescs.end(); ++j )
420 : {
421 0 : ConnectionDescriptionPtr fromDescription = *j;
422 0 : if( !description->isSameMulticastGroup( fromDescription ))
423 0 : continue;
424 :
425 0 : LBASSERT( !node->_impl->outMulticast.data );
426 0 : LBASSERT( node->_impl->multicasts.empty( ));
427 :
428 0 : if( _impl->outMulticast->isValid() &&
429 0 : _impl->outMulticast.data->getDescription() == description )
430 : {
431 0 : node->_impl->outMulticast.data = _impl->outMulticast.data;
432 0 : LBINFO << "Using " << description << " as multicast group for "
433 0 : << node->getNodeID() << std::endl;
434 : }
435 : // find unused multicast connection to node
436 0 : else for( MCDatas::const_iterator k = _impl->multicasts.begin();
437 0 : k != _impl->multicasts.end(); ++k )
438 : {
439 0 : const MCData& data = *k;
440 : ConstConnectionDescriptionPtr dataDesc =
441 0 : data.connection->getDescription();
442 0 : if( !description->isSameMulticastGroup( dataDesc ))
443 0 : continue;
444 :
445 0 : node->_impl->multicasts.push_back( data );
446 0 : LBINFO << "Adding " << dataDesc << " as multicast group for "
447 0 : << node->getNodeID() << std::endl;
448 0 : }
449 0 : }
450 66 : }
451 : }
452 :
453 0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
454 : {
455 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
456 0 : MCDatas::iterator i = node->_impl->multicasts.begin();
457 0 : for( ; i != node->_impl->multicasts.end(); ++i )
458 : {
459 0 : if( (*i).connection == connection )
460 0 : break;
461 : }
462 :
463 0 : if( node->_impl->outMulticast->isValid( ))
464 : {
465 0 : if( node->_impl->outMulticast.data == connection )
466 : {
467 : // nop, connection already used
468 0 : LBASSERT( i == node->_impl->multicasts.end( ));
469 : }
470 0 : else if( i == node->_impl->multicasts.end( ))
471 : {
472 : // another connection is used as multicast connection, save this
473 0 : LBASSERT( isListening( ));
474 0 : MCData data;
475 0 : data.connection = connection;
476 0 : data.node = this;
477 0 : _impl->multicasts.push_back( data );
478 : }
479 : // else nop, already know connection
480 : }
481 : else
482 : {
483 0 : node->_impl->outMulticast.data = connection;
484 0 : if( i != node->_impl->multicasts.end( ))
485 0 : node->_impl->multicasts.erase( i );
486 0 : }
487 0 : }
488 :
489 46 : void Node::_setListening()
490 : {
491 46 : _impl->state = STATE_LISTENING;
492 46 : }
493 :
494 45 : void Node::_setClosing()
495 : {
496 45 : _impl->state = STATE_CLOSING;
497 45 : }
498 :
499 91 : void Node::_setClosed()
500 : {
501 91 : _impl->state = STATE_CLOSED;
502 91 : }
503 :
504 112 : void Node::_connect( ConnectionPtr connection )
505 : {
506 112 : _impl->outgoing = connection;
507 112 : _impl->state = STATE_CONNECTED;
508 112 : }
509 :
510 155 : void Node::_disconnect()
511 : {
512 155 : _impl->state = STATE_CLOSED;
513 155 : _impl->outgoing = 0;
514 155 : _impl->outMulticast.data = 0;
515 155 : _impl->multicasts.clear();
516 155 : }
517 :
518 72058 : void Node::_setLastReceive( const int64_t time )
519 : {
520 72058 : _impl->lastReceive = time;
521 72058 : }
522 :
523 414 : std::ostream& operator << ( std::ostream& os, const State state )
524 : {
525 : os << ( state == STATE_CLOSED ? "closed" :
526 : state == STATE_CONNECTED ? "connected" :
527 414 : state == STATE_LISTENING ? "listening" : "ERROR" );
528 414 : return os;
529 : }
530 :
531 414 : std::ostream& operator << ( std::ostream& os, const Node& node )
532 : {
533 414 : os << "node " << node.getNodeID() << " " << node._impl->state;
534 414 : const ConnectionDescriptions& descs = node.getConnectionDescriptions();
535 822 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
536 408 : os << ", " << (*i)->toString();
537 414 : return os;
538 : }
539 :
540 63 : }
|