Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/futureFunction.h>
46 : #include <lunchbox/hash.h>
47 : #include <lunchbox/launcher.h>
48 : #include <lunchbox/lockable.h>
49 : #include <lunchbox/request.h>
50 : #include <lunchbox/rng.h>
51 : #include <lunchbox/servus.h>
52 : #include <lunchbox/sleep.h>
53 :
54 : #include <boost/bind.hpp>
55 : #include <boost/date_time/posix_time/posix_time.hpp>
56 : #include <boost/lexical_cast.hpp>
57 :
58 : #include <list>
59 : #ifdef _MSC_VER
60 : # include <direct.h> // for chdir
61 : # define chdir _chdir
62 : #endif
63 :
64 : namespace co
65 : {
66 : namespace
67 : {
68 22 : lunchbox::a_int32_t _threadIDs;
69 :
70 : typedef CommandFunc< LocalNode > CmdFunc;
71 : typedef std::list< ICommand > CommandList;
72 : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
73 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
74 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
75 : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
76 : typedef NodeHash::const_iterator NodeHashCIter;
77 : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
78 : typedef HandlerHash::const_iterator HandlerHashCIter;
79 : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
80 : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
81 : typedef CommandHash::const_iterator CommandHashCIter;
82 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
83 : }
84 :
85 : namespace detail
86 : {
87 106 : class ReceiverThread : public lunchbox::Thread
88 : {
89 : public:
90 55 : explicit ReceiverThread( co::LocalNode* localNode )
91 55 : : _localNode( localNode ) {}
92 :
93 50 : bool init() override
94 : {
95 50 : const int32_t threadID = ++_threadIDs - 1;
96 100 : setName( std::string( "Rcv" ) +
97 150 : boost::lexical_cast< std::string >( threadID ));
98 50 : return _localNode->_startCommandThread( threadID );
99 : }
100 :
101 50 : void run() override { _localNode->_runReceiverThread(); }
102 :
103 : private:
104 : co::LocalNode* const _localNode;
105 : };
106 :
107 106 : class CommandThread : public Worker
108 : {
109 : public:
110 55 : explicit CommandThread( co::LocalNode* localNode )
111 55 : : Worker( Global::getCommandQueueLimit( ))
112 : , threadID( 0 )
113 55 : , _localNode( localNode )
114 55 : {}
115 :
116 : int32_t threadID;
117 :
118 : protected:
119 50 : bool init() override
120 : {
121 100 : setName( std::string( "Cmd" ) +
122 150 : boost::lexical_cast< std::string >( threadID ));
123 50 : return true;
124 : }
125 :
126 103343 : bool stopRunning() override { return _localNode->isClosed(); }
127 51574 : bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
128 :
129 : private:
130 : co::LocalNode* const _localNode;
131 : };
132 :
133 : class LocalNode
134 : {
135 : public:
136 55 : LocalNode()
137 55 : : smallBuffers( 200 )
138 : , bigBuffers( 20 )
139 : , sendToken( true )
140 : , lastSendToken( 0 )
141 : , objectStore( 0 )
142 : , receiverThread( 0 )
143 : , commandThread( 0 )
144 55 : , service( "_collage._tcp" )
145 55 : {}
146 :
147 53 : ~LocalNode()
148 53 : {
149 53 : LBASSERT( incoming.isEmpty( ));
150 53 : LBASSERT( connectionNodes.empty( ));
151 53 : LBASSERT( pendingCommands.empty( ));
152 53 : LBASSERT( nodes->empty( ));
153 :
154 53 : delete objectStore;
155 53 : objectStore = 0;
156 53 : LBASSERT( !commandThread->isRunning( ));
157 53 : delete commandThread;
158 53 : commandThread = 0;
159 :
160 53 : LBASSERT( !receiverThread->isRunning( ));
161 53 : delete receiverThread;
162 53 : receiverThread = 0;
163 53 : }
164 :
165 280 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
166 :
167 : /** Commands re-scheduled for dispatch. */
168 : CommandList pendingCommands;
169 :
170 : /** The command buffer 'allocator' for small packets */
171 : co::BufferCache smallBuffers;
172 :
173 : /** The command buffer 'allocator' for big packets */
174 : co::BufferCache bigBuffers;
175 :
176 : bool sendToken; //!< send token availability.
177 : uint64_t lastSendToken; //!< last used time for timeout detection
178 : std::deque< co::ICommand > sendTokenQueue; //!< pending requests
179 :
180 : /** Manager of distributed object */
181 : ObjectStore* objectStore;
182 :
183 : /** Needed for thread-safety during nodeID-based connect() */
184 : lunchbox::Lock connectLock;
185 :
186 : /** The node for each connection. */
187 : ConnectionNodeHash connectionNodes; // read and write: recv only
188 :
189 : /** The connected nodes. */
190 : lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
191 :
192 : /** The connection set of all connections from/to this node. */
193 : co::ConnectionSet incoming;
194 :
195 : /** The process-global clock. */
196 : lunchbox::Clock clock;
197 :
198 : /** The registered push handlers. */
199 : lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
200 :
201 : /** The registered custom command handlers. */
202 : lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
203 :
204 : ReceiverThread* receiverThread;
205 : CommandThread* commandThread;
206 :
207 : lunchbox::Lockable< servus::Servus > service;
208 :
209 : a_ssize_t counters[ co::LocalNode::COUNTER_ALL ]; //!< Performance counters
210 :
211 : Strings args; //!< Command line arguments for remote node launch()
212 : };
213 : }
214 :
215 55 : LocalNode::LocalNode( const uint32_t type )
216 : : Node( type )
217 55 : , _impl( new detail::LocalNode )
218 : {
219 55 : _impl->receiverThread = new detail::ReceiverThread( this );
220 55 : _impl->commandThread = new detail::CommandThread( this );
221 55 : _impl->objectStore = new ObjectStore( this, _impl->counters );
222 :
223 55 : CommandQueue* queue = getCommandThreadQueue();
224 55 : registerCommand( CMD_NODE_CONNECT,
225 110 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
226 55 : registerCommand( CMD_NODE_CONNECT_BE,
227 110 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
228 55 : registerCommand( CMD_NODE_CONNECT_REPLY,
229 110 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
230 55 : registerCommand( CMD_NODE_CONNECT_REPLY_BE,
231 110 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
232 55 : registerCommand( CMD_NODE_ID,
233 110 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
234 55 : registerCommand( CMD_NODE_ID_BE,
235 110 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
236 55 : registerCommand( CMD_NODE_ACK_REQUEST,
237 110 : CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
238 55 : registerCommand( CMD_NODE_STOP_RCV,
239 110 : CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
240 55 : registerCommand( CMD_NODE_STOP_CMD,
241 110 : CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
242 55 : registerCommand( CMD_NODE_SET_AFFINITY_RCV,
243 110 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
244 55 : registerCommand( CMD_NODE_SET_AFFINITY_CMD,
245 110 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
246 55 : registerCommand( CMD_NODE_CONNECT_ACK,
247 110 : CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
248 55 : registerCommand( CMD_NODE_DISCONNECT,
249 110 : CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
250 55 : registerCommand( CMD_NODE_GET_NODE_DATA,
251 110 : CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
252 55 : registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
253 110 : CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
254 55 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
255 110 : CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
256 55 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
257 110 : CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
258 55 : registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
259 110 : CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
260 55 : registerCommand( CMD_NODE_ADD_LISTENER,
261 110 : CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
262 55 : registerCommand( CMD_NODE_REMOVE_LISTENER,
263 110 : CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
264 55 : registerCommand( CMD_NODE_PING,
265 110 : CmdFunc( this, &LocalNode::_cmdPing ), queue );
266 55 : registerCommand( CMD_NODE_PING_REPLY,
267 110 : CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
268 55 : registerCommand( CMD_NODE_COMMAND,
269 110 : CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
270 55 : registerCommand( CMD_NODE_ADD_CONNECTION,
271 110 : CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
272 55 : }
273 :
274 148 : LocalNode::~LocalNode( )
275 : {
276 53 : LBASSERT( !hasPendingRequests( ));
277 53 : delete _impl;
278 95 : }
279 :
280 1 : bool LocalNode::initLocal( const int argc, char** argv )
281 : {
282 2 : std::string setupOpts;
283 1 : _impl->args.clear();
284 :
285 : // We do not use getopt_long because it really does not work due to the
286 : // following aspects:
287 : // - reordering of arguments
288 : // - different behavior of GNU and BSD implementations
289 : // - incomplete man pages
290 1 : for( int i = 1; i < argc; ++i )
291 : {
292 0 : if( std::string( "--eq-listen" ) == argv[i] ||
293 0 : std::string( "--co-listen" ) == argv[i] )
294 : {
295 0 : if( std::string( "--eq-listen" ) == argv[i] )
296 0 : LBWARN << "Deprecated --eq-listen, use --co-listen"
297 0 : << std::endl;
298 :
299 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
300 : {
301 0 : std::string data = argv[++i];
302 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
303 0 : desc->port = Global::getDefaultPort();
304 :
305 0 : if( desc->fromString( data ))
306 : {
307 0 : addConnectionDescription( desc );
308 0 : LBASSERTINFO( data.empty(), data );
309 : }
310 : else
311 0 : LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
312 : }
313 : else
314 : {
315 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
316 : }
317 : }
318 0 : else if( std::string( "--co-globals" ) == argv[i] )
319 : {
320 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
321 : {
322 0 : const std::string data = argv[++i];
323 0 : if( !Global::fromString( data ))
324 : {
325 0 : LBWARN << "Invalid global variables string: " << data
326 0 : << ", using default global variables." << std::endl;
327 0 : }
328 : }
329 : else
330 : {
331 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
332 : }
333 : }
334 0 : else if( std::string( "--co-launch" ) == argv[i] )
335 : {
336 0 : if( i == argc-1 || argv[i+1][0] == '-' )
337 0 : LBTHROW( std::runtime_error( "Missing options to --co-launch"));
338 :
339 0 : setupOpts = argv[ ++i ];
340 0 : if( !deserialize( setupOpts ))
341 0 : LBTHROW( std::runtime_error( "Corrupt --co-launch argument" ));
342 0 : LBASSERT( !setupOpts.empty( ));
343 : }
344 : else
345 0 : _impl->args.push_back( argv[ i ]);
346 : }
347 :
348 1 : if( !listen( ))
349 : {
350 0 : LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
351 0 : << std::endl;
352 0 : return false;
353 : }
354 :
355 1 : if( !setupOpts.empty( ))
356 0 : return _setupPeer( setupOpts );
357 1 : return true;
358 : }
359 :
360 50 : bool LocalNode::listen()
361 : {
362 50 : LBVERB << "Listener data: " << serialize() << std::endl;
363 50 : if( !isClosed() || !_connectSelf( ))
364 0 : return false;
365 :
366 100 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
367 291 : for( ConnectionDescriptionsCIter i = descriptions.begin();
368 194 : i != descriptions.end(); ++i )
369 : {
370 94 : ConnectionDescriptionPtr description = *i;
371 94 : ConnectionPtr connection = Connection::create( description );
372 :
373 47 : if( !connection || !connection->listen( ))
374 : {
375 0 : LBWARN << "Can't create listener connection: " << description
376 0 : << std::endl;
377 0 : return false;
378 : }
379 :
380 47 : _impl->connectionNodes[ connection ] = this;
381 47 : if( connection->isMulticast( ))
382 0 : _addMulticast( this, connection );
383 :
384 47 : connection->acceptNB();
385 47 : _impl->incoming.addConnection( connection );
386 :
387 94 : LBVERB << "Added node " << getNodeID() << " using " << connection
388 141 : << std::endl;
389 : }
390 :
391 100 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
392 150 : << std::endl;
393 :
394 50 : _setListening();
395 50 : _impl->receiverThread->start();
396 :
397 50 : LBDEBUG << *this << std::endl;
398 50 : return true;
399 : }
400 :
401 49 : bool LocalNode::close()
402 : {
403 49 : if( !isListening() )
404 0 : return false;
405 :
406 49 : send( CMD_NODE_STOP_RCV );
407 :
408 49 : LBCHECK( _impl->receiverThread->join( ));
409 49 : _cleanup();
410 :
411 147 : LBDEBUG << _impl->incoming.getSize() << " connections open after close"
412 147 : << std::endl;
413 : #ifndef NDEBUG
414 49 : const Connections& connections = _impl->incoming.getConnections();
415 147 : for( Connections::const_iterator i = connections.begin();
416 98 : i != connections.end(); ++i )
417 : {
418 0 : LBDEBUG << " " << *i << std::endl;
419 : }
420 : #endif
421 :
422 49 : LBASSERTINFO( !hasPendingRequests(),
423 : *static_cast< lunchbox::RequestHandler* >( this ));
424 49 : return true;
425 : }
426 :
427 0 : void LocalNode::setAffinity( const int32_t affinity )
428 : {
429 0 : send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
430 0 : send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
431 :
432 0 : lunchbox::Thread::setAffinity( affinity );
433 0 : }
434 :
435 34 : void LocalNode::addConnection( ConnectionPtr connection )
436 : {
437 34 : connection->ref(); // unref in _cmdAddConnection
438 34 : send( CMD_NODE_ADD_CONNECTION ) << connection;
439 34 : }
440 :
441 0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
442 : {
443 0 : LBASSERT( isListening( ));
444 :
445 0 : ConnectionPtr connection = Connection::create( desc );
446 0 : if( connection && connection->listen( ))
447 : {
448 0 : addListener( connection );
449 0 : return connection;
450 : }
451 :
452 0 : return 0;
453 : }
454 :
455 0 : void LocalNode::addListener( ConnectionPtr connection )
456 : {
457 0 : LBASSERT( isListening( ));
458 0 : LBASSERT( connection->isListening( ));
459 0 : if( !isListening() || !connection->isListening( ))
460 0 : return;
461 :
462 0 : connection->ref( this ); // unref in self handler
463 :
464 : // Update everybody's description list of me, add the listener to myself in
465 : // my handler
466 0 : const Nodes& nodes = getNodes();
467 0 : for( NodePtr node : nodes )
468 0 : node->send( CMD_NODE_ADD_LISTENER )
469 0 : << (uint64_t)(connection.get( ))
470 0 : << connection->getDescription()->toString();
471 : }
472 :
473 0 : void LocalNode::removeListeners( const Connections& connections )
474 : {
475 0 : std::vector< lunchbox::Request< void > > requests;
476 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
477 : {
478 0 : ConnectionPtr connection = *i;
479 0 : requests.push_back( _removeListener( connection ));
480 : }
481 0 : }
482 :
483 0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
484 : {
485 0 : LBASSERT( isListening( ));
486 0 : LBASSERTINFO( !conn->isConnected(), conn );
487 :
488 0 : conn->ref( this );
489 0 : const lunchbox::Request< void > request = registerRequest< void >();
490 :
491 0 : const Nodes& nodes = getNodes();
492 0 : for( NodePtr node : nodes )
493 0 : node->send( CMD_NODE_REMOVE_LISTENER )
494 0 : << request << conn.get() << conn->getDescription()->toString();
495 0 : return request;
496 : }
497 :
498 152 : void LocalNode::_addConnection( ConnectionPtr connection )
499 : {
500 152 : if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
501 : {
502 34 : addConnection( connection );
503 34 : return;
504 : }
505 :
506 236 : BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
507 118 : connection->recvNB( buffer, COMMAND_MINSIZE );
508 118 : _impl->incoming.addConnection( connection );
509 : }
510 :
511 224 : void LocalNode::_removeConnection( ConnectionPtr connection )
512 : {
513 224 : LBASSERT( connection );
514 :
515 224 : _impl->incoming.removeConnection( connection );
516 224 : connection->resetRecvData();
517 224 : if( !connection->isClosed( ))
518 130 : connection->close(); // cancel pending IO's
519 224 : }
520 :
521 49 : void LocalNode::_cleanup()
522 : {
523 49 : LBVERB << "Clean up stopped node" << std::endl;
524 49 : LBASSERTINFO( isClosed(), *this );
525 :
526 49 : if( !_impl->connectionNodes.empty( ))
527 141 : LBDEBUG << _impl->connectionNodes.size()
528 141 : << " open connections during cleanup" << std::endl;
529 : #ifndef NDEBUG
530 288 : for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
531 192 : i != _impl->connectionNodes.end(); ++i )
532 : {
533 94 : NodePtr node = i->second;
534 47 : LBDEBUG << " " << i->first << " : " << node << std::endl;
535 : }
536 : #endif
537 :
538 49 : _impl->connectionNodes.clear();
539 :
540 49 : if( !_impl->nodes->empty( ))
541 6 : LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
542 6 : << std::endl;
543 :
544 : #ifndef NDEBUG
545 51 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
546 : {
547 4 : NodePtr node = i->second;
548 2 : LBDEBUG << " " << node << std::endl;
549 : }
550 : #endif
551 :
552 49 : _impl->nodes->clear();
553 49 : }
554 :
555 115 : void LocalNode::_closeNode( NodePtr node )
556 : {
557 230 : ConnectionPtr connection = node->getConnection();
558 230 : ConnectionPtr mcConnection = node->_getMulticast();
559 :
560 115 : node->_disconnect();
561 :
562 115 : if( connection )
563 : {
564 68 : LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
565 : _impl->connectionNodes.end(), connection );
566 :
567 68 : _removeConnection( connection );
568 68 : _impl->connectionNodes.erase( connection );
569 : }
570 :
571 115 : if( mcConnection )
572 : {
573 0 : _removeConnection( mcConnection );
574 0 : _impl->connectionNodes.erase( mcConnection );
575 : }
576 :
577 115 : _impl->objectStore->removeInstanceData( node->getNodeID( ));
578 :
579 230 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
580 115 : _impl->nodes->erase( node->getNodeID( ));
581 115 : notifyDisconnect( node );
582 115 : LBDEBUG << node << " disconnected from " << *this << std::endl;
583 115 : }
584 :
585 50 : bool LocalNode::_connectSelf()
586 : {
587 : // setup local connection to myself
588 100 : PipeConnectionPtr connection = new PipeConnection;
589 50 : if( !connection->connect( ))
590 : {
591 0 : LBERROR << "Could not create local connection to receiver thread."
592 0 : << std::endl;
593 0 : return false;
594 : }
595 :
596 50 : Node::_connect( connection->getSibling( ));
597 50 : _setClosed(); // reset state after _connect set it to connected
598 :
599 : // add to connection set
600 50 : LBASSERT( connection->getDescription( ));
601 50 : LBASSERT( _impl->connectionNodes.find( connection ) ==
602 : _impl->connectionNodes.end( ));
603 :
604 50 : _impl->connectionNodes[ connection ] = this;
605 50 : _impl->nodes.data[ getNodeID() ] = this;
606 50 : _addConnection( connection );
607 :
608 100 : LBVERB << "Added node " << getNodeID() << " using " << connection
609 150 : << std::endl;
610 50 : return true;
611 : }
612 :
613 8 : bool LocalNode::disconnect( NodePtr node )
614 : {
615 8 : if( !node || !isListening() )
616 0 : return false;
617 :
618 8 : if( !node->isConnected( ))
619 0 : return true;
620 :
621 8 : LBASSERT( !inCommandThread( ));
622 16 : lunchbox::Request< void > request = registerRequest< void >( node.get( ));
623 8 : send( CMD_NODE_DISCONNECT ) << request;
624 :
625 8 : request.wait();
626 8 : _impl->objectStore->removeNode( node );
627 8 : return true;
628 : }
629 :
630 20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
631 : {
632 20001 : if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
633 0 : return;
634 :
635 20001 : if( node == this ) // OPT
636 0 : serveRequest( requestID );
637 : else
638 20001 : node->send( CMD_NODE_ACK_REQUEST ) << requestID;
639 : }
640 :
641 0 : void LocalNode::ping( NodePtr peer )
642 : {
643 0 : LBASSERT( !_impl->inReceiverThread( ));
644 0 : peer->send( CMD_NODE_PING );
645 0 : }
646 :
647 0 : bool LocalNode::pingIdleNodes()
648 : {
649 0 : LBASSERT( !_impl->inReceiverThread( ) );
650 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
651 0 : const Nodes& nodes = getNodes( false );
652 :
653 0 : bool pinged = false;
654 0 : for( NodePtr node : nodes )
655 : {
656 0 : if( getTime64() - node->getLastReceiveTime() > timeout )
657 : {
658 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
659 0 : << node->getLastReceiveTime() << std::endl;
660 0 : node->send( CMD_NODE_PING );
661 0 : pinged = true;
662 : }
663 : }
664 0 : return pinged;
665 : }
666 :
667 : //----------------------------------------------------------------------
668 : // Object functionality
669 : //----------------------------------------------------------------------
670 0 : void LocalNode::disableInstanceCache()
671 : {
672 0 : _impl->objectStore->disableInstanceCache();
673 0 : }
674 :
675 0 : void LocalNode::expireInstanceData( const int64_t age )
676 : {
677 0 : _impl->objectStore->expireInstanceData( age );
678 0 : }
679 :
680 0 : void LocalNode::enableSendOnRegister()
681 : {
682 0 : _impl->objectStore->enableSendOnRegister();
683 0 : }
684 :
685 0 : void LocalNode::disableSendOnRegister()
686 : {
687 0 : _impl->objectStore->disableSendOnRegister();
688 0 : }
689 :
690 21 : bool LocalNode::registerObject( Object* object )
691 : {
692 21 : return _impl->objectStore->register_( object );
693 : }
694 :
695 21 : void LocalNode::deregisterObject( Object* object )
696 : {
697 21 : _impl->objectStore->deregister( object );
698 21 : }
699 :
700 40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
701 : NodePtr master, const uint128_t& version )
702 : {
703 80 : const uint32_t request = _impl->objectStore->mapNB( object, id, version,
704 40 : master );
705 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
706 80 : _impl->objectStore, request );
707 80 : return f_bool_t( new FuturebImpl( func ));
708 : }
709 :
710 0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
711 : const uint128_t& version )
712 : {
713 0 : return _impl->objectStore->mapNB( object, id, version, 0 );
714 : }
715 :
716 2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
717 : const uint128_t& version, NodePtr master )
718 : {
719 2 : return _impl->objectStore->mapNB( object, id, version, master );
720 : }
721 :
722 :
723 2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
724 : {
725 2 : return _impl->objectStore->mapSync( requestID );
726 : }
727 :
728 4 : f_bool_t LocalNode::syncObject( Object* object, const uint128_t& id,
729 : NodePtr master, const uint32_t instanceID )
730 : {
731 4 : return _impl->objectStore->sync( object, id, master, instanceID );
732 : }
733 :
734 42 : void LocalNode::unmapObject( Object* object )
735 : {
736 42 : _impl->objectStore->unmap( object );
737 42 : }
738 :
739 0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
740 : {
741 0 : _impl->objectStore->swap( oldObject, newObject );
742 0 : }
743 :
744 0 : void LocalNode::objectPush( const uint128_t& groupID,
745 : const uint128_t& objectType,
746 : const uint128_t& objectID, DataIStream& istream )
747 : {
748 0 : lunchbox::ScopedRead mutex( _impl->pushHandlers );
749 0 : HandlerHashCIter i = _impl->pushHandlers->find( groupID );
750 0 : if( i != _impl->pushHandlers->end( ))
751 0 : i->second( groupID, objectType, objectID, istream );
752 : else
753 0 : LBWARN << "No custom handler for push group " << groupID
754 0 : << " registered" << std::endl;
755 :
756 0 : if( istream.wasUsed() && istream.hasData( ))
757 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
758 0 : << objectType << " object " << objectID << std::endl;
759 0 : }
760 :
761 0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
762 : const PushHandler& handler )
763 : {
764 0 : lunchbox::ScopedWrite mutex( _impl->pushHandlers );
765 0 : (*_impl->pushHandlers)[ groupID ] = handler;
766 0 : }
767 :
768 2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
769 : const CommandHandler& func,
770 : CommandQueue* queue )
771 : {
772 4 : lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
773 2 : if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
774 : {
775 0 : LBWARN << "Already got a registered handler for custom command "
776 0 : << command << std::endl;
777 0 : return false;
778 : }
779 :
780 4 : _impl->commandHandlers->insert( std::make_pair( command,
781 6 : std::make_pair( func, queue )));
782 2 : return true;
783 : }
784 :
785 0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
786 : {
787 0 : LBASSERT( !inCommandThread( ));
788 0 : LBASSERT( !_impl->inReceiverThread( ));
789 :
790 0 : lunchbox::Request< void > request = registerRequest< void >();
791 0 : node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
792 :
793 : try
794 : {
795 0 : request.wait( Global::getTimeout( ));
796 : }
797 0 : catch( const lunchbox::FutureTimeout& )
798 : {
799 0 : LBERROR << "Timeout while acquiring send token " << request.getID()
800 0 : << std::endl;
801 0 : request.unregister();
802 0 : return 0;
803 : }
804 0 : return new co::SendToken( node );
805 : }
806 :
807 0 : void LocalNode::releaseSendToken( SendToken token )
808 : {
809 0 : LBASSERT( !_impl->inReceiverThread( ));
810 0 : if( token )
811 0 : token->release();
812 0 : }
813 :
814 : //----------------------------------------------------------------------
815 : // Connecting a node
816 : //----------------------------------------------------------------------
namespace
{
/** Outcome of a single connection attempt by LocalNode::_connect(). */
enum ConnectResult
{
    CONNECT_OK = 0,          //!< connected, or the node was already reachable
    CONNECT_TRY_AGAIN = 1,   //!< retry after a random collision-avoidance sleep
    CONNECT_BAD_STATE = 2,   //!< internal connect error, give up
    CONNECT_TIMEOUT = 3,     //!< give up
    CONNECT_UNREACHABLE = 4  //!< all usable connection descriptions failed
};
}
828 :
829 74 : NodePtr LocalNode::connect( const NodeID& nodeID )
830 : {
831 74 : LBASSERT( nodeID != 0 );
832 74 : LBASSERT( isListening( ));
833 :
834 : // Make sure that only one connection request based on the node identifier
835 : // is pending at a given time. Otherwise a node with the same id might be
836 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
837 : // mutex is to register connecting nodes with this local node, and handle
838 : // all cases correctly, which is far more complex. Node connections only
839 : // happen a lot during initialization, and are therefore not time-critical.
840 148 : lunchbox::ScopedWrite mutex( _impl->connectLock );
841 :
842 148 : Nodes nodes = getNodes();
843 77 : for( NodePtr peer : nodes )
844 : {
845 77 : if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
846 74 : return peer;
847 : }
848 :
849 0 : LBDEBUG << "Connecting node " << nodeID << std::endl;
850 0 : for( NodePtr peer : nodes )
851 : {
852 0 : NodePtr node = _connect( nodeID, peer );
853 0 : if( node )
854 0 : return node;
855 : }
856 : {
857 0 : NodePtr node = _connectFromZeroconf( nodeID );
858 0 : if( node )
859 0 : return node;
860 : }
861 : // check again if node connected by itself by now
862 0 : nodes = getNodes();
863 0 : for( NodePtr node : nodes )
864 0 : if( node->getNodeID() == nodeID && node->isReachable( ))
865 0 : return node;
866 :
867 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
868 0 : return 0;
869 : }
870 :
871 0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
872 : {
873 0 : LBASSERT( nodeID != 0 );
874 :
875 0 : NodePtr node;
876 : {
877 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
878 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
879 0 : if( i != _impl->nodes->end( ))
880 0 : node = i->second;
881 : }
882 :
883 0 : LBASSERT( getNodeID() != nodeID );
884 0 : if( !node )
885 : {
886 0 : lunchbox::Request< void* > request = registerRequest< void* >();
887 0 : peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
888 0 : node = reinterpret_cast< Node* >( request.wait( ));
889 0 : if( !node )
890 : {
891 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
892 0 : << std::endl;
893 0 : return 0;
894 : }
895 0 : node->unref( this ); // ref'd before serveRequest()
896 : }
897 :
898 0 : if( node->isReachable( ))
899 0 : return node;
900 :
901 0 : size_t tries = 10;
902 0 : while( --tries )
903 : {
904 0 : switch( _connect( node ))
905 : {
906 : case CONNECT_OK:
907 0 : return node;
908 : case CONNECT_TRY_AGAIN:
909 : {
910 0 : lunchbox::RNG rng;
911 : // collision avoidance
912 0 : lunchbox::sleep( rng.get< uint8_t >( ));
913 0 : break;
914 : }
915 : case CONNECT_BAD_STATE:
916 0 : LBWARN << "Internal connect error" << std::endl;
917 : // no break;
918 : case CONNECT_TIMEOUT:
919 0 : return 0;
920 :
921 : case CONNECT_UNREACHABLE:
922 0 : break; // maybe peer talks to us
923 : }
924 :
925 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
926 : // connect failed - check for simultaneous connect from peer
927 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
928 0 : if( i != _impl->nodes->end( ))
929 0 : node = i->second;
930 : }
931 :
932 0 : return node->isReachable() ? node : 0;
933 : }
934 :
935 0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
936 : {
937 0 : lunchbox::ScopedWrite mutex( _impl->service );
938 :
939 : const Strings& instances =
940 0 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
941 0 : for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
942 : {
943 0 : const std::string& instance = *i;
944 0 : const NodeID candidate( instance );
945 0 : if( candidate != nodeID )
946 0 : continue;
947 :
948 0 : const std::string& typeStr = _impl->service->get( instance, "co_type" );
949 0 : if( typeStr.empty( ))
950 0 : return 0;
951 :
952 0 : std::istringstream in( typeStr );
953 0 : uint32_t type = 0;
954 0 : in >> type;
955 :
956 0 : NodePtr node = createNode( type );
957 0 : if( !node )
958 : {
959 0 : LBINFO << "Can't create node of type " << type << std::endl;
960 0 : continue;
961 : }
962 :
963 0 : const std::string& numStr = _impl->service->get( instance,
964 0 : "co_numPorts" );
965 0 : uint32_t num = 0;
966 :
967 0 : in.clear();
968 0 : in.str( numStr );
969 0 : in >> num;
970 0 : LBASSERT( num > 0 );
971 0 : for( size_t j = 0; j < num; ++j )
972 : {
973 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
974 0 : std::ostringstream out;
975 0 : out << "co_port" << j;
976 :
977 0 : std::string descStr = _impl->service->get( instance, out.str( ));
978 0 : LBASSERT( !descStr.empty( ));
979 0 : LBCHECK( desc->fromString( descStr ));
980 0 : LBASSERT( descStr.empty( ));
981 0 : node->addConnectionDescription( desc );
982 : }
983 0 : mutex.leave();
984 0 : if( _connect( node ))
985 0 : return node;
986 : }
987 0 : return 0;
988 : }
989 :
990 34 : bool LocalNode::connect( NodePtr node )
991 : {
992 68 : lunchbox::ScopedWrite mutex( _impl->connectLock );
993 68 : return ( _connect( node ) == CONNECT_OK );
994 : }
995 :
996 34 : uint32_t LocalNode::_connect( NodePtr node )
997 : {
998 34 : LBASSERTINFO( isListening(), *this );
999 34 : if( node->isReachable( ))
1000 0 : return CONNECT_OK;
1001 :
1002 34 : LBASSERT( node->isClosed( ));
1003 34 : LBDEBUG << "Connecting " << node << std::endl;
1004 :
1005 : // try connecting using the given descriptions
1006 68 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
1007 102 : for( ConnectionDescriptionsCIter i = cds.begin();
1008 68 : i != cds.end(); ++i )
1009 : {
1010 34 : ConnectionDescriptionPtr description = *i;
1011 34 : if( description->type >= CONNECTIONTYPE_MULTICAST )
1012 0 : continue; // Don't use multicast for primary connections
1013 :
1014 34 : ConnectionPtr connection = Connection::create( description );
1015 34 : if( !connection || !connection->connect( ))
1016 0 : continue;
1017 :
1018 34 : return _connect( node, connection );
1019 : }
1020 :
1021 0 : LBDEBUG << "Node " << node
1022 0 : << " unreachable, all connections failed to connect" <<std::endl;
1023 0 : return CONNECT_UNREACHABLE;
1024 : }
1025 :
1026 0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
1027 : {
1028 0 : return ( _connect( node, connection ) == CONNECT_OK );
1029 : }
1030 :
1031 34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
1032 : {
1033 34 : LBASSERT( connection );
1034 34 : LBASSERT( node->getNodeID() != getNodeID( ));
1035 :
1036 68 : if( !node || !isListening() || !connection->isConnected() ||
1037 34 : !node->isClosed( ))
1038 : {
1039 0 : return CONNECT_BAD_STATE;
1040 : }
1041 :
1042 34 : _addConnection( connection );
1043 :
1044 : // send connect command to peer
1045 68 : lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
1046 : #ifdef COLLAGE_BIGENDIAN
1047 : uint32_t cmd = CMD_NODE_CONNECT_BE;
1048 : lunchbox::byteswap( cmd );
1049 : #else
1050 34 : const uint32_t cmd = CMD_NODE_CONNECT;
1051 : #endif
1052 68 : OCommand( Connections( 1, connection ), cmd )
1053 102 : << getNodeID() << request << getType() << serialize();
1054 :
1055 34 : bool connected = false;
1056 : try
1057 : {
1058 34 : connected = request.wait( 10000 /*ms*/ );
1059 : }
1060 0 : catch( const lunchbox::FutureTimeout& )
1061 : {
1062 0 : LBWARN << "Node connection handshake timeout - " << node
1063 0 : << " not a Collage node?" << std::endl;
1064 0 : request.unregister();
1065 0 : return CONNECT_TIMEOUT;
1066 : }
1067 :
1068 : // In simultaneous connect case, depending on the connection type
1069 : // (e.g. RDMA), a check on the connection state of the node is required
1070 34 : if( !connected || !node->isConnected( ))
1071 0 : return CONNECT_TRY_AGAIN;
1072 :
1073 34 : LBASSERT( node->getNodeID() != 0 );
1074 34 : LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
1075 34 : LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
1076 34 : return CONNECT_OK;
1077 : }
1078 :
1079 40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
1080 : {
1081 40 : LBASSERTINFO( id.isUUID(), id );
1082 40 : if( !id.isUUID( ))
1083 : {
1084 0 : LBWARN << "Invalid object id " << id << std::endl;
1085 0 : return 0;
1086 : }
1087 :
1088 40 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
1089 40 : if( masterNodeID == 0 )
1090 : {
1091 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1092 0 : return 0;
1093 : }
1094 :
1095 80 : NodePtr master = connect( masterNodeID );
1096 40 : if( master && !master->isClosed( ))
1097 40 : return master;
1098 :
1099 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1100 0 : << " for object " << id << std::endl;
1101 0 : return 0;
1102 : }
1103 :
1104 0 : bool LocalNode::launch( NodePtr node, const std::string& command )
1105 : {
1106 0 : size_t lastPos = 0;
1107 0 : std::string cmd;
1108 :
1109 0 : const std::string& quote = node->getLaunchQuote();
1110 : std::string launchOptions =
1111 0 : std::string( " --co-launch " ) + quote + node->serialize() +
1112 0 : node->getWorkDir() + CO_SEPARATOR + std::to_string( getType( )) +
1113 0 : CO_SEPARATOR + serialize() + quote + " --co-globals " + quote +
1114 0 : co::Global::toString() + quote;
1115 :
1116 0 : for( size_t percentPos = command.find( '%' );
1117 0 : percentPos != std::string::npos;
1118 0 : percentPos = command.find( '%', percentPos + 1 ))
1119 : {
1120 0 : std::string replacement;
1121 0 : switch( command[percentPos+1] )
1122 : {
1123 : case 'h':
1124 0 : replacement = node->getHostname();
1125 0 : if( replacement.empty( ))
1126 0 : replacement = "127.0.0.1";
1127 0 : break;
1128 :
1129 : case 'n':
1130 0 : replacement = node->getNodeID().getString();
1131 0 : break;
1132 :
1133 : case 'd':
1134 0 : replacement = node->getWorkDir();
1135 0 : break;
1136 :
1137 : case 'q':
1138 0 : replacement = quote;
1139 0 : break;
1140 :
1141 : case 'o':
1142 0 : replacement = launchOptions;
1143 0 : launchOptions.clear();
1144 0 : break;
1145 :
1146 : default:
1147 0 : LBWARN << "Unknown token " << command[percentPos+1] << std::endl;
1148 0 : replacement = command.substr( percentPos, percentPos+1 );
1149 : }
1150 :
1151 0 : cmd += command.substr( lastPos, percentPos - lastPos ) + replacement;
1152 0 : lastPos = percentPos + 2;
1153 : }
1154 :
1155 0 : cmd += command.substr( lastPos ) + launchOptions;
1156 :
1157 0 : LBDEBUG << "Launching: " << cmd << std::endl;
1158 0 : if( lunchbox::Launcher::run( cmd ))
1159 0 : return true;
1160 :
1161 0 : LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
1162 0 : return false;
1163 : }
1164 :
1165 0 : NodePtr LocalNode::syncLaunch( const uint128_t& nodeID, const int64_t timeout )
1166 : {
1167 0 : const lunchbox::Clock clock;
1168 :
1169 0 : do
1170 : {
1171 0 : co::NodePtr node = getNode( nodeID );
1172 0 : if( node && node->isConnected( ))
1173 0 : return node;
1174 :
1175 0 : lunchbox::sleep( 100 /*ms*/ );
1176 : }
1177 0 : while( timeout == static_cast< int32_t >( LB_TIMEOUT_INDEFINITE ) ||
1178 0 : clock.getTime64() < timeout );
1179 :
1180 0 : return nullptr;
1181 : }
1182 :
1183 0 : bool LocalNode::_setupPeer( const std::string& setupOpts )
1184 : {
1185 0 : size_t nextPos = setupOpts.find( CO_SEPARATOR );
1186 0 : if( nextPos == std::string::npos )
1187 : {
1188 0 : LBERROR << "Could not parse working directory: " << setupOpts
1189 0 : << std::endl;
1190 0 : return false;
1191 : }
1192 :
1193 0 : const std::string workDir = setupOpts.substr( 0, nextPos );
1194 0 : std::string description = setupOpts.substr( nextPos + 1 );
1195 :
1196 0 : if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
1197 0 : LBWARN << "Can't change working directory to " << workDir << ": "
1198 0 : << lunchbox::sysError << std::endl;
1199 :
1200 0 : nextPos = description.find( CO_SEPARATOR );
1201 0 : if( nextPos == std::string::npos )
1202 : {
1203 0 : LBERROR << "Could not parse server node type: " << description
1204 0 : << " is left from " << setupOpts << std::endl;
1205 0 : return false;
1206 : }
1207 :
1208 0 : const std::string nodeType = description.substr( 0, nextPos );
1209 0 : description = description.substr( nextPos + 1 );
1210 :
1211 0 : co::NodePtr peer = createNode( std::stoi( nodeType ));
1212 0 : if( !peer->deserialize( description ))
1213 0 : LBWARN << "Can't parse peer data" << std::endl;
1214 :
1215 0 : LBASSERTINFO( description.empty(), description );
1216 0 : if( !connect( peer ))
1217 : {
1218 0 : LBERROR << "Can't connect peer node using " << *peer << std::endl;
1219 0 : return false;
1220 : }
1221 0 : return true;
1222 : }
1223 :
1224 34 : NodePtr LocalNode::createNode( const uint32_t type )
1225 : {
1226 34 : LBASSERTINFO( type == NODETYPE_NODE, type );
1227 34 : return new Node( type );
1228 : }
1229 :
1230 0 : NodePtr LocalNode::getNode( const NodeID& id ) const
1231 : {
1232 0 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1233 0 : NodeHash::const_iterator i = _impl->nodes->find( id );
1234 0 : if( i == _impl->nodes->end() || !i->second->isReachable( ))
1235 0 : return 0;
1236 0 : return i->second;
1237 : }
1238 :
1239 114 : Nodes LocalNode::getNodes( const bool addSelf ) const
1240 : {
1241 114 : Nodes nodes;
1242 228 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1243 334 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1244 : {
1245 440 : NodePtr node = i->second;
1246 220 : if( node->isReachable() && ( addSelf || node != this ))
1247 220 : nodes.push_back( i->second );
1248 : }
1249 228 : return nodes;
1250 : }
1251 :
1252 146 : CommandQueue* LocalNode::getCommandThreadQueue()
1253 : {
1254 146 : return _impl->commandThread->getWorkerQueue();
1255 : }
1256 :
1257 8 : bool LocalNode::inCommandThread() const
1258 : {
1259 8 : return _impl->commandThread->isCurrent();
1260 : }
1261 :
1262 0 : const Strings& LocalNode::getCommandLine() const
1263 : {
1264 0 : return _impl->args;
1265 : }
1266 :
1267 72429 : int64_t LocalNode::getTime64() const
1268 : {
1269 72429 : return _impl->clock.getTime64();
1270 : }
1271 :
1272 0 : ssize_t LocalNode::getCounter( const Counter counter ) const
1273 : {
1274 0 : return _impl->counters[ counter ];
1275 : }
1276 :
1277 196 : void LocalNode::flushCommands()
1278 : {
1279 196 : _impl->incoming.interrupt();
1280 196 : }
1281 :
1282 : //----------------------------------------------------------------------
1283 : // receiver thread functions
1284 : //----------------------------------------------------------------------
1285 50 : void LocalNode::_runReceiverThread()
1286 : {
1287 50 : LB_TS_THREAD( _rcvThread );
1288 50 : _initService();
1289 :
1290 50 : int nErrors = 0;
1291 145152 : while( isListening( ))
1292 : {
1293 72552 : const ConnectionSet::Event result = _impl->incoming.select();
1294 72551 : switch( result )
1295 : {
1296 : case ConnectionSet::EVENT_CONNECT:
1297 34 : _handleConnect();
1298 34 : break;
1299 :
1300 : case ConnectionSet::EVENT_DATA:
1301 72179 : nErrors = 0;
1302 72179 : _handleData();
1303 72179 : break;
1304 :
1305 : case ConnectionSet::EVENT_DISCONNECT:
1306 : case ConnectionSet::EVENT_INVALID_HANDLE:
1307 34 : _handleDisconnect();
1308 34 : break;
1309 :
1310 : case ConnectionSet::EVENT_TIMEOUT:
1311 0 : LBINFO << "select timeout" << std::endl;
1312 0 : break;
1313 :
1314 : case ConnectionSet::EVENT_ERROR:
1315 0 : ++nErrors;
1316 0 : LBWARN << "Connection error during select" << std::endl;
1317 0 : if( nErrors > 100 )
1318 : {
1319 0 : LBWARN << "Too many errors in a row, capping connection"
1320 0 : << std::endl;
1321 0 : _handleDisconnect();
1322 : }
1323 0 : break;
1324 :
1325 : case ConnectionSet::EVENT_SELECT_ERROR:
1326 0 : LBWARN << "Error during select" << std::endl;
1327 0 : ++nErrors;
1328 0 : if( nErrors > 10 )
1329 : {
1330 0 : LBWARN << "Too many errors in a row" << std::endl;
1331 0 : LBUNIMPLEMENTED;
1332 : }
1333 0 : break;
1334 :
1335 : case ConnectionSet::EVENT_INTERRUPT:
1336 304 : _redispatchCommands();
1337 304 : break;
1338 :
1339 : default:
1340 0 : LBUNIMPLEMENTED;
1341 : }
1342 : }
1343 :
1344 49 : if( !_impl->pendingCommands.empty( ))
1345 0 : LBWARN << _impl->pendingCommands.size()
1346 0 : << " commands pending while leaving command thread" << std::endl;
1347 :
1348 49 : _impl->pendingCommands.clear();
1349 49 : LBCHECK( _impl->commandThread->join( ));
1350 :
1351 98 : ConnectionPtr connection = getConnection();
1352 98 : PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
1353 49 : connection = pipe->getSibling();
1354 49 : _removeConnection( connection );
1355 49 : _impl->connectionNodes.erase( connection );
1356 49 : _disconnect();
1357 :
1358 49 : const Connections& connections = _impl->incoming.getConnections();
1359 195 : while( !connections.empty( ))
1360 : {
1361 73 : connection = connections.back();
1362 146 : NodePtr node = _impl->connectionNodes[ connection ];
1363 :
1364 73 : if( node )
1365 73 : _closeNode( node );
1366 73 : _removeConnection( connection );
1367 : }
1368 :
1369 49 : _impl->objectStore->clear();
1370 49 : _impl->pendingCommands.clear();
1371 49 : _impl->smallBuffers.flush();
1372 49 : _impl->bigBuffers.flush();
1373 :
1374 245 : LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
1375 196 : << std::endl;
1376 49 : }
1377 :
1378 34 : void LocalNode::_handleConnect()
1379 : {
1380 68 : ConnectionPtr connection = _impl->incoming.getConnection();
1381 68 : ConnectionPtr newConn = connection->acceptSync();
1382 34 : connection->acceptNB();
1383 :
1384 34 : if( newConn )
1385 34 : _addConnection( newConn );
1386 : else
1387 0 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1388 34 : }
1389 :
1390 34 : void LocalNode::_handleDisconnect()
1391 : {
1392 34 : while( _handleData( )) ; // read remaining data off connection
1393 :
1394 68 : ConnectionPtr connection = _impl->incoming.getConnection();
1395 34 : ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
1396 :
1397 34 : if( i != _impl->connectionNodes.end( ))
1398 : {
1399 68 : NodePtr node = i->second;
1400 :
1401 34 : node->ref(); // extend lifetime to give cmd handler a chance
1402 :
1403 : // local command dispatching
1404 68 : OCommand( this, this, CMD_NODE_REMOVE_NODE )
1405 102 : << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
1406 :
1407 34 : if( node->getConnection() == connection )
1408 34 : _closeNode( node );
1409 0 : else if( connection->isMulticast( ))
1410 0 : node->_removeMulticast( connection );
1411 : }
1412 :
1413 34 : _removeConnection( connection );
1414 34 : }
1415 :
1416 72213 : bool LocalNode::_handleData()
1417 : {
1418 72213 : _impl->smallBuffers.compact();
1419 72213 : _impl->bigBuffers.compact();
1420 :
1421 144426 : ConnectionPtr connection = _impl->incoming.getConnection();
1422 72213 : LBASSERT( connection );
1423 :
1424 144426 : BufferPtr buffer = _readHead( connection );
1425 72213 : if( !buffer ) // fluke signal
1426 68 : return false;
1427 :
1428 144290 : ICommand command = _setupCommand( connection, buffer );
1429 72145 : const bool gotCommand = _readTail( command, buffer, connection );
1430 72145 : LBASSERT( gotCommand );
1431 :
1432 : // start next receive
1433 144290 : BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1434 72145 : connection->recvNB( nextBuffer, COMMAND_MINSIZE );
1435 :
1436 72145 : if( gotCommand )
1437 : {
1438 72145 : _dispatchCommand( command );
1439 72145 : return true;
1440 : }
1441 :
1442 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1443 0 : return false;
1444 : }
1445 :
1446 72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
1447 : {
1448 144426 : BufferPtr buffer;
1449 72213 : const bool gotSize = connection->recvSync( buffer, false );
1450 :
1451 72213 : if( !buffer ) // fluke signal
1452 : {
1453 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1454 0 : << std::endl;
1455 0 : _impl->incoming.setDirty();
1456 0 : return 0;
1457 : }
1458 :
1459 72213 : if( gotSize )
1460 72145 : return buffer;
1461 :
1462 : // Some systems signal data on dead connections.
1463 68 : buffer->setSize( 0 );
1464 68 : connection->recvNB( buffer, COMMAND_MINSIZE );
1465 68 : return 0;
1466 : }
1467 :
1468 72145 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
1469 : ConstBufferPtr buffer )
1470 : {
1471 144290 : NodePtr node;
1472 72145 : ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
1473 72145 : if( i != _impl->connectionNodes.end( ))
1474 72077 : node = i->second;
1475 72145 : LBVERB << "Handle data from " << node << std::endl;
1476 :
1477 : #ifdef COLLAGE_BIGENDIAN
1478 : const bool swapping = node ? !node->isBigEndian() : false;
1479 : #else
1480 72145 : const bool swapping = node ? node->isBigEndian() : false;
1481 : #endif
1482 144290 : ICommand command( this, node, buffer, swapping );
1483 :
1484 72145 : if( node )
1485 : {
1486 72077 : node->_setLastReceive( getTime64( ));
1487 72077 : return command;
1488 : }
1489 :
1490 68 : uint32_t cmd = command.getCommand();
1491 : #ifdef COLLAGE_BIGENDIAN
1492 : lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
1493 : #endif
1494 68 : switch( cmd )
1495 : {
1496 : case CMD_NODE_CONNECT:
1497 : case CMD_NODE_CONNECT_REPLY:
1498 : case CMD_NODE_ID:
1499 : #ifdef COLLAGE_BIGENDIAN
1500 : command = ICommand( this, node, buffer, true );
1501 : #endif
1502 68 : break;
1503 :
1504 : case CMD_NODE_CONNECT_BE:
1505 : case CMD_NODE_CONNECT_REPLY_BE:
1506 : case CMD_NODE_ID_BE:
1507 : #ifndef COLLAGE_BIGENDIAN
1508 0 : command = ICommand( this, node, buffer, true );
1509 : #endif
1510 0 : break;
1511 :
1512 : default:
1513 0 : LBUNIMPLEMENTED;
1514 0 : return ICommand();
1515 : }
1516 :
1517 68 : command.setCommand( cmd ); // reset correctly swapped version
1518 68 : return command;
1519 : }
1520 :
1521 72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
1522 : ConnectionPtr connection )
1523 : {
1524 72145 : const uint64_t needed = command.getSize();
1525 72145 : if( needed <= buffer->getSize( ))
1526 62134 : return true;
1527 :
1528 10011 : if( needed > buffer->getMaxSize( ))
1529 : {
1530 0 : LBASSERT( needed > COMMAND_ALLOCSIZE );
1531 0 : LBASSERTINFO( needed < LB_BIT48,
1532 : "Out-of-sync network stream: " << command << "?" );
1533 : // not enough space for remaining data, alloc and copy to new buffer
1534 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
1535 0 : newBuffer->replace( *buffer );
1536 0 : buffer = newBuffer;
1537 :
1538 0 : command = ICommand( this, command.getRemoteNode(), buffer,
1539 0 : command.isSwapping( ));
1540 : }
1541 :
1542 : // read remaining data
1543 10011 : connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
1544 10011 : return connection->recvSync( buffer );
1545 : }
1546 :
1547 35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
1548 : {
1549 35 : LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
1550 : BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
1551 0 : _impl->bigBuffers.alloc( size ) :
1552 35 : _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1553 35 : return buffer;
1554 : }
1555 :
1556 72194 : void LocalNode::_dispatchCommand( ICommand& command )
1557 : {
1558 72194 : LBASSERTINFO( command.isValid(), command );
1559 :
1560 72194 : if( dispatchCommand( command ))
1561 72194 : _redispatchCommands();
1562 : else
1563 : {
1564 0 : _redispatchCommands();
1565 0 : _impl->pendingCommands.push_back( command );
1566 : }
1567 72194 : }
1568 :
1569 72231 : bool LocalNode::dispatchCommand( ICommand& command )
1570 : {
1571 72231 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1572 72231 : LBASSERTINFO( command.isValid(), command );
1573 :
1574 72231 : const uint32_t type = command.getType();
1575 72231 : switch( type )
1576 : {
1577 : case COMMANDTYPE_NODE:
1578 71726 : LBCHECK( Dispatcher::dispatchCommand( command ));
1579 71726 : return true;
1580 :
1581 : case COMMANDTYPE_OBJECT:
1582 505 : return _impl->objectStore->dispatchObjectCommand( command );
1583 :
1584 : default:
1585 0 : LBABORT( "Unknown command type " << type << " for " << command );
1586 0 : return true;
1587 : }
1588 : }
1589 :
1590 72498 : void LocalNode::_redispatchCommands()
1591 : {
1592 72498 : bool changes = true;
1593 72498 : while( changes && !_impl->pendingCommands.empty( ))
1594 : {
1595 0 : changes = false;
1596 :
1597 0 : for( CommandList::iterator i = _impl->pendingCommands.begin();
1598 0 : i != _impl->pendingCommands.end(); ++i )
1599 : {
1600 0 : ICommand& command = *i;
1601 0 : LBASSERT( command.isValid( ));
1602 :
1603 0 : if( dispatchCommand( command ))
1604 : {
1605 0 : _impl->pendingCommands.erase( i );
1606 0 : changes = true;
1607 0 : break;
1608 : }
1609 : }
1610 : }
1611 :
1612 : #ifndef NDEBUG
1613 72498 : if( !_impl->pendingCommands.empty( ))
1614 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1615 0 : << std::endl;
1616 72498 : LBASSERT( _impl->pendingCommands.size() < 200 );
1617 : #endif
1618 72498 : }
1619 :
1620 50 : void LocalNode::_initService()
1621 : {
1622 97 : LB_TS_SCOPED( _rcvThread );
1623 50 : _impl->service->withdraw(); // go silent during k/v update
1624 :
1625 97 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1626 50 : if( descs.empty( ))
1627 3 : return;
1628 :
1629 94 : std::ostringstream out;
1630 47 : out << getType();
1631 47 : _impl->service->set( "co_type", out.str( ));
1632 :
1633 47 : out.str("");
1634 47 : out << descs.size();
1635 47 : _impl->service->set( "co_numPorts", out.str( ));
1636 :
1637 94 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
1638 : {
1639 94 : ConnectionDescriptionPtr desc = *i;
1640 47 : out.str("");
1641 47 : out << "co_port" << i - descs.begin();
1642 47 : _impl->service->set( out.str(), desc->toString( ));
1643 : }
1644 :
1645 47 : _impl->service->announce( descs.front()->port, getNodeID().getString( ));
1646 : }
1647 :
1648 49 : void LocalNode::_exitService()
1649 : {
1650 49 : _impl->service->withdraw();
1651 49 : }
1652 :
1653 1 : Zeroconf LocalNode::getZeroconf()
1654 : {
1655 2 : lunchbox::ScopedWrite mutex( _impl->service );
1656 1 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
1657 2 : return Zeroconf( _impl->service.data );
1658 : }
1659 :
1660 :
1661 : //----------------------------------------------------------------------
1662 : // command thread functions
1663 : //----------------------------------------------------------------------
1664 50 : bool LocalNode::_startCommandThread( const int32_t threadID )
1665 : {
1666 50 : _impl->commandThread->threadID = threadID;
1667 50 : return _impl->commandThread->start();
1668 : }
1669 :
1670 51574 : bool LocalNode::_notifyCommandThreadIdle()
1671 : {
1672 51574 : return _impl->objectStore->notifyCommandThreadIdle();
1673 : }
1674 :
1675 20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
1676 : {
1677 20001 : const uint32_t requestID = command.get< uint32_t >();
1678 20001 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
1679 :
1680 20001 : serveRequest( requestID );
1681 20001 : return true;
1682 : }
1683 :
1684 49 : bool LocalNode::_cmdStopRcv( ICommand& command )
1685 : {
1686 49 : LB_TS_THREAD( _rcvThread );
1687 49 : LBASSERT( isListening( ));
1688 :
1689 49 : _exitService();
1690 49 : _setClosing(); // causes rcv thread exit
1691 :
1692 49 : command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
1693 49 : _dispatchCommand( command );
1694 49 : return true;
1695 : }
1696 :
1697 49 : bool LocalNode::_cmdStopCmd( ICommand& )
1698 : {
1699 49 : LB_TS_THREAD( _cmdThread );
1700 49 : LBASSERTINFO( isClosing(), *this );
1701 :
1702 49 : _setClosed();
1703 49 : return true;
1704 : }
1705 :
1706 0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
1707 : {
1708 0 : const int32_t affinity = command.get< int32_t >();
1709 :
1710 0 : lunchbox::Thread::setAffinity( affinity );
1711 0 : return true;
1712 : }
1713 :
1714 34 : bool LocalNode::_cmdConnect( ICommand& command )
1715 : {
1716 34 : LBASSERTINFO( !command.getRemoteNode(), command );
1717 34 : LBASSERT( _impl->inReceiverThread( ));
1718 :
1719 34 : const NodeID& nodeID = command.get< NodeID >();
1720 34 : const uint32_t requestID = command.get< uint32_t >();
1721 34 : const uint32_t nodeType = command.get< uint32_t >();
1722 68 : std::string data = command.get< std::string >();
1723 :
1724 34 : LBVERB << "handle connect " << command << " req " << requestID << " type "
1725 34 : << nodeType << " data " << data << std::endl;
1726 :
1727 68 : ConnectionPtr connection = _impl->incoming.getConnection();
1728 :
1729 34 : LBASSERT( connection );
1730 34 : LBASSERT( nodeID != getNodeID() );
1731 34 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1732 : _impl->connectionNodes.end( ));
1733 :
1734 68 : NodePtr peer;
1735 : #ifdef COLLAGE_BIGENDIAN
1736 : uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
1737 : lunchbox::byteswap( cmd );
1738 : #else
1739 34 : const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
1740 : #endif
1741 :
1742 : // No locking needed, only recv thread modifies
1743 34 : NodeHashCIter i = _impl->nodes->find( nodeID );
1744 34 : if( i != _impl->nodes->end( ))
1745 : {
1746 0 : peer = i->second;
1747 0 : if( peer->isReachable( ))
1748 : {
1749 : // Node exists, probably simultaneous connect from peer
1750 0 : LBINFO << "Already got node " << nodeID << ", refusing connect"
1751 0 : << std::endl;
1752 :
1753 : // refuse connection
1754 0 : OCommand( Connections( 1, connection ), cmd )
1755 0 : << NodeID() << requestID;
1756 :
1757 : // NOTE: There is no close() here. The reply command above has to be
1758 : // received by the peer first, before closing the connection.
1759 0 : _removeConnection( connection );
1760 0 : return true;
1761 : }
1762 : }
1763 :
1764 : // create and add connected node
1765 34 : if( !peer )
1766 34 : peer = createNode( nodeType );
1767 34 : if( !peer )
1768 : {
1769 0 : LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
1770 0 : << std::endl;
1771 :
1772 : // refuse connection
1773 0 : OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
1774 :
1775 : // NOTE: There is no close() here. The reply command above has to be
1776 : // received by the peer first, before closing the connection.
1777 0 : _removeConnection( connection );
1778 0 : return true;
1779 : }
1780 :
1781 34 : if( !peer->deserialize( data ))
1782 0 : LBWARN << "Error during node initialization" << std::endl;
1783 34 : LBASSERTINFO( data.empty(), data );
1784 34 : LBASSERTINFO( peer->getNodeID() == nodeID,
1785 : peer->getNodeID() << "!=" << nodeID );
1786 34 : LBASSERT( peer->getType() == nodeType );
1787 :
1788 34 : _impl->connectionNodes[ connection ] = peer;
1789 : {
1790 68 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1791 34 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1792 : }
1793 34 : LBVERB << "Added node " << nodeID << std::endl;
1794 :
1795 : // send our information as reply
1796 68 : OCommand( Connections( 1, connection ), cmd )
1797 102 : << getNodeID() << requestID << getType() << serialize();
1798 :
1799 34 : return true;
1800 : }
1801 :
1802 34 : bool LocalNode::_cmdConnectReply( ICommand& command )
1803 : {
1804 34 : LBASSERT( !command.getRemoteNode( ));
1805 34 : LBASSERT( _impl->inReceiverThread( ));
1806 :
1807 68 : ConnectionPtr connection = _impl->incoming.getConnection();
1808 34 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1809 : _impl->connectionNodes.end( ));
1810 :
1811 34 : const NodeID& nodeID = command.get< NodeID >();
1812 34 : const uint32_t requestID = command.get< uint32_t >();
1813 :
1814 : // connection refused
1815 34 : if( nodeID == 0 )
1816 : {
1817 0 : LBINFO << "Connection refused, node already connected by peer"
1818 0 : << std::endl;
1819 :
1820 0 : _removeConnection( connection );
1821 0 : serveRequest( requestID, false );
1822 0 : return true;
1823 : }
1824 :
1825 34 : const uint32_t nodeType = command.get< uint32_t >();
1826 68 : std::string data = command.get< std::string >();
1827 :
1828 34 : LBVERB << "handle connect reply " << command << " req " << requestID
1829 34 : << " type " << nodeType << " data " << data << std::endl;
1830 :
1831 : // No locking needed, only recv thread modifies
1832 34 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1833 68 : NodePtr peer;
1834 34 : if( i != _impl->nodes->end( ))
1835 0 : peer = i->second;
1836 :
1837 34 : if( peer && peer->isReachable( )) // simultaneous connect
1838 : {
1839 0 : LBINFO << "Closing simultaneous connection from " << peer << " on "
1840 0 : << connection << std::endl;
1841 :
1842 0 : _removeConnection( connection );
1843 0 : _closeNode( peer );
1844 0 : serveRequest( requestID, false );
1845 0 : return true;
1846 : }
1847 :
1848 : // create and add node
1849 34 : if( !peer )
1850 : {
1851 34 : if( requestID != LB_UNDEFINED_UINT32 )
1852 34 : peer = reinterpret_cast< Node* >( getRequestData( requestID ));
1853 : else
1854 0 : peer = createNode( nodeType );
1855 : }
1856 34 : if( !peer )
1857 : {
1858 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1859 0 : << std::endl;
1860 0 : _removeConnection( connection );
1861 0 : return true;
1862 : }
1863 :
1864 34 : LBASSERTINFO( peer->getType() == nodeType,
1865 : peer->getType() << " != " << nodeType );
1866 34 : LBASSERT( peer->isClosed( ));
1867 :
1868 34 : if( !peer->deserialize( data ))
1869 0 : LBWARN << "Error during node initialization" << std::endl;
1870 34 : LBASSERT( data.empty( ));
1871 34 : LBASSERT( peer->getNodeID() == nodeID );
1872 :
1873 : // send ACK to peer
1874 : // cppcheck-suppress unusedScopedObject
1875 34 : OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
1876 :
1877 34 : peer->_connect( connection );
1878 34 : _impl->connectionNodes[ connection ] = peer;
1879 : {
1880 68 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1881 34 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1882 : }
1883 34 : _connectMulticast( peer );
1884 34 : LBVERB << "Added node " << nodeID << std::endl;
1885 :
1886 34 : serveRequest( requestID, true );
1887 :
1888 34 : notifyConnect( peer );
1889 34 : return true;
1890 : }
1891 :
1892 34 : bool LocalNode::_cmdConnectAck( ICommand& command )
1893 : {
1894 68 : NodePtr node = command.getRemoteNode();
1895 34 : LBASSERT( node );
1896 34 : LBASSERT( _impl->inReceiverThread( ));
1897 34 : LBVERB << "handle connect ack" << std::endl;
1898 :
1899 34 : node->_connect( _impl->incoming.getConnection( ));
1900 34 : _connectMulticast( node );
1901 34 : notifyConnect( node );
1902 68 : return true;
1903 : }
1904 :
1905 0 : bool LocalNode::_cmdID( ICommand& command )
1906 : {
1907 0 : LBASSERT( _impl->inReceiverThread( ));
1908 :
1909 0 : const NodeID& nodeID = command.get< NodeID >();
1910 0 : uint32_t nodeType = command.get< uint32_t >();
1911 0 : std::string data = command.get< std::string >();
1912 :
1913 0 : if( command.getRemoteNode( ))
1914 : {
1915 0 : LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
1916 0 : LBASSERT( command.getRemoteNode()->_getMulticast( ));
1917 0 : return true;
1918 : }
1919 :
1920 0 : LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
1921 :
1922 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1923 0 : LBASSERT( connection->isMulticast( ));
1924 0 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1925 : _impl->connectionNodes.end( ));
1926 :
1927 0 : NodePtr node;
1928 0 : if( nodeID == getNodeID() ) // 'self' multicast connection
1929 0 : node = this;
1930 : else
1931 : {
1932 : // No locking needed, only recv thread writes
1933 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1934 :
1935 0 : if( i == _impl->nodes->end( ))
1936 : {
1937 : // unknown node: create and add unconnected node
1938 0 : node = createNode( nodeType );
1939 :
1940 0 : if( !node->deserialize( data ))
1941 0 : LBWARN << "Error during node initialization" << std::endl;
1942 0 : LBASSERTINFO( data.empty(), data );
1943 :
1944 : {
1945 0 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1946 0 : _impl->nodes.data[ nodeID ] = node;
1947 : }
1948 0 : LBVERB << "Added node " << nodeID << " with multicast "
1949 0 : << connection << std::endl;
1950 : }
1951 : else
1952 0 : node = i->second;
1953 : }
1954 0 : LBASSERT( node );
1955 0 : LBASSERTINFO( node->getNodeID() == nodeID,
1956 : node->getNodeID() << "!=" << nodeID );
1957 :
1958 0 : _connectMulticast( node, connection );
1959 0 : _impl->connectionNodes[ connection ] = node;
1960 0 : LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
1961 0 : << " to " << getNodeID() << std::endl;
1962 0 : return true;
1963 : }
1964 :
1965 8 : bool LocalNode::_cmdDisconnect( ICommand& command )
1966 : {
1967 8 : LBASSERT( _impl->inReceiverThread( ));
1968 :
1969 8 : const uint32_t requestID = command.get< uint32_t >();
1970 :
1971 16 : NodePtr node = static_cast<Node*>( getRequestData( requestID ));
1972 8 : LBASSERT( node );
1973 :
1974 8 : _closeNode( node );
1975 8 : LBASSERT( node->isClosed( ));
1976 8 : serveRequest( requestID );
1977 16 : return true;
1978 : }
1979 :
1980 0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
1981 : {
1982 0 : const NodeID& nodeID = command.get< NodeID >();
1983 0 : const uint32_t requestID = command.get< uint32_t >();
1984 :
1985 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1986 0 : << " nodeID " << nodeID << std::endl;
1987 :
1988 0 : NodePtr node = getNode( nodeID );
1989 0 : NodePtr toNode = command.getRemoteNode();
1990 :
1991 0 : uint32_t nodeType = NODETYPE_INVALID;
1992 0 : std::string nodeData;
1993 0 : if( node )
1994 : {
1995 0 : nodeType = node->getType();
1996 0 : nodeData = node->serialize();
1997 0 : LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
1998 0 : << toNode << std::endl;
1999 : }
2000 :
2001 0 : toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
2002 0 : << nodeID << requestID << nodeType << nodeData;
2003 0 : return true;
2004 : }
2005 :
2006 0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
2007 : {
2008 0 : LBASSERT( _impl->inReceiverThread( ));
2009 :
2010 0 : const NodeID& nodeID = command.get< NodeID >();
2011 0 : const uint32_t requestID = command.get< uint32_t >();
2012 0 : const uint32_t nodeType = command.get< uint32_t >();
2013 0 : std::string nodeData = command.get< std::string >();
2014 :
2015 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
2016 0 : << " type " << nodeType << " data " << nodeData << std::endl;
2017 :
2018 : // No locking needed, only recv thread writes
2019 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
2020 0 : if( i != _impl->nodes->end( ))
2021 : {
2022 : // Requested node connected to us in the meantime
2023 0 : NodePtr node = i->second;
2024 :
2025 0 : node->ref( this );
2026 0 : serveRequest( requestID, node.get( ));
2027 0 : return true;
2028 : }
2029 :
2030 0 : if( nodeType == NODETYPE_INVALID )
2031 : {
2032 0 : serveRequest( requestID, (void*)0 );
2033 0 : return true;
2034 : }
2035 :
2036 : // new node: create and add unconnected node
2037 0 : NodePtr node = createNode( nodeType );
2038 0 : if( node )
2039 : {
2040 0 : LBASSERT( node );
2041 :
2042 0 : if( !node->deserialize( nodeData ))
2043 0 : LBWARN << "Failed to initialize node data" << std::endl;
2044 0 : LBASSERT( nodeData.empty( ));
2045 0 : node->ref( this );
2046 : }
2047 : else
2048 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
2049 :
2050 0 : serveRequest( requestID, node.get( ));
2051 0 : return true;
2052 : }
2053 :
2054 0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
2055 : {
2056 0 : LBASSERT( inCommandThread( ));
2057 0 : if( !_impl->sendToken ) // enqueue command if no token available
2058 : {
2059 0 : const uint32_t timeout = Global::getTimeout();
2060 0 : if( timeout == LB_TIMEOUT_INDEFINITE ||
2061 0 : ( getTime64() - _impl->lastSendToken <= timeout ))
2062 : {
2063 0 : _impl->sendTokenQueue.push_back( command );
2064 0 : return true;
2065 : }
2066 :
2067 : // timeout! - clear old requests
2068 0 : _impl->sendTokenQueue.clear();
2069 : // 'generate' new token - release is robust
2070 : }
2071 :
2072 0 : _impl->sendToken = false;
2073 :
2074 0 : const uint32_t requestID = command.get< uint32_t >();
2075 0 : command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
2076 0 : << requestID;
2077 0 : return true;
2078 : }
2079 :
2080 0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
2081 : {
2082 0 : const uint32_t requestID = command.get< uint32_t >();
2083 0 : serveRequest( requestID );
2084 0 : return true;
2085 : }
2086 :
2087 0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
2088 : {
2089 0 : LBASSERT( inCommandThread( ));
2090 0 : _impl->lastSendToken = getTime64();
2091 :
2092 0 : if( _impl->sendToken )
2093 0 : return true; // double release due to timeout
2094 0 : if( _impl->sendTokenQueue.empty( ))
2095 : {
2096 0 : _impl->sendToken = true;
2097 0 : return true;
2098 : }
2099 :
2100 0 : ICommand& request = _impl->sendTokenQueue.front();
2101 :
2102 0 : const uint32_t requestID = request.get< uint32_t >();
2103 0 : request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
2104 0 : << requestID;
2105 0 : _impl->sendTokenQueue.pop_front();
2106 0 : return true;
2107 : }
2108 :
2109 0 : bool LocalNode::_cmdAddListener( ICommand& command )
2110 : {
2111 : Connection* rawConnection =
2112 0 : reinterpret_cast< Connection* >( command.get< uint64_t >( ));
2113 0 : std::string data = command.get< std::string >();
2114 :
2115 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2116 0 : command.getRemoteNode()->_addConnectionDescription( description );
2117 :
2118 0 : if( command.getRemoteNode() != this )
2119 0 : return true;
2120 :
2121 0 : ConnectionPtr connection = rawConnection;
2122 0 : connection->unref();
2123 0 : LBASSERT( connection );
2124 :
2125 0 : _impl->connectionNodes[ connection ] = this;
2126 0 : if( connection->isMulticast( ))
2127 0 : _addMulticast( this, connection );
2128 :
2129 0 : connection->acceptNB();
2130 0 : _impl->incoming.addConnection( connection );
2131 :
2132 0 : _initService(); // update zeroconf
2133 0 : return true;
2134 : }
2135 :
2136 0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
2137 : {
2138 0 : const uint32_t requestID = command.get< uint32_t >();
2139 0 : Connection* rawConnection = command.get< Connection* >();
2140 0 : std::string data = command.get< std::string >();
2141 :
2142 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2143 0 : LBCHECK(
2144 : command.getRemoteNode()->_removeConnectionDescription( description ));
2145 :
2146 0 : if( command.getRemoteNode() != this )
2147 0 : return true;
2148 :
2149 0 : _initService(); // update zeroconf
2150 :
2151 0 : ConnectionPtr connection = rawConnection;
2152 0 : connection->unref( this );
2153 0 : LBASSERT( connection );
2154 :
2155 0 : if( connection->isMulticast( ))
2156 0 : _removeMulticast( connection );
2157 :
2158 0 : _impl->incoming.removeConnection( connection );
2159 0 : LBASSERT( _impl->connectionNodes.find( connection ) !=
2160 : _impl->connectionNodes.end( ));
2161 0 : _impl->connectionNodes.erase( connection );
2162 0 : serveRequest( requestID );
2163 0 : return true;
2164 : }
2165 :
2166 0 : bool LocalNode::_cmdPing( ICommand& command )
2167 : {
2168 0 : LBASSERT( inCommandThread( ));
2169 0 : command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
2170 0 : return true;
2171 : }
2172 :
2173 2 : bool LocalNode::_cmdCommand( ICommand& command )
2174 : {
2175 2 : const uint128_t& commandID = command.get< uint128_t >();
2176 4 : CommandHandler func;
2177 : {
2178 3 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2179 2 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2180 2 : if( i == _impl->commandHandlers->end( ))
2181 0 : return false;
2182 :
2183 2 : CommandQueue* queue = i->second.second;
2184 2 : if( queue )
2185 : {
2186 2 : command.setDispatchFunction( CmdFunc( this,
2187 1 : &LocalNode::_cmdCommandAsync ));
2188 1 : queue->push( command );
2189 1 : return true;
2190 : }
2191 : // else
2192 :
2193 1 : func = i->second.first;
2194 : }
2195 :
2196 2 : CustomICommand customCmd( command );
2197 1 : return func( customCmd );
2198 : }
2199 :
2200 1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
2201 : {
2202 1 : const uint128_t& commandID = command.get< uint128_t >();
2203 2 : CommandHandler func;
2204 : {
2205 2 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2206 1 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2207 1 : LBASSERT( i != _impl->commandHandlers->end( ));
2208 1 : if( i == _impl->commandHandlers->end( ))
2209 0 : return true; // deregistered between dispatch and now
2210 1 : func = i->second.first;
2211 : }
2212 2 : CustomICommand customCmd( command );
2213 1 : return func( customCmd );
2214 : }
2215 :
2216 34 : bool LocalNode::_cmdAddConnection( ICommand& command )
2217 : {
2218 34 : LBASSERT( _impl->inReceiverThread( ));
2219 :
2220 68 : ConnectionPtr connection = command.get< ConnectionPtr >();
2221 34 : _addConnection( connection );
2222 34 : connection->unref(); // ref'd by _addConnection
2223 68 : return true;
2224 : }
2225 :
2226 66 : }
|