Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "node.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "customOCommand.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 :
28 : #include <lunchbox/file.h>
29 : #include <lunchbox/scopedMutex.h>
30 : #include <lunchbox/spinLock.h>
31 :
32 : #ifndef MAXPATHLEN
33 : # define MAXPATHLEN 1024
34 : #endif
35 :
36 : namespace co
37 : {
38 : namespace
39 : {
40 : /** The state of the node. */
41 : enum State
42 : {
43 : STATE_CLOSED, //!< initial state
44 : STATE_CONNECTED, //!< proxy for a remote node, connected
45 : STATE_LISTENING, //!< local node, listening
46 : STATE_CLOSING //!< listening, about to close
47 : };
48 :
49 0 : struct MCData
50 : {
51 : ConnectionPtr connection;
52 : NodePtr node;
53 : };
54 : typedef std::vector< MCData > MCDatas;
55 : }
56 :
57 : namespace detail
58 : {
59 : class Node
60 : {
61 : public:
62 : /** Globally unique node identifier. */
63 : NodeID id;
64 :
65 : const uint32_t type;
66 :
67 : /** The current state of this node. */
68 : State state;
69 :
70 : /** The connection to this node. */
71 : ConnectionPtr outgoing;
72 :
73 : /** The multicast connection to this node, can be 0. */
74 : lunchbox::Lockable< ConnectionPtr > outMulticast;
75 :
76 : /**
77 : * Yet unused multicast connections for this node.
78 : *
79 : * On the first multicast send usage, the connection is 'primed' by sending
80 : * our node identifier to the MC group, removed from this vector and set as
81 : * outMulticast.
82 : */
83 : MCDatas multicasts;
84 :
85 : /** The host name used to launch the remote node process */
86 : std::string hostname;
87 :
88 : /** The list of descriptions on how this node is reachable. */
89 : lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
90 : connectionDescriptions;
91 :
92 : /** Last time commands were received */
93 : int64_t lastReceive;
94 :
95 : /** Is a big endian host? */
96 : bool bigEndian;
97 :
98 123 : explicit Node( const uint32_t type_ )
99 123 : : id( lunchbox::make_UUID( ))
100 : , type( type_ )
101 : , state( STATE_CLOSED )
102 : , lastReceive ( 0 )
103 : #ifdef COLLAGE_BIGENDIAN
104 : , bigEndian( true )
105 : #else
106 123 : , bigEndian( false )
107 : #endif
108 123 : {}
109 :
110 121 : ~Node()
111 121 : {
112 121 : LBASSERT( !outgoing );
113 121 : connectionDescriptions->clear();
114 121 : }
115 : };
116 : }
117 :
118 123 : Node::Node( const uint32_t type )
119 123 : : _impl( new detail::Node( type ))
120 : {
121 122 : LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
122 122 : }
123 :
124 310 : Node::~Node()
125 : {
126 121 : LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
127 121 : delete _impl;
128 189 : }
129 :
130 0 : bool Node::operator == ( const Node* node ) const
131 : {
132 0 : LBASSERTINFO( _impl->id != node->_impl->id || this == node,
133 : "Two node instances with the same ID found "
134 : << (void*)this << " and " << (void*)node );
135 :
136 0 : return ( _impl == node->_impl );
137 : }
138 :
139 638 : ConnectionDescriptions Node::getConnectionDescriptions() const
140 : {
141 1276 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
142 1276 : return _impl->connectionDescriptions.data;
143 : }
144 :
145 233 : ConnectionPtr Node::getMulticast()
146 : {
147 233 : if( !isReachable( ))
148 0 : return 0;
149 :
150 466 : ConnectionPtr connection = _impl->outMulticast.data;
151 233 : if( connection && !connection->isClosed( ))
152 0 : return connection;
153 :
154 466 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
155 233 : if( _impl->multicasts.empty( ))
156 233 : return 0;
157 :
158 0 : MCData data = _impl->multicasts.back();
159 0 : _impl->multicasts.pop_back();
160 0 : NodePtr node = data.node;
161 :
162 : // prime multicast connections on peers
163 0 : LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
164 0 : << data.connection->getDescription() << std::endl;
165 :
166 : #ifdef COLLAGE_BIGENDIAN
167 : uint32_t cmd = CMD_NODE_ID_BE;
168 : lunchbox::byteswap( cmd );
169 : #else
170 0 : const uint32_t cmd = CMD_NODE_ID;
171 : #endif
172 0 : OCommand( Connections( 1, data.connection ), cmd )
173 0 : << node->getNodeID() << getType() << node->serialize();
174 :
175 0 : _impl->outMulticast.data = data.connection;
176 0 : return data.connection;
177 : }
178 :
179 81 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
180 : {
181 81 : LBASSERTINFO( isClosed(), *this );
182 81 : if( !isClosed( ))
183 0 : return;
184 81 : _addConnectionDescription( cd );
185 : }
186 :
187 81 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
188 : {
189 162 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
190 81 : _impl->connectionDescriptions->push_back( cd );
191 81 : }
192 :
193 0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
194 : {
195 0 : LBASSERTINFO( isClosed(), *this );
196 0 : if( !isClosed( ))
197 0 : return false;
198 0 : return _removeConnectionDescription( cd );
199 : }
200 :
201 0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
202 : {
203 0 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
204 :
205 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
206 0 : for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
207 0 : i != _impl->connectionDescriptions->end(); ++i )
208 : {
209 0 : if( *cd != **i )
210 0 : continue;
211 :
212 0 : _impl->connectionDescriptions->erase( i );
213 0 : return true;
214 : }
215 0 : return false;
216 : }
217 :
218 68 : std::string Node::serialize() const
219 : {
220 136 : std::ostringstream data;
221 68 : data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
222 136 : << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
223 68 : << CO_SEPARATOR;
224 : {
225 136 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
226 68 : data << co::serialize( _impl->connectionDescriptions.data );
227 : }
228 136 : return data.str();
229 : }
230 :
231 68 : bool Node::deserialize( std::string& data )
232 : {
233 68 : LBASSERT( _impl->state == STATE_CLOSED );
234 :
235 : // version check
236 68 : int32_t major = 0;
237 68 : size_t nextPos = data.find( CO_SEPARATOR );
238 68 : if( nextPos == std::string::npos || nextPos == 0 )
239 : {
240 0 : LBERROR << "Could not parse node major version data" << std::endl;
241 0 : return false;
242 : }
243 :
244 136 : std::istringstream is( data.substr( 0, nextPos ));
245 68 : data = data.substr( nextPos + 1 );
246 68 : is >> major;
247 :
248 68 : int32_t minor = 0;
249 68 : nextPos = data.find( CO_SEPARATOR );
250 68 : if( nextPos == std::string::npos || nextPos == 0 )
251 : {
252 0 : LBERROR << "Could not parse node minor version data" << std::endl;
253 0 : return false;
254 : }
255 :
256 68 : is.clear();
257 68 : is.str( data.substr( 0, nextPos ));
258 68 : data = data.substr( nextPos + 1 );
259 68 : is >> minor;
260 :
261 68 : if( major != Version::getMajor() || minor != Version::getMinor( ))
262 : {
263 0 : LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
264 0 : << minor << ", local node uses " << Version::getMajor() << '.'
265 0 : << Version::getMinor() << std::endl;
266 : }
267 :
268 : // node id
269 68 : nextPos = data.find( CO_SEPARATOR );
270 68 : if( nextPos == std::string::npos || nextPos == 0 )
271 : {
272 0 : LBERROR << "Could not parse node id data" << std::endl;
273 0 : return false;
274 : }
275 :
276 68 : _impl->id = data.substr( 0, nextPos );
277 68 : data = data.substr( nextPos + 1 );
278 :
279 : // endianness
280 68 : nextPos = data.find( CO_SEPARATOR );
281 68 : if( nextPos == std::string::npos || nextPos == 0 )
282 : {
283 0 : LBERROR << "Could not parse node endianness data" << std::endl;
284 0 : return false;
285 : }
286 :
287 68 : is.clear();
288 68 : is.str( data.substr( 0, nextPos ));
289 68 : data = data.substr( nextPos + 1 );
290 68 : is >> _impl->bigEndian;
291 :
292 : // Connections data
293 136 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
294 68 : _impl->connectionDescriptions->clear();
295 68 : return co::deserialize( data, _impl->connectionDescriptions.data );
296 : }
297 :
298 72077 : bool Node::isBigEndian() const
299 : {
300 72077 : return _impl->bigEndian;
301 : }
302 :
303 1343 : bool Node::isReachable() const
304 : {
305 1343 : return isListening() || isConnected();
306 : }
307 :
308 646 : bool Node::isConnected() const
309 : {
310 646 : return _impl->state == STATE_CONNECTED;
311 : }
312 :
313 175201 : bool Node::isClosed() const
314 : {
315 175201 : return _impl->state == STATE_CLOSED;
316 : }
317 :
318 49 : bool Node::isClosing() const
319 : {
320 49 : return _impl->state == STATE_CLOSING;
321 : }
322 :
323 74328 : bool Node::isListening() const
324 : {
325 74328 : return _impl->state == STATE_LISTENING;
326 : }
327 :
328 753 : ConnectionPtr Node::getConnection( const bool preferMulticast )
329 : {
330 1507 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
331 1508 : return multicast ? multicast : _impl->outgoing;
332 : }
333 :
334 71476 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
335 : {
336 142952 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
337 71476 : if( !isClosed( ))
338 71476 : return multicast ? multicast : _impl->outgoing;
339 0 : LBUNREACHABLE;
340 0 : return 0;
341 : }
342 :
343 115 : ConnectionPtr Node::_getMulticast() const
344 : {
345 115 : return _impl->outMulticast.data;
346 : }
347 :
348 0 : void Node::setHostname( const std::string& host )
349 : {
350 0 : _impl->hostname = host;
351 0 : }
352 :
353 0 : const std::string& Node::getHostname() const
354 : {
355 0 : return _impl->hostname;
356 : }
357 :
358 0 : std::string Node::getWorkDir() const
359 : {
360 0 : return lunchbox::getWorkDir();
361 : }
362 :
363 0 : std::string Node::getLaunchQuote() const
364 : {
365 : #ifdef WIN32
366 : return "\"";
367 : #else
368 0 : return "\'";
369 : #endif
370 : }
371 :
372 71474 : OCommand Node::send( const uint32_t cmd, const bool multicast )
373 : {
374 142948 : ConnectionPtr connection = _getConnection( multicast );
375 71474 : LBASSERT( connection );
376 142948 : return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
377 : }
378 :
379 2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
380 : {
381 4 : ConnectionPtr connection = _getConnection( multicast );
382 2 : LBASSERT( connection );
383 4 : return CustomOCommand( Connections( 1, connection ), commandID );
384 : }
385 :
386 198868 : const NodeID& Node::getNodeID() const
387 : {
388 198868 : return _impl->id;
389 : }
390 :
391 0 : int64_t Node::getLastReceiveTime() const
392 : {
393 0 : return _impl->lastReceive;
394 : }
395 :
396 183 : uint32_t Node::getType() const
397 : {
398 183 : return _impl->type;
399 : }
400 :
401 0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
402 : {
403 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
404 0 : MCData data;
405 0 : data.connection = connection;
406 0 : data.node = node;
407 0 : _impl->multicasts.push_back( data );
408 0 : }
409 :
410 0 : void Node::_removeMulticast( ConnectionPtr connection )
411 : {
412 0 : LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
413 :
414 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
415 0 : if( _impl->outMulticast == connection )
416 0 : _impl->outMulticast.data = 0;
417 : else
418 : {
419 0 : for( MCDatas::iterator j = _impl->multicasts.begin();
420 0 : j != _impl->multicasts.end(); ++j )
421 : {
422 0 : if( (*j).connection != connection )
423 0 : continue;
424 :
425 0 : _impl->multicasts.erase( j );
426 0 : return;
427 : }
428 : }
429 : }
430 :
431 68 : void Node::_connectMulticast( NodePtr node )
432 : {
433 136 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
434 :
435 68 : if( node->_impl->outMulticast.data.isValid( ))
436 : // multicast already connected by previous _cmdID
437 0 : return;
438 :
439 : // Search if the connected node is in the same multicast group as we are
440 136 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
441 405 : for( ConnectionDescriptionsCIter i = descriptions.begin();
442 270 : i != descriptions.end(); ++i )
443 : {
444 67 : ConnectionDescriptionPtr description = *i;
445 67 : if( description->type < CONNECTIONTYPE_MULTICAST )
446 67 : continue;
447 :
448 : const ConnectionDescriptions& fromDescs =
449 0 : node->getConnectionDescriptions();
450 0 : for( ConnectionDescriptionsCIter j = fromDescs.begin();
451 0 : j != fromDescs.end(); ++j )
452 : {
453 0 : ConnectionDescriptionPtr fromDescription = *j;
454 0 : if( !description->isSameMulticastGroup( fromDescription ))
455 0 : continue;
456 :
457 0 : LBASSERT( !node->_impl->outMulticast.data );
458 0 : LBASSERT( node->_impl->multicasts.empty( ));
459 :
460 0 : if( _impl->outMulticast->isValid() &&
461 0 : _impl->outMulticast.data->getDescription() == description )
462 : {
463 0 : node->_impl->outMulticast.data = _impl->outMulticast.data;
464 0 : LBDEBUG << "Using " << description << " as multicast group for "
465 0 : << node->getNodeID() << std::endl;
466 : }
467 : // find unused multicast connection to node
468 0 : else for( MCDatas::const_iterator k = _impl->multicasts.begin();
469 0 : k != _impl->multicasts.end(); ++k )
470 : {
471 0 : const MCData& data = *k;
472 : ConstConnectionDescriptionPtr dataDesc =
473 0 : data.connection->getDescription();
474 0 : if( !description->isSameMulticastGroup( dataDesc ))
475 0 : continue;
476 :
477 0 : node->_impl->multicasts.push_back( data );
478 0 : LBDEBUG << "Adding " << dataDesc << " as multicast group for "
479 0 : << node->getNodeID() << std::endl;
480 : }
481 : }
482 : }
483 : }
484 :
485 0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
486 : {
487 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
488 0 : MCDatas::iterator i = node->_impl->multicasts.begin();
489 0 : for( ; i != node->_impl->multicasts.end(); ++i )
490 : {
491 0 : if( (*i).connection == connection )
492 0 : break;
493 : }
494 :
495 0 : if( node->_impl->outMulticast->isValid( ))
496 : {
497 0 : if( node->_impl->outMulticast.data == connection )
498 : {
499 : // nop, connection already used
500 0 : LBASSERT( i == node->_impl->multicasts.end( ));
501 : }
502 0 : else if( i == node->_impl->multicasts.end( ))
503 : {
504 : // another connection is used as multicast connection, save this
505 0 : LBASSERT( isListening( ));
506 0 : MCData data;
507 0 : data.connection = connection;
508 0 : data.node = this;
509 0 : _impl->multicasts.push_back( data );
510 : }
511 : // else nop, already know connection
512 : }
513 : else
514 : {
515 0 : node->_impl->outMulticast.data = connection;
516 0 : if( i != node->_impl->multicasts.end( ))
517 0 : node->_impl->multicasts.erase( i );
518 : }
519 0 : }
520 :
521 50 : void Node::_setListening()
522 : {
523 50 : _impl->state = STATE_LISTENING;
524 50 : }
525 :
526 49 : void Node::_setClosing()
527 : {
528 49 : _impl->state = STATE_CLOSING;
529 49 : }
530 :
531 99 : void Node::_setClosed()
532 : {
533 99 : _impl->state = STATE_CLOSED;
534 99 : }
535 :
536 118 : void Node::_connect( ConnectionPtr connection )
537 : {
538 118 : _impl->outgoing = connection;
539 118 : _impl->state = STATE_CONNECTED;
540 118 : }
541 :
542 164 : void Node::_disconnect()
543 : {
544 164 : _impl->state = STATE_CLOSED;
545 164 : _impl->outgoing = 0;
546 164 : _impl->outMulticast.data = 0;
547 164 : _impl->multicasts.clear();
548 164 : }
549 :
550 72077 : void Node::_setLastReceive( const int64_t time )
551 : {
552 72077 : _impl->lastReceive = time;
553 72077 : }
554 :
555 435 : std::ostream& operator << ( std::ostream& os, const State state )
556 : {
557 : os << ( state == STATE_CLOSED ? "closed" :
558 290 : state == STATE_CONNECTED ? "connected" :
559 725 : state == STATE_LISTENING ? "listening" : "ERROR" );
560 434 : return os;
561 : }
562 :
563 435 : std::ostream& operator << ( std::ostream& os, const Node& node )
564 : {
565 435 : os << "node " << node.getNodeID() << " " << node._impl->state;
566 869 : const ConnectionDescriptions& descs = node.getConnectionDescriptions();
567 862 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
568 427 : os << ", " << (*i)->toString();
569 870 : return os;
570 : }
571 :
572 66 : }
|