Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "node.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "customOCommand.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 :
28 : #include <lunchbox/scopedMutex.h>
29 : #include <lunchbox/spinLock.h>
30 :
31 : namespace co
32 : {
33 : namespace
34 : {
35 : /** The state of the node. */
36 : enum State
37 : {
38 : STATE_CLOSED, //!< initial state
39 : STATE_CONNECTED, //!< proxy for a remote node, connected
40 : STATE_LISTENING, //!< local node, listening
41 : STATE_CLOSING //!< listening, about to close
42 : };
43 :
44 0 : struct MCData
45 : {
46 : ConnectionPtr connection;
47 : NodePtr node;
48 : };
49 : typedef std::vector< MCData > MCDatas;
50 : }
51 :
52 : namespace detail
53 : {
54 : class Node
55 : {
56 : public:
57 : /** Globally unique node identifier. */
58 : NodeID id;
59 :
60 : const uint32_t type;
61 :
62 : /** The current state of this node. */
63 : State state;
64 :
65 : /** The connection to this node. */
66 : ConnectionPtr outgoing;
67 :
68 : /** The multicast connection to this node, can be 0. */
69 : lunchbox::Lockable< ConnectionPtr > outMulticast;
70 :
71 : /**
72 : * Yet unused multicast connections for this node.
73 : *
74 : * On the first multicast send usage, the connection is 'primed' by sending
75 : * our node identifier to the MC group, removed from this vector and set as
76 : * outMulticast.
77 : */
78 : MCDatas multicasts;
79 :
80 : /** The list of descriptions on how this node is reachable. */
81 : lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
82 : connectionDescriptions;
83 :
84 : /** Last time commands were received */
85 : int64_t lastReceive;
86 :
87 : /** Is a big endian host? */
88 : bool bigEndian;
89 :
90 123 : explicit Node( const uint32_t type_ )
91 : : id( lunchbox::make_UUID( ))
92 : , type( type_ )
93 : , state( STATE_CLOSED )
94 : , lastReceive ( 0 )
95 : #ifdef COLLAGE_BIGENDIAN
96 : , bigEndian( true )
97 : #else
98 123 : , bigEndian( false )
99 : #endif
100 123 : {}
101 :
102 121 : ~Node()
103 121 : {
104 121 : LBASSERT( !outgoing );
105 121 : connectionDescriptions->clear();
106 121 : }
107 : };
108 : }
109 :
110 123 : Node::Node( const uint32_t type )
111 123 : : _impl( new detail::Node( type ))
112 : {
113 122 : LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
114 122 : }
115 :
116 310 : Node::~Node()
117 : {
118 121 : LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
119 121 : delete _impl;
120 189 : }
121 :
122 0 : bool Node::operator == ( const Node* node ) const
123 : {
124 0 : LBASSERTINFO( _impl->id != node->_impl->id || this == node,
125 : "Two node instances with the same ID found "
126 : << (void*)this << " and " << (void*)node );
127 :
128 0 : return ( _impl == node->_impl );
129 : }
130 :
131 638 : ConnectionDescriptions Node::getConnectionDescriptions() const
132 : {
133 638 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
134 638 : return _impl->connectionDescriptions.data;
135 : }
136 :
137 233 : ConnectionPtr Node::getMulticast()
138 : {
139 233 : if( !isReachable( ))
140 0 : return 0;
141 :
142 233 : ConnectionPtr connection = _impl->outMulticast.data;
143 233 : if( connection && !connection->isClosed( ))
144 0 : return connection;
145 :
146 466 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
147 233 : if( _impl->multicasts.empty( ))
148 233 : return 0;
149 :
150 0 : MCData data = _impl->multicasts.back();
151 0 : _impl->multicasts.pop_back();
152 0 : NodePtr node = data.node;
153 :
154 : // prime multicast connections on peers
155 0 : LBINFO << "Announcing id " << node->getNodeID() << " to multicast group "
156 0 : << data.connection->getDescription() << std::endl;
157 :
158 : #ifdef COLLAGE_BIGENDIAN
159 : uint32_t cmd = CMD_NODE_ID_BE;
160 : lunchbox::byteswap( cmd );
161 : #else
162 0 : const uint32_t cmd = CMD_NODE_ID;
163 : #endif
164 : OCommand( Connections( 1, data.connection ), cmd )
165 0 : << node->getNodeID() << getType() << node->serialize();
166 :
167 0 : _impl->outMulticast.data = data.connection;
168 233 : return data.connection;
169 : }
170 :
171 81 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
172 : {
173 81 : LBASSERTINFO( isClosed(), *this );
174 81 : if( !isClosed( ))
175 81 : return;
176 81 : _addConnectionDescription( cd );
177 : }
178 :
179 81 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
180 : {
181 81 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
182 81 : _impl->connectionDescriptions->push_back( cd );
183 81 : }
184 :
185 0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
186 : {
187 0 : LBASSERTINFO( isClosed(), *this );
188 0 : if( !isClosed( ))
189 0 : return false;
190 0 : return _removeConnectionDescription( cd );
191 : }
192 :
193 0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
194 : {
195 0 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
196 :
197 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
198 0 : for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
199 0 : i != _impl->connectionDescriptions->end(); ++i )
200 : {
201 0 : if( *cd != **i )
202 0 : continue;
203 :
204 0 : _impl->connectionDescriptions->erase( i );
205 0 : return true;
206 : }
207 0 : return false;
208 : }
209 :
210 68 : std::string Node::serialize() const
211 : {
212 68 : std::ostringstream data;
213 68 : data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
214 136 : << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
215 68 : << CO_SEPARATOR;
216 : {
217 68 : lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
218 68 : data << co::serialize( _impl->connectionDescriptions.data );
219 : }
220 68 : return data.str();
221 : }
222 :
223 68 : bool Node::deserialize( std::string& data )
224 : {
225 68 : LBASSERT( _impl->state == STATE_CLOSED );
226 :
227 : // version check
228 68 : int32_t major = 0;
229 68 : size_t nextPos = data.find( CO_SEPARATOR );
230 68 : if( nextPos == std::string::npos || nextPos == 0 )
231 : {
232 0 : LBERROR << "Could not parse node major version data" << std::endl;
233 0 : return false;
234 : }
235 :
236 68 : std::istringstream is( data.substr( 0, nextPos ));
237 68 : data = data.substr( nextPos + 1 );
238 68 : is >> major;
239 :
240 68 : int32_t minor = 0;
241 68 : nextPos = data.find( CO_SEPARATOR );
242 68 : if( nextPos == std::string::npos || nextPos == 0 )
243 : {
244 0 : LBERROR << "Could not parse node minor version data" << std::endl;
245 0 : return false;
246 : }
247 :
248 68 : is.clear();
249 68 : is.str( data.substr( 0, nextPos ));
250 68 : data = data.substr( nextPos + 1 );
251 68 : is >> minor;
252 :
253 68 : if( major != Version::getMajor() || minor != Version::getMinor( ))
254 : {
255 0 : LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
256 0 : << minor << ", local node uses " << Version::getMajor() << '.'
257 0 : << Version::getMinor() << std::endl;
258 : }
259 :
260 : // node id
261 68 : nextPos = data.find( CO_SEPARATOR );
262 68 : if( nextPos == std::string::npos || nextPos == 0 )
263 : {
264 0 : LBERROR << "Could not parse node id data" << std::endl;
265 0 : return false;
266 : }
267 :
268 68 : _impl->id = data.substr( 0, nextPos );
269 68 : data = data.substr( nextPos + 1 );
270 :
271 : // endianness
272 68 : nextPos = data.find( CO_SEPARATOR );
273 68 : if( nextPos == std::string::npos || nextPos == 0 )
274 : {
275 0 : LBERROR << "Could not parse node endianness data" << std::endl;
276 0 : return false;
277 : }
278 :
279 68 : is.clear();
280 68 : is.str( data.substr( 0, nextPos ));
281 68 : data = data.substr( nextPos + 1 );
282 68 : is >> _impl->bigEndian;
283 :
284 : // Connections data
285 136 : lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
286 68 : _impl->connectionDescriptions->clear();
287 136 : return co::deserialize( data, _impl->connectionDescriptions.data );
288 : }
289 :
290 72077 : bool Node::isBigEndian() const
291 : {
292 72077 : return _impl->bigEndian;
293 : }
294 :
295 1341 : bool Node::isReachable() const
296 : {
297 1341 : return isListening() || isConnected();
298 : }
299 :
300 646 : bool Node::isConnected() const
301 : {
302 646 : return _impl->state == STATE_CONNECTED;
303 : }
304 :
305 175177 : bool Node::isClosed() const
306 : {
307 175177 : return _impl->state == STATE_CLOSED;
308 : }
309 :
310 49 : bool Node::isClosing() const
311 : {
312 49 : return _impl->state == STATE_CLOSING;
313 : }
314 :
315 74322 : bool Node::isListening() const
316 : {
317 74322 : return _impl->state == STATE_LISTENING;
318 : }
319 :
320 754 : ConnectionPtr Node::getConnection( const bool preferMulticast )
321 : {
322 754 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
323 754 : return multicast ? multicast : _impl->outgoing;
324 : }
325 :
326 71476 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
327 : {
328 71476 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
329 71476 : if( !isClosed( ))
330 71476 : return multicast ? multicast : _impl->outgoing;
331 0 : LBUNREACHABLE;
332 0 : return 0;
333 : }
334 :
335 115 : ConnectionPtr Node::_getMulticast() const
336 : {
337 115 : return _impl->outMulticast.data;
338 : }
339 :
340 71474 : OCommand Node::send( const uint32_t cmd, const bool multicast )
341 : {
342 71474 : ConnectionPtr connection = _getConnection( multicast );
343 71474 : LBASSERT( connection );
344 71474 : return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
345 : }
346 :
347 2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
348 : {
349 2 : ConnectionPtr connection = _getConnection( multicast );
350 2 : LBASSERT( connection );
351 2 : return CustomOCommand( Connections( 1, connection ), commandID );
352 : }
353 :
354 192211 : const NodeID& Node::getNodeID() const
355 : {
356 192211 : return _impl->id;
357 : }
358 :
359 0 : int64_t Node::getLastReceiveTime() const
360 : {
361 0 : return _impl->lastReceive;
362 : }
363 :
364 183 : uint32_t Node::getType() const
365 : {
366 183 : return _impl->type;
367 : }
368 :
369 0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
370 : {
371 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
372 0 : MCData data;
373 0 : data.connection = connection;
374 0 : data.node = node;
375 0 : _impl->multicasts.push_back( data );
376 0 : }
377 :
378 0 : void Node::_removeMulticast( ConnectionPtr connection )
379 : {
380 0 : LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
381 :
382 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
383 0 : if( _impl->outMulticast == connection )
384 0 : _impl->outMulticast.data = 0;
385 : else
386 : {
387 0 : for( MCDatas::iterator j = _impl->multicasts.begin();
388 0 : j != _impl->multicasts.end(); ++j )
389 : {
390 0 : if( (*j).connection != connection )
391 0 : continue;
392 :
393 0 : _impl->multicasts.erase( j );
394 0 : return;
395 : }
396 0 : }
397 : }
398 :
399 68 : void Node::_connectMulticast( NodePtr node )
400 : {
401 68 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
402 :
403 68 : if( node->_impl->outMulticast.data.isValid( ))
404 : // multicast already connected by previous _cmdID
405 68 : return;
406 :
407 : // Search if the connected node is in the same multicast group as we are
408 136 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
409 405 : for( ConnectionDescriptionsCIter i = descriptions.begin();
410 270 : i != descriptions.end(); ++i )
411 : {
412 67 : ConnectionDescriptionPtr description = *i;
413 67 : if( description->type < CONNECTIONTYPE_MULTICAST )
414 67 : continue;
415 :
416 : const ConnectionDescriptions& fromDescs =
417 0 : node->getConnectionDescriptions();
418 0 : for( ConnectionDescriptionsCIter j = fromDescs.begin();
419 0 : j != fromDescs.end(); ++j )
420 : {
421 0 : ConnectionDescriptionPtr fromDescription = *j;
422 0 : if( !description->isSameMulticastGroup( fromDescription ))
423 0 : continue;
424 :
425 0 : LBASSERT( !node->_impl->outMulticast.data );
426 0 : LBASSERT( node->_impl->multicasts.empty( ));
427 :
428 0 : if( _impl->outMulticast->isValid() &&
429 0 : _impl->outMulticast.data->getDescription() == description )
430 : {
431 0 : node->_impl->outMulticast.data = _impl->outMulticast.data;
432 0 : LBINFO << "Using " << description << " as multicast group for "
433 0 : << node->getNodeID() << std::endl;
434 : }
435 : // find unused multicast connection to node
436 0 : else for( MCDatas::const_iterator k = _impl->multicasts.begin();
437 0 : k != _impl->multicasts.end(); ++k )
438 : {
439 0 : const MCData& data = *k;
440 : ConstConnectionDescriptionPtr dataDesc =
441 0 : data.connection->getDescription();
442 0 : if( !description->isSameMulticastGroup( dataDesc ))
443 0 : continue;
444 :
445 0 : node->_impl->multicasts.push_back( data );
446 0 : LBINFO << "Adding " << dataDesc << " as multicast group for "
447 0 : << node->getNodeID() << std::endl;
448 0 : }
449 0 : }
450 68 : }
451 : }
452 :
453 0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
454 : {
455 0 : lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
456 0 : MCDatas::iterator i = node->_impl->multicasts.begin();
457 0 : for( ; i != node->_impl->multicasts.end(); ++i )
458 : {
459 0 : if( (*i).connection == connection )
460 0 : break;
461 : }
462 :
463 0 : if( node->_impl->outMulticast->isValid( ))
464 : {
465 0 : if( node->_impl->outMulticast.data == connection )
466 : {
467 : // nop, connection already used
468 0 : LBASSERT( i == node->_impl->multicasts.end( ));
469 : }
470 0 : else if( i == node->_impl->multicasts.end( ))
471 : {
472 : // another connection is used as multicast connection, save this
473 0 : LBASSERT( isListening( ));
474 0 : MCData data;
475 0 : data.connection = connection;
476 0 : data.node = this;
477 0 : _impl->multicasts.push_back( data );
478 : }
479 : // else nop, already know connection
480 : }
481 : else
482 : {
483 0 : node->_impl->outMulticast.data = connection;
484 0 : if( i != node->_impl->multicasts.end( ))
485 0 : node->_impl->multicasts.erase( i );
486 0 : }
487 0 : }
488 :
489 50 : void Node::_setListening()
490 : {
491 50 : _impl->state = STATE_LISTENING;
492 50 : }
493 :
494 49 : void Node::_setClosing()
495 : {
496 49 : _impl->state = STATE_CLOSING;
497 49 : }
498 :
499 99 : void Node::_setClosed()
500 : {
501 99 : _impl->state = STATE_CLOSED;
502 99 : }
503 :
504 118 : void Node::_connect( ConnectionPtr connection )
505 : {
506 118 : _impl->outgoing = connection;
507 118 : _impl->state = STATE_CONNECTED;
508 118 : }
509 :
510 164 : void Node::_disconnect()
511 : {
512 164 : _impl->state = STATE_CLOSED;
513 164 : _impl->outgoing = 0;
514 164 : _impl->outMulticast.data = 0;
515 164 : _impl->multicasts.clear();
516 164 : }
517 :
518 72077 : void Node::_setLastReceive( const int64_t time )
519 : {
520 72077 : _impl->lastReceive = time;
521 72077 : }
522 :
523 435 : std::ostream& operator << ( std::ostream& os, const State state )
524 : {
525 : os << ( state == STATE_CLOSED ? "closed" :
526 : state == STATE_CONNECTED ? "connected" :
527 435 : state == STATE_LISTENING ? "listening" : "ERROR" );
528 435 : return os;
529 : }
530 :
531 435 : std::ostream& operator << ( std::ostream& os, const Node& node )
532 : {
533 435 : os << "node " << node.getNodeID() << " " << node._impl->state;
534 435 : const ConnectionDescriptions& descs = node.getConnectionDescriptions();
535 862 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
536 426 : os << ", " << (*i)->toString();
537 435 : return os;
538 : }
539 :
540 66 : }
|