Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "node.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "customOCommand.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 :
28 : #include <lunchbox/scopedMutex.h>
29 : #include <lunchbox/spinLock.h>
30 :
31 : #ifdef _MSC_VER
32 : # include <direct.h>
33 : # define getcwd _getcwd
34 : #endif
35 : #ifndef MAXPATHLEN
36 : # define MAXPATHLEN 1024
37 : #endif
38 :
39 : namespace co
40 : {
41 : namespace
42 : {
/** Life-cycle state of a node, stored in detail::Node::state. */
enum State
{
    STATE_CLOSED,    //!< initial state
    STATE_CONNECTED, //!< proxy for a remote node, connected
    STATE_LISTENING, //!< local node, listening
    STATE_CLOSING    //!< listening, about to close
};
51 :
52 0 : struct MCData
53 : {
54 : ConnectionPtr connection;
55 : NodePtr node;
56 : };
57 : typedef std::vector< MCData > MCDatas;
58 : }
59 :
60 : namespace detail
61 : {
62 : class Node
63 : {
64 : public:
65 : /** Globally unique node identifier. */
66 : NodeID id;
67 :
68 : const uint32_t type;
69 :
70 : /** The current state of this node. */
71 : State state;
72 :
73 : /** The connection to this node. */
74 : ConnectionPtr outgoing;
75 :
76 : /** The multicast connection to this node, can be 0. */
77 : lunchbox::Lockable< ConnectionPtr > outMulticast;
78 :
79 : /**
80 : * Yet unused multicast connections for this node.
81 : *
82 : * On the first multicast send usage, the connection is 'primed' by sending
83 : * our node identifier to the MC group, removed from this vector and set as
84 : * outMulticast.
85 : */
86 : MCDatas multicasts;
87 :
88 : /** The host name used to launch the remote node process */
89 : std::string hostname;
90 :
91 : /** The list of descriptions on how this node is reachable. */
92 : lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
93 : connectionDescriptions;
94 :
95 : /** Last time commands were received */
96 : int64_t lastReceive;
97 :
98 : /** Is a big endian host? */
99 : bool bigEndian;
100 :
101 123 : explicit Node( const uint32_t type_ )
102 : : id( lunchbox::make_UUID( ))
103 : , type( type_ )
104 : , state( STATE_CLOSED )
105 : , lastReceive ( 0 )
106 : #ifdef COLLAGE_BIGENDIAN
107 : , bigEndian( true )
108 : #else
109 123 : , bigEndian( false )
110 : #endif
111 123 : {}
112 :
113 121 : ~Node()
114 121 : {
115 121 : LBASSERT( !outgoing );
116 121 : connectionDescriptions->clear();
117 121 : }
118 : };
119 : }
120 :
121 123 : Node::Node( const uint32_t type )
122 123 : : _impl( new detail::Node( type ))
123 : {
124 123 : LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
125 123 : }
126 :
127 310 : Node::~Node()
128 : {
129 121 : LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
130 121 : delete _impl;
131 189 : }
132 :
133 0 : bool Node::operator == ( const Node* node ) const
134 : {
135 0 : LBASSERTINFO( _impl->id != node->_impl->id || this == node,
136 : "Two node instances with the same ID found "
137 : << (void*)this << " and " << (void*)node );
138 :
139 0 : return ( _impl == node->_impl );
140 : }
141 :
142 638 : ConnectionDescriptions Node::getConnectionDescriptions() const
143 : {
144 638 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
145 638 : return _impl->connectionDescriptions.data;
146 : }
147 :
148 233 : ConnectionPtr Node::getMulticast()
149 : {
150 233 : if( !isReachable( ))
151 0 : return 0;
152 :
153 233 : ConnectionPtr connection = _impl->outMulticast.data;
154 233 : if( connection && !connection->isClosed( ))
155 0 : return connection;
156 :
157 466 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
158 233 : if( _impl->multicasts.empty( ))
159 233 : return 0;
160 :
161 0 : MCData data = _impl->multicasts.back();
162 0 : _impl->multicasts.pop_back();
163 0 : NodePtr node = data.node;
164 :
165 : // prime multicast connections on peers
166 0 : LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
167 0 : << data.connection->getDescription() << std::endl;
168 :
169 : #ifdef COLLAGE_BIGENDIAN
170 : uint32_t cmd = CMD_NODE_ID_BE;
171 : lunchbox::byteswap( cmd );
172 : #else
173 0 : const uint32_t cmd = CMD_NODE_ID;
174 : #endif
175 : OCommand( Connections( 1, data.connection ), cmd )
176 0 : << node->getNodeID() << getType() << node->serialize();
177 :
178 0 : _impl->outMulticast.data = data.connection;
179 233 : return data.connection;
180 : }
181 :
182 81 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
183 : {
184 81 : LBASSERTINFO( isClosed(), *this );
185 81 : if( !isClosed( ))
186 81 : return;
187 81 : _addConnectionDescription( cd );
188 : }
189 :
190 81 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
191 : {
192 81 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
193 81 : _impl->connectionDescriptions->push_back( cd );
194 81 : }
195 :
196 0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
197 : {
198 0 : LBASSERTINFO( isClosed(), *this );
199 0 : if( !isClosed( ))
200 0 : return false;
201 0 : return _removeConnectionDescription( cd );
202 : }
203 :
204 0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
205 : {
206 0 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
207 :
208 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
209 0 : for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
210 0 : i != _impl->connectionDescriptions->end(); ++i )
211 : {
212 0 : if( *cd != **i )
213 0 : continue;
214 :
215 0 : _impl->connectionDescriptions->erase( i );
216 0 : return true;
217 : }
218 0 : return false;
219 : }
220 :
221 68 : std::string Node::serialize() const
222 : {
223 68 : std::ostringstream data;
224 68 : data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
225 136 : << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
226 68 : << CO_SEPARATOR;
227 : {
228 68 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
229 68 : data << co::serialize( _impl->connectionDescriptions.data );
230 : }
231 68 : return data.str();
232 : }
233 :
234 68 : bool Node::deserialize( std::string& data )
235 : {
236 68 : LBASSERT( _impl->state == STATE_CLOSED );
237 :
238 : // version check
239 68 : int32_t major = 0;
240 68 : size_t nextPos = data.find( CO_SEPARATOR );
241 68 : if( nextPos == std::string::npos || nextPos == 0 )
242 : {
243 0 : LBERROR << "Could not parse node major version data" << std::endl;
244 0 : return false;
245 : }
246 :
247 68 : std::istringstream is( data.substr( 0, nextPos ));
248 68 : data = data.substr( nextPos + 1 );
249 68 : is >> major;
250 :
251 68 : int32_t minor = 0;
252 68 : nextPos = data.find( CO_SEPARATOR );
253 68 : if( nextPos == std::string::npos || nextPos == 0 )
254 : {
255 0 : LBERROR << "Could not parse node minor version data" << std::endl;
256 0 : return false;
257 : }
258 :
259 68 : is.clear();
260 68 : is.str( data.substr( 0, nextPos ));
261 68 : data = data.substr( nextPos + 1 );
262 68 : is >> minor;
263 :
264 68 : if( major != Version::getMajor() || minor != Version::getMinor( ))
265 : {
266 0 : LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
267 0 : << minor << ", local node uses " << Version::getMajor() << '.'
268 0 : << Version::getMinor() << std::endl;
269 : }
270 :
271 : // node id
272 68 : nextPos = data.find( CO_SEPARATOR );
273 68 : if( nextPos == std::string::npos || nextPos == 0 )
274 : {
275 0 : LBERROR << "Could not parse node id data" << std::endl;
276 0 : return false;
277 : }
278 :
279 68 : _impl->id = data.substr( 0, nextPos );
280 68 : data = data.substr( nextPos + 1 );
281 :
282 : // endianness
283 68 : nextPos = data.find( CO_SEPARATOR );
284 68 : if( nextPos == std::string::npos || nextPos == 0 )
285 : {
286 0 : LBERROR << "Could not parse node endianness data" << std::endl;
287 0 : return false;
288 : }
289 :
290 68 : is.clear();
291 68 : is.str( data.substr( 0, nextPos ));
292 68 : data = data.substr( nextPos + 1 );
293 68 : is >> _impl->bigEndian;
294 :
295 : // Connections data
296 136 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
297 68 : _impl->connectionDescriptions->clear();
298 136 : return co::deserialize( data, _impl->connectionDescriptions.data );
299 : }
300 :
301 72077 : bool Node::isBigEndian() const
302 : {
303 72077 : return _impl->bigEndian;
304 : }
305 :
306 1341 : bool Node::isReachable() const
307 : {
308 1341 : return isListening() || isConnected();
309 : }
310 :
311 646 : bool Node::isConnected() const
312 : {
313 646 : return _impl->state == STATE_CONNECTED;
314 : }
315 :
316 175185 : bool Node::isClosed() const
317 : {
318 175185 : return _impl->state == STATE_CLOSED;
319 : }
320 :
321 49 : bool Node::isClosing() const
322 : {
323 49 : return _impl->state == STATE_CLOSING;
324 : }
325 :
326 74326 : bool Node::isListening() const
327 : {
328 74326 : return _impl->state == STATE_LISTENING;
329 : }
330 :
331 754 : ConnectionPtr Node::getConnection( const bool preferMulticast )
332 : {
333 754 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
334 753 : return multicast ? multicast : _impl->outgoing;
335 : }
336 :
337 71476 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
338 : {
339 71476 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
340 71476 : if( !isClosed( ))
341 71476 : return multicast ? multicast : _impl->outgoing;
342 0 : LBUNREACHABLE;
343 0 : return 0;
344 : }
345 :
346 115 : ConnectionPtr Node::_getMulticast() const
347 : {
348 115 : return _impl->outMulticast.data;
349 : }
350 :
351 0 : void Node::setHostname( const std::string& host )
352 : {
353 0 : _impl->hostname = host;
354 0 : }
355 :
356 0 : const std::string& Node::getHostname() const
357 : {
358 0 : return _impl->hostname;
359 : }
360 :
361 0 : std::string Node::getWorkDir() const
362 : {
363 : char cwd[MAXPATHLEN];
364 0 : return getcwd( cwd, MAXPATHLEN );
365 : }
366 :
367 0 : std::string Node::getLaunchQuote() const
368 : {
369 : #ifdef WIN32
370 : return "\"";
371 : #else
372 0 : return "\'";
373 : #endif
374 : }
375 :
376 71474 : OCommand Node::send( const uint32_t cmd, const bool multicast )
377 : {
378 71474 : ConnectionPtr connection = _getConnection( multicast );
379 71474 : LBASSERT( connection );
380 71474 : return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
381 : }
382 :
383 2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
384 : {
385 2 : ConnectionPtr connection = _getConnection( multicast );
386 2 : LBASSERT( connection );
387 2 : return CustomOCommand( Connections( 1, connection ), commandID );
388 : }
389 :
390 180828 : const NodeID& Node::getNodeID() const
391 : {
392 180828 : return _impl->id;
393 : }
394 :
395 0 : int64_t Node::getLastReceiveTime() const
396 : {
397 0 : return _impl->lastReceive;
398 : }
399 :
400 183 : uint32_t Node::getType() const
401 : {
402 183 : return _impl->type;
403 : }
404 :
405 0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
406 : {
407 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
408 0 : MCData data;
409 0 : data.connection = connection;
410 0 : data.node = node;
411 0 : _impl->multicasts.push_back( data );
412 0 : }
413 :
414 0 : void Node::_removeMulticast( ConnectionPtr connection )
415 : {
416 0 : LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
417 :
418 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
419 0 : if( _impl->outMulticast == connection )
420 0 : _impl->outMulticast.data = 0;
421 : else
422 : {
423 0 : for( MCDatas::iterator j = _impl->multicasts.begin();
424 0 : j != _impl->multicasts.end(); ++j )
425 : {
426 0 : if( (*j).connection != connection )
427 0 : continue;
428 :
429 0 : _impl->multicasts.erase( j );
430 0 : return;
431 : }
432 0 : }
433 : }
434 :
435 68 : void Node::_connectMulticast( NodePtr node )
436 : {
437 68 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
438 :
439 68 : if( node->_impl->outMulticast.data.isValid( ))
440 : // multicast already connected by previous _cmdID
441 68 : return;
442 :
443 : // Search if the connected node is in the same multicast group as we are
444 136 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
445 405 : for( ConnectionDescriptionsCIter i = descriptions.begin();
446 270 : i != descriptions.end(); ++i )
447 : {
448 67 : ConnectionDescriptionPtr description = *i;
449 67 : if( description->type < CONNECTIONTYPE_MULTICAST )
450 67 : continue;
451 :
452 : const ConnectionDescriptions& fromDescs =
453 0 : node->getConnectionDescriptions();
454 0 : for( ConnectionDescriptionsCIter j = fromDescs.begin();
455 0 : j != fromDescs.end(); ++j )
456 : {
457 0 : ConnectionDescriptionPtr fromDescription = *j;
458 0 : if( !description->isSameMulticastGroup( fromDescription ))
459 0 : continue;
460 :
461 0 : LBASSERT( !node->_impl->outMulticast.data );
462 0 : LBASSERT( node->_impl->multicasts.empty( ));
463 :
464 0 : if( _impl->outMulticast->isValid() &&
465 0 : _impl->outMulticast.data->getDescription() == description )
466 : {
467 0 : node->_impl->outMulticast.data = _impl->outMulticast.data;
468 0 : LBDEBUG << "Using " << description << " as multicast group for "
469 0 : << node->getNodeID() << std::endl;
470 : }
471 : // find unused multicast connection to node
472 0 : else for( MCDatas::const_iterator k = _impl->multicasts.begin();
473 0 : k != _impl->multicasts.end(); ++k )
474 : {
475 0 : const MCData& data = *k;
476 : ConstConnectionDescriptionPtr dataDesc =
477 0 : data.connection->getDescription();
478 0 : if( !description->isSameMulticastGroup( dataDesc ))
479 0 : continue;
480 :
481 0 : node->_impl->multicasts.push_back( data );
482 0 : LBDEBUG << "Adding " << dataDesc << " as multicast group for "
483 0 : << node->getNodeID() << std::endl;
484 0 : }
485 0 : }
486 68 : }
487 : }
488 :
489 0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
490 : {
491 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
492 0 : MCDatas::iterator i = node->_impl->multicasts.begin();
493 0 : for( ; i != node->_impl->multicasts.end(); ++i )
494 : {
495 0 : if( (*i).connection == connection )
496 0 : break;
497 : }
498 :
499 0 : if( node->_impl->outMulticast->isValid( ))
500 : {
501 0 : if( node->_impl->outMulticast.data == connection )
502 : {
503 : // nop, connection already used
504 0 : LBASSERT( i == node->_impl->multicasts.end( ));
505 : }
506 0 : else if( i == node->_impl->multicasts.end( ))
507 : {
508 : // another connection is used as multicast connection, save this
509 0 : LBASSERT( isListening( ));
510 0 : MCData data;
511 0 : data.connection = connection;
512 0 : data.node = this;
513 0 : _impl->multicasts.push_back( data );
514 : }
515 : // else nop, already know connection
516 : }
517 : else
518 : {
519 0 : node->_impl->outMulticast.data = connection;
520 0 : if( i != node->_impl->multicasts.end( ))
521 0 : node->_impl->multicasts.erase( i );
522 0 : }
523 0 : }
524 :
525 50 : void Node::_setListening()
526 : {
527 50 : _impl->state = STATE_LISTENING;
528 50 : }
529 :
530 49 : void Node::_setClosing()
531 : {
532 49 : _impl->state = STATE_CLOSING;
533 49 : }
534 :
535 99 : void Node::_setClosed()
536 : {
537 99 : _impl->state = STATE_CLOSED;
538 99 : }
539 :
540 118 : void Node::_connect( ConnectionPtr connection )
541 : {
542 118 : _impl->outgoing = connection;
543 118 : _impl->state = STATE_CONNECTED;
544 118 : }
545 :
546 164 : void Node::_disconnect()
547 : {
548 164 : _impl->state = STATE_CLOSED;
549 164 : _impl->outgoing = 0;
550 164 : _impl->outMulticast.data = 0;
551 164 : _impl->multicasts.clear();
552 164 : }
553 :
554 72077 : void Node::_setLastReceive( const int64_t time )
555 : {
556 72077 : _impl->lastReceive = time;
557 72077 : }
558 :
559 435 : std::ostream& operator << ( std::ostream& os, const State state )
560 : {
561 : os << ( state == STATE_CLOSED ? "closed" :
562 : state == STATE_CONNECTED ? "connected" :
563 435 : state == STATE_LISTENING ? "listening" : "ERROR" );
564 435 : return os;
565 : }
566 :
567 435 : std::ostream& operator << ( std::ostream& os, const Node& node )
568 : {
569 435 : os << "node " << node.getNodeID() << " " << node._impl->state;
570 435 : const ConnectionDescriptions& descs = node.getConnectionDescriptions();
571 862 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
572 427 : os << ", " << (*i)->toString();
573 435 : return os;
574 : }
575 :
576 66 : }
|