Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/futureFunction.h>
46 : #include <lunchbox/hash.h>
47 : #include <lunchbox/launcher.h>
48 : #include <lunchbox/lockable.h>
49 : #include <lunchbox/request.h>
50 : #include <lunchbox/rng.h>
51 : #include <lunchbox/servus.h>
52 : #include <lunchbox/sleep.h>
53 :
54 : #include <boost/bind.hpp>
55 : #include <boost/date_time/posix_time/posix_time.hpp>
56 : #include <boost/lexical_cast.hpp>
57 :
58 : #include <list>
59 : #ifdef _MSC_VER
60 : # include <direct.h> // for chdir
61 : # define chdir _chdir
62 : #endif
63 :
64 : namespace co
65 : {
66 : namespace
67 : {
68 22 : lunchbox::a_int32_t _threadIDs;
69 :
70 : typedef CommandFunc< LocalNode > CmdFunc;
71 : typedef std::list< ICommand > CommandList;
72 : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
73 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
74 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
75 : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
76 : typedef NodeHash::const_iterator NodeHashCIter;
77 : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
78 : typedef HandlerHash::const_iterator HandlerHashCIter;
79 : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
80 : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
81 : typedef CommandHash::const_iterator CommandHashCIter;
82 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
83 : }
84 :
85 : namespace detail
86 : {
87 106 : class ReceiverThread : public lunchbox::Thread
88 : {
89 : public:
90 55 : explicit ReceiverThread( co::LocalNode* localNode )
91 55 : : _localNode( localNode ) {}
92 :
93 50 : bool init() override
94 : {
95 50 : const int32_t threadID = ++_threadIDs - 1;
96 100 : setName( std::string( "Rcv" ) +
97 50 : boost::lexical_cast< std::string >( threadID ));
98 50 : return _localNode->_startCommandThread( threadID );
99 : }
100 :
101 50 : void run() override { _localNode->_runReceiverThread(); }
102 :
103 : private:
104 : co::LocalNode* const _localNode;
105 : };
106 :
107 106 : class CommandThread : public Worker
108 : {
109 : public:
110 55 : explicit CommandThread( co::LocalNode* localNode )
111 : : Worker( Global::getCommandQueueLimit( ))
112 : , threadID( 0 )
113 55 : , _localNode( localNode )
114 55 : {}
115 :
116 : int32_t threadID;
117 :
118 : protected:
119 50 : bool init() override
120 : {
121 100 : setName( std::string( "Cmd" ) +
122 50 : boost::lexical_cast< std::string >( threadID ));
123 50 : return true;
124 : }
125 :
126 103339 : bool stopRunning() override { return _localNode->isClosed(); }
127 51608 : bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
128 :
129 : private:
130 : co::LocalNode* const _localNode;
131 : };
132 :
133 : class LocalNode
134 : {
135 : public:
136 55 : LocalNode()
137 : : smallBuffers( 200 )
138 : , bigBuffers( 20 )
139 : , sendToken( true )
140 : , lastSendToken( 0 )
141 : , objectStore( 0 )
142 : , receiverThread( 0 )
143 : , commandThread( 0 )
144 55 : , service( "_collage._tcp" )
145 55 : {}
146 :
147 53 : ~LocalNode()
148 53 : {
149 53 : LBASSERT( incoming.isEmpty( ));
150 53 : LBASSERT( connectionNodes.empty( ));
151 53 : LBASSERT( pendingCommands.empty( ));
152 53 : LBASSERT( nodes->empty( ));
153 :
154 53 : delete objectStore;
155 53 : objectStore = 0;
156 53 : LBASSERT( !commandThread->isRunning( ));
157 53 : delete commandThread;
158 53 : commandThread = 0;
159 :
160 53 : LBASSERT( !receiverThread->isRunning( ));
161 53 : delete receiverThread;
162 53 : receiverThread = 0;
163 53 : }
164 :
165 280 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
166 :
167 : /** Commands re-scheduled for dispatch. */
168 : CommandList pendingCommands;
169 :
170 : /** The command buffer 'allocator' for small packets */
171 : co::BufferCache smallBuffers;
172 :
173 : /** The command buffer 'allocator' for big packets */
174 : co::BufferCache bigBuffers;
175 :
176 : bool sendToken; //!< send token availability.
177 : uint64_t lastSendToken; //!< last used time for timeout detection
178 : std::deque< co::ICommand > sendTokenQueue; //!< pending requests
179 :
180 : /** Manager of distributed object */
181 : ObjectStore* objectStore;
182 :
183 : /** Needed for thread-safety during nodeID-based connect() */
184 : lunchbox::Lock connectLock;
185 :
186 : /** The node for each connection. */
187 : ConnectionNodeHash connectionNodes; // read and write: recv only
188 :
189 : /** The connected nodes. */
190 : lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
191 :
192 : /** The connection set of all connections from/to this node. */
193 : co::ConnectionSet incoming;
194 :
195 : /** The process-global clock. */
196 : lunchbox::Clock clock;
197 :
198 : /** The registered push handlers. */
199 : lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
200 :
201 : /** The registered custom command handlers. */
202 : lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
203 :
204 : ReceiverThread* receiverThread;
205 : CommandThread* commandThread;
206 :
207 : lunchbox::Lockable< servus::Servus > service;
208 :
209 : // Performance counters:
210 : a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
211 : };
212 : }
213 :
214 55 : LocalNode::LocalNode( const uint32_t type )
215 : : Node( type )
216 55 : , _impl( new detail::LocalNode )
217 : {
218 55 : _impl->receiverThread = new detail::ReceiverThread( this );
219 55 : _impl->commandThread = new detail::CommandThread( this );
220 55 : _impl->objectStore = new ObjectStore( this, _impl->counters );
221 :
222 55 : CommandQueue* queue = getCommandThreadQueue();
223 : registerCommand( CMD_NODE_CONNECT,
224 55 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
225 : registerCommand( CMD_NODE_CONNECT_BE,
226 55 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
227 : registerCommand( CMD_NODE_CONNECT_REPLY,
228 55 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
229 : registerCommand( CMD_NODE_CONNECT_REPLY_BE,
230 55 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
231 : registerCommand( CMD_NODE_ID,
232 55 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
233 : registerCommand( CMD_NODE_ID_BE,
234 55 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
235 : registerCommand( CMD_NODE_ACK_REQUEST,
236 55 : CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
237 : registerCommand( CMD_NODE_STOP_RCV,
238 55 : CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
239 : registerCommand( CMD_NODE_STOP_CMD,
240 55 : CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
241 : registerCommand( CMD_NODE_SET_AFFINITY_RCV,
242 55 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
243 : registerCommand( CMD_NODE_SET_AFFINITY_CMD,
244 55 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
245 : registerCommand( CMD_NODE_CONNECT_ACK,
246 55 : CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
247 : registerCommand( CMD_NODE_DISCONNECT,
248 55 : CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
249 : registerCommand( CMD_NODE_GET_NODE_DATA,
250 55 : CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
251 : registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
252 55 : CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
253 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
254 55 : CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
255 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
256 55 : CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
257 : registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
258 55 : CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
259 : registerCommand( CMD_NODE_ADD_LISTENER,
260 55 : CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
261 : registerCommand( CMD_NODE_REMOVE_LISTENER,
262 55 : CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
263 : registerCommand( CMD_NODE_PING,
264 55 : CmdFunc( this, &LocalNode::_cmdPing ), queue );
265 : registerCommand( CMD_NODE_PING_REPLY,
266 55 : CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
267 : registerCommand( CMD_NODE_COMMAND,
268 55 : CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
269 : registerCommand( CMD_NODE_ADD_CONNECTION,
270 55 : CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
271 55 : }
272 :
273 148 : LocalNode::~LocalNode( )
274 : {
275 53 : LBASSERT( !hasPendingRequests( ));
276 53 : delete _impl;
277 95 : }
278 :
279 1 : bool LocalNode::initLocal( const int argc, char** argv )
280 : {
281 1 : std::string setupOpts;
282 :
283 : // We do not use getopt_long because it really does not work due to the
284 : // following aspects:
285 : // - reordering of arguments
286 : // - different behavior of GNU and BSD implementations
287 : // - incomplete man pages
288 1 : for( int i=1; i<argc; ++i )
289 : {
290 0 : if( std::string( "--eq-listen" ) == argv[i] )
291 0 : LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
292 0 : if( std::string( "--eq-listen" ) == argv[i] ||
293 0 : std::string( "--co-listen" ) == argv[i] )
294 : {
295 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
296 : {
297 0 : std::string data = argv[++i];
298 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
299 0 : desc->port = Global::getDefaultPort();
300 :
301 0 : if( desc->fromString( data ))
302 : {
303 0 : addConnectionDescription( desc );
304 0 : LBASSERTINFO( data.empty(), data );
305 : }
306 : else
307 0 : LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
308 : }
309 : else
310 : {
311 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
312 : }
313 : }
314 0 : else if( std::string( "--co-globals" ) == argv[i] )
315 : {
316 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
317 : {
318 0 : const std::string data = argv[++i];
319 0 : if( !Global::fromString( data ))
320 : {
321 0 : LBWARN << "Invalid global variables string: " << data
322 0 : << ", using default global variables." << std::endl;
323 0 : }
324 : }
325 : else
326 : {
327 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
328 : }
329 : }
330 0 : else if( std::string( "--co-launch" ) == argv[i] )
331 : {
332 0 : if( i == argc-1 || argv[i+1][0] == '-' )
333 0 : LBTHROW( std::runtime_error( "Missing options to --co-launch"));
334 :
335 0 : setupOpts = argv[ ++i ];
336 0 : if( !deserialize( setupOpts ))
337 0 : LBTHROW( std::runtime_error( "Corrupt --co-launch argument" ));
338 0 : LBASSERT( !setupOpts.empty( ));
339 : }
340 : }
341 :
342 1 : if( !listen( ))
343 : {
344 0 : LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
345 0 : << std::endl;
346 0 : return false;
347 : }
348 :
349 1 : if( !setupOpts.empty( ))
350 0 : return _setupPeer( setupOpts );
351 1 : return true;
352 : }
353 :
354 50 : bool LocalNode::listen()
355 : {
356 50 : LBVERB << "Listener data: " << serialize() << std::endl;
357 50 : if( !isClosed() || !_connectSelf( ))
358 0 : return false;
359 :
360 50 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
361 291 : for( ConnectionDescriptionsCIter i = descriptions.begin();
362 194 : i != descriptions.end(); ++i )
363 : {
364 47 : ConnectionDescriptionPtr description = *i;
365 94 : ConnectionPtr connection = Connection::create( description );
366 :
367 47 : if( !connection || !connection->listen( ))
368 : {
369 0 : LBWARN << "Can't create listener connection: " << description
370 0 : << std::endl;
371 0 : return false;
372 : }
373 :
374 47 : _impl->connectionNodes[ connection ] = this;
375 47 : if( connection->isMulticast( ))
376 0 : _addMulticast( this, connection );
377 :
378 47 : connection->acceptNB();
379 47 : _impl->incoming.addConnection( connection );
380 :
381 94 : LBVERB << "Added node " << getNodeID() << " using " << connection
382 141 : << std::endl;
383 47 : }
384 :
385 100 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
386 150 : << std::endl;
387 :
388 50 : _setListening();
389 50 : _impl->receiverThread->start();
390 :
391 50 : LBDEBUG << *this << std::endl;
392 50 : return true;
393 : }
394 :
395 49 : bool LocalNode::close()
396 : {
397 49 : if( !isListening() )
398 0 : return false;
399 :
400 49 : send( CMD_NODE_STOP_RCV );
401 :
402 49 : LBCHECK( _impl->receiverThread->join( ));
403 49 : _cleanup();
404 :
405 147 : LBDEBUG << _impl->incoming.getSize() << " connections open after close"
406 147 : << std::endl;
407 : #ifndef NDEBUG
408 49 : const Connections& connections = _impl->incoming.getConnections();
409 147 : for( Connections::const_iterator i = connections.begin();
410 98 : i != connections.end(); ++i )
411 : {
412 0 : LBDEBUG << " " << *i << std::endl;
413 : }
414 : #endif
415 :
416 49 : LBASSERTINFO( !hasPendingRequests(),
417 : *static_cast< lunchbox::RequestHandler* >( this ));
418 49 : return true;
419 : }
420 :
421 0 : void LocalNode::setAffinity( const int32_t affinity )
422 : {
423 0 : send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
424 0 : send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
425 :
426 0 : lunchbox::Thread::setAffinity( affinity );
427 0 : }
428 :
429 34 : void LocalNode::addConnection( ConnectionPtr connection )
430 : {
431 34 : connection->ref(); // unref in _cmdAddConnection
432 34 : send( CMD_NODE_ADD_CONNECTION ) << connection;
433 34 : }
434 :
435 0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
436 : {
437 0 : LBASSERT( isListening( ));
438 :
439 0 : ConnectionPtr connection = Connection::create( desc );
440 0 : if( connection && connection->listen( ))
441 : {
442 0 : addListener( connection );
443 0 : return connection;
444 : }
445 :
446 0 : return 0;
447 : }
448 :
449 0 : void LocalNode::addListener( ConnectionPtr connection )
450 : {
451 0 : LBASSERT( isListening( ));
452 0 : LBASSERT( connection->isListening( ));
453 0 : if( !isListening() || !connection->isListening( ))
454 0 : return;
455 :
456 0 : connection->ref( this ); // unref in self handler
457 :
458 : // Update everybody's description list of me, add the listener to myself in
459 : // my handler
460 0 : const Nodes& nodes = getNodes();
461 0 : for( NodePtr node : nodes )
462 : node->send( CMD_NODE_ADD_LISTENER )
463 0 : << (uint64_t)(connection.get( ))
464 0 : << connection->getDescription()->toString();
465 : }
466 :
467 0 : void LocalNode::removeListeners( const Connections& connections )
468 : {
469 0 : std::vector< lunchbox::Request< void > > requests;
470 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
471 : {
472 0 : ConnectionPtr connection = *i;
473 0 : requests.push_back( _removeListener( connection ));
474 0 : }
475 0 : }
476 :
477 0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
478 : {
479 0 : LBASSERT( isListening( ));
480 0 : LBASSERTINFO( !conn->isConnected(), conn );
481 :
482 0 : conn->ref( this );
483 0 : const lunchbox::Request< void > request = registerRequest< void >();
484 :
485 0 : const Nodes& nodes = getNodes();
486 0 : for( NodePtr node : nodes )
487 : node->send( CMD_NODE_REMOVE_LISTENER )
488 0 : << request << conn.get() << conn->getDescription()->toString();
489 0 : return request;
490 : }
491 :
492 152 : void LocalNode::_addConnection( ConnectionPtr connection )
493 : {
494 152 : if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
495 : {
496 34 : addConnection( connection );
497 186 : return;
498 : }
499 :
500 118 : BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
501 118 : connection->recvNB( buffer, COMMAND_MINSIZE );
502 118 : _impl->incoming.addConnection( connection );
503 : }
504 :
505 224 : void LocalNode::_removeConnection( ConnectionPtr connection )
506 : {
507 224 : LBASSERT( connection );
508 :
509 224 : _impl->incoming.removeConnection( connection );
510 224 : connection->resetRecvData();
511 224 : if( !connection->isClosed( ))
512 130 : connection->close(); // cancel pending IO's
513 224 : }
514 :
515 49 : void LocalNode::_cleanup()
516 : {
517 49 : LBVERB << "Clean up stopped node" << std::endl;
518 49 : LBASSERTINFO( isClosed(), *this );
519 :
520 49 : if( !_impl->connectionNodes.empty( ))
521 141 : LBDEBUG << _impl->connectionNodes.size()
522 141 : << " open connections during cleanup" << std::endl;
523 : #ifndef NDEBUG
524 288 : for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
525 192 : i != _impl->connectionNodes.end(); ++i )
526 : {
527 47 : NodePtr node = i->second;
528 47 : LBDEBUG << " " << i->first << " : " << node << std::endl;
529 47 : }
530 : #endif
531 :
532 49 : _impl->connectionNodes.clear();
533 :
534 49 : if( !_impl->nodes->empty( ))
535 6 : LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
536 6 : << std::endl;
537 :
538 : #ifndef NDEBUG
539 51 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
540 : {
541 2 : NodePtr node = i->second;
542 2 : LBDEBUG << " " << node << std::endl;
543 2 : }
544 : #endif
545 :
546 49 : _impl->nodes->clear();
547 49 : }
548 :
549 115 : void LocalNode::_closeNode( NodePtr node )
550 : {
551 115 : ConnectionPtr connection = node->getConnection();
552 230 : ConnectionPtr mcConnection = node->_getMulticast();
553 :
554 115 : node->_disconnect();
555 :
556 115 : if( connection )
557 : {
558 68 : LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
559 : _impl->connectionNodes.end(), connection );
560 :
561 68 : _removeConnection( connection );
562 68 : _impl->connectionNodes.erase( connection );
563 : }
564 :
565 115 : if( mcConnection )
566 : {
567 0 : _removeConnection( mcConnection );
568 0 : _impl->connectionNodes.erase( mcConnection );
569 : }
570 :
571 115 : _impl->objectStore->removeInstanceData( node->getNodeID( ));
572 :
573 230 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
574 115 : _impl->nodes->erase( node->getNodeID( ));
575 115 : notifyDisconnect( node );
576 230 : LBDEBUG << node << " disconnected from " << *this << std::endl;
577 115 : }
578 :
579 50 : bool LocalNode::_connectSelf()
580 : {
581 : // setup local connection to myself
582 50 : PipeConnectionPtr connection = new PipeConnection;
583 50 : if( !connection->connect( ))
584 : {
585 0 : LBERROR << "Could not create local connection to receiver thread."
586 0 : << std::endl;
587 0 : return false;
588 : }
589 :
590 50 : Node::_connect( connection->getSibling( ));
591 50 : _setClosed(); // reset state after _connect set it to connected
592 :
593 : // add to connection set
594 50 : LBASSERT( connection->getDescription( ));
595 50 : LBASSERT( _impl->connectionNodes.find( connection ) ==
596 : _impl->connectionNodes.end( ));
597 :
598 50 : _impl->connectionNodes[ connection ] = this;
599 50 : _impl->nodes.data[ getNodeID() ] = this;
600 50 : _addConnection( connection );
601 :
602 100 : LBVERB << "Added node " << getNodeID() << " using " << connection
603 150 : << std::endl;
604 50 : return true;
605 : }
606 :
607 8 : bool LocalNode::disconnect( NodePtr node )
608 : {
609 8 : if( !node || !isListening() )
610 0 : return false;
611 :
612 8 : if( !node->isConnected( ))
613 0 : return true;
614 :
615 8 : LBASSERT( !inCommandThread( ));
616 8 : lunchbox::Request< void > request = registerRequest< void >( node.get( ));
617 8 : send( CMD_NODE_DISCONNECT ) << request;
618 :
619 8 : request.wait();
620 8 : _impl->objectStore->removeNode( node );
621 8 : return true;
622 : }
623 :
624 20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
625 : {
626 20001 : if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
627 20001 : return;
628 :
629 20001 : if( node == this ) // OPT
630 0 : serveRequest( requestID );
631 : else
632 20001 : node->send( CMD_NODE_ACK_REQUEST ) << requestID;
633 : }
634 :
635 0 : void LocalNode::ping( NodePtr peer )
636 : {
637 0 : LBASSERT( !_impl->inReceiverThread( ));
638 0 : peer->send( CMD_NODE_PING );
639 0 : }
640 :
641 0 : bool LocalNode::pingIdleNodes()
642 : {
643 0 : LBASSERT( !_impl->inReceiverThread( ) );
644 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
645 0 : const Nodes& nodes = getNodes( false );
646 :
647 0 : bool pinged = false;
648 0 : for( NodePtr node : nodes )
649 : {
650 0 : if( getTime64() - node->getLastReceiveTime() > timeout )
651 : {
652 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
653 0 : << node->getLastReceiveTime() << std::endl;
654 0 : node->send( CMD_NODE_PING );
655 0 : pinged = true;
656 : }
657 0 : }
658 0 : return pinged;
659 : }
660 :
661 : //----------------------------------------------------------------------
662 : // Object functionality
663 : //----------------------------------------------------------------------
664 0 : void LocalNode::disableInstanceCache()
665 : {
666 0 : _impl->objectStore->disableInstanceCache();
667 0 : }
668 :
669 0 : void LocalNode::expireInstanceData( const int64_t age )
670 : {
671 0 : _impl->objectStore->expireInstanceData( age );
672 0 : }
673 :
674 0 : void LocalNode::enableSendOnRegister()
675 : {
676 0 : _impl->objectStore->enableSendOnRegister();
677 0 : }
678 :
679 0 : void LocalNode::disableSendOnRegister()
680 : {
681 0 : _impl->objectStore->disableSendOnRegister();
682 0 : }
683 :
684 21 : bool LocalNode::registerObject( Object* object )
685 : {
686 21 : return _impl->objectStore->register_( object );
687 : }
688 :
689 21 : void LocalNode::deregisterObject( Object* object )
690 : {
691 21 : _impl->objectStore->deregister( object );
692 21 : }
693 :
694 40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
695 : NodePtr master, const uint128_t& version )
696 : {
697 : const uint32_t request = _impl->objectStore->mapNB( object, id, version,
698 40 : master );
699 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
700 40 : _impl->objectStore, request );
701 40 : return f_bool_t( new FuturebImpl( func ));
702 : }
703 :
704 0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
705 : const uint128_t& version )
706 : {
707 0 : return _impl->objectStore->mapNB( object, id, version, 0 );
708 : }
709 :
710 2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
711 : const uint128_t& version, NodePtr master )
712 : {
713 2 : return _impl->objectStore->mapNB( object, id, version, master );
714 : }
715 :
716 :
717 2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
718 : {
719 2 : return _impl->objectStore->mapSync( requestID );
720 : }
721 :
722 4 : f_bool_t LocalNode::syncObject( Object* object, const uint128_t& id,
723 : NodePtr master, const uint32_t instanceID )
724 : {
725 4 : return _impl->objectStore->sync( object, id, master, instanceID );
726 : }
727 :
728 42 : void LocalNode::unmapObject( Object* object )
729 : {
730 42 : _impl->objectStore->unmap( object );
731 42 : }
732 :
733 0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
734 : {
735 0 : _impl->objectStore->swap( oldObject, newObject );
736 0 : }
737 :
738 0 : void LocalNode::objectPush( const uint128_t& groupID,
739 : const uint128_t& objectType,
740 : const uint128_t& objectID, DataIStream& istream )
741 : {
742 0 : lunchbox::ScopedRead mutex( _impl->pushHandlers );
743 0 : HandlerHashCIter i = _impl->pushHandlers->find( groupID );
744 0 : if( i != _impl->pushHandlers->end( ))
745 0 : i->second( groupID, objectType, objectID, istream );
746 : else
747 0 : LBWARN << "No custom handler for push group " << groupID
748 0 : << " registered" << std::endl;
749 :
750 0 : if( istream.wasUsed() && istream.hasData( ))
751 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
752 0 : << objectType << " object " << objectID << std::endl;
753 0 : }
754 :
755 0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
756 : const PushHandler& handler )
757 : {
758 0 : lunchbox::ScopedWrite mutex( _impl->pushHandlers );
759 0 : (*_impl->pushHandlers)[ groupID ] = handler;
760 0 : }
761 :
762 2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
763 : const CommandHandler& func,
764 : CommandQueue* queue )
765 : {
766 2 : lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
767 2 : if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
768 : {
769 0 : LBWARN << "Already got a registered handler for custom command "
770 0 : << command << std::endl;
771 0 : return false;
772 : }
773 :
774 : _impl->commandHandlers->insert( std::make_pair( command,
775 2 : std::make_pair( func, queue )));
776 2 : return true;
777 : }
778 :
779 0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
780 : {
781 0 : LBASSERT( !inCommandThread( ));
782 0 : LBASSERT( !_impl->inReceiverThread( ));
783 :
784 0 : lunchbox::Request< void > request = registerRequest< void >();
785 0 : node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
786 :
787 : try
788 : {
789 0 : request.wait( Global::getTimeout( ));
790 : }
791 0 : catch( const lunchbox::FutureTimeout& )
792 : {
793 0 : LBERROR << "Timeout while acquiring send token " << request.getID()
794 0 : << std::endl;
795 0 : request.unregister();
796 0 : return 0;
797 : }
798 0 : return new co::SendToken( node );
799 : }
800 :
801 0 : void LocalNode::releaseSendToken( SendToken token )
802 : {
803 0 : LBASSERT( !_impl->inReceiverThread( ));
804 0 : if( token )
805 0 : token->release();
806 0 : }
807 :
808 : //----------------------------------------------------------------------
809 : // Connecting a node
810 : //----------------------------------------------------------------------
811 : namespace
812 : {
813 : enum ConnectResult
814 : {
815 : CONNECT_OK,
816 : CONNECT_TRY_AGAIN,
817 : CONNECT_BAD_STATE,
818 : CONNECT_TIMEOUT,
819 : CONNECT_UNREACHABLE
820 : };
821 : }
822 :
823 73 : NodePtr LocalNode::connect( const NodeID& nodeID )
824 : {
825 73 : LBASSERT( nodeID != 0 );
826 74 : LBASSERT( isListening( ));
827 :
828 : // Make sure that only one connection request based on the node identifier
829 : // is pending at a given time. Otherwise a node with the same id might be
830 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
831 : // mutex is to register connecting nodes with this local node, and handle
832 : // all cases correctly, which is far more complex. Node connections only
833 : // happen a lot during initialization, and are therefore not time-critical.
834 74 : lunchbox::ScopedWrite mutex( _impl->connectLock );
835 :
836 148 : Nodes nodes = getNodes();
837 77 : for( NodePtr peer : nodes )
838 : {
839 77 : if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
840 74 : return peer;
841 3 : }
842 :
843 0 : LBDEBUG << "Connecting node " << nodeID << std::endl;
844 0 : for( NodePtr peer : nodes )
845 : {
846 0 : NodePtr node = _connect( nodeID, peer );
847 0 : if( node )
848 0 : return node;
849 0 : }
850 : {
851 0 : NodePtr node = _connectFromZeroconf( nodeID );
852 0 : if( node )
853 0 : return node;
854 : }
855 : // check again if node connected by itself by now
856 0 : nodes = getNodes();
857 0 : for( NodePtr node : nodes )
858 0 : if( node->getNodeID() == nodeID && node->isReachable( ))
859 0 : return node;
860 :
861 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
862 74 : return 0;
863 : }
864 :
865 0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
866 : {
867 0 : LBASSERT( nodeID != 0 );
868 :
869 0 : NodePtr node;
870 : {
871 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
872 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
873 0 : if( i != _impl->nodes->end( ))
874 0 : node = i->second;
875 : }
876 :
877 0 : LBASSERT( getNodeID() != nodeID );
878 0 : if( !node )
879 : {
880 0 : lunchbox::Request< void* > request = registerRequest< void* >();
881 0 : peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
882 0 : node = reinterpret_cast< Node* >( request.wait( ));
883 0 : if( !node )
884 : {
885 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
886 0 : << std::endl;
887 0 : return 0;
888 : }
889 0 : node->unref( this ); // ref'd before serveRequest()
890 : }
891 :
892 0 : if( node->isReachable( ))
893 0 : return node;
894 :
895 0 : size_t tries = 10;
896 0 : while( --tries )
897 : {
898 0 : switch( _connect( node ))
899 : {
900 : case CONNECT_OK:
901 0 : return node;
902 : case CONNECT_TRY_AGAIN:
903 : {
904 0 : lunchbox::RNG rng;
905 : // collision avoidance
906 0 : lunchbox::sleep( rng.get< uint8_t >( ));
907 0 : break;
908 : }
909 : case CONNECT_BAD_STATE:
910 0 : LBWARN << "Internal connect error" << std::endl;
911 : // no break;
912 : case CONNECT_TIMEOUT:
913 0 : return 0;
914 :
915 : case CONNECT_UNREACHABLE:
916 0 : break; // maybe peer talks to us
917 : }
918 :
919 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
920 : // connect failed - check for simultaneous connect from peer
921 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
922 0 : if( i != _impl->nodes->end( ))
923 0 : node = i->second;
924 0 : }
925 :
926 0 : return node->isReachable() ? node : 0;
927 : }
928 :
929 0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
930 : {
931 0 : lunchbox::ScopedWrite mutex( _impl->service );
932 :
933 : const Strings& instances =
934 0 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
935 0 : for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
936 : {
937 0 : const std::string& instance = *i;
938 0 : const NodeID candidate( instance );
939 0 : if( candidate != nodeID )
940 0 : continue;
941 :
942 0 : const std::string& typeStr = _impl->service->get( instance, "co_type" );
943 0 : if( typeStr.empty( ))
944 0 : return 0;
945 :
946 0 : std::istringstream in( typeStr );
947 0 : uint32_t type = 0;
948 0 : in >> type;
949 :
950 0 : NodePtr node = createNode( type );
951 0 : if( !node )
952 : {
953 0 : LBINFO << "Can't create node of type " << type << std::endl;
954 0 : continue;
955 : }
956 :
957 : const std::string& numStr = _impl->service->get( instance,
958 0 : "co_numPorts" );
959 0 : uint32_t num = 0;
960 :
961 0 : in.clear();
962 0 : in.str( numStr );
963 0 : in >> num;
964 0 : LBASSERT( num > 0 );
965 0 : for( size_t j = 0; j < num; ++j )
966 : {
967 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
968 0 : std::ostringstream out;
969 0 : out << "co_port" << j;
970 :
971 0 : std::string descStr = _impl->service->get( instance, out.str( ));
972 0 : LBASSERT( !descStr.empty( ));
973 0 : LBCHECK( desc->fromString( descStr ));
974 0 : LBASSERT( descStr.empty( ));
975 0 : node->addConnectionDescription( desc );
976 0 : }
977 0 : mutex.leave();
978 0 : if( _connect( node ))
979 0 : return node;
980 0 : }
981 0 : return 0;
982 : }
983 :
984 34 : bool LocalNode::connect( NodePtr node )
985 : {
986 34 : lunchbox::ScopedWrite mutex( _impl->connectLock );
987 34 : return ( _connect( node ) == CONNECT_OK );
988 : }
989 :
990 34 : uint32_t LocalNode::_connect( NodePtr node )
991 : {
992 34 : LBASSERTINFO( isListening(), *this );
993 34 : if( node->isReachable( ))
994 0 : return CONNECT_OK;
995 :
996 34 : LBASSERT( node->isClosed( ));
997 34 : LBDEBUG << "Connecting " << node << std::endl;
998 :
999 : // try connecting using the given descriptions
1000 34 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
1001 102 : for( ConnectionDescriptionsCIter i = cds.begin();
1002 68 : i != cds.end(); ++i )
1003 : {
1004 34 : ConnectionDescriptionPtr description = *i;
1005 34 : if( description->type >= CONNECTIONTYPE_MULTICAST )
1006 0 : continue; // Don't use multicast for primary connections
1007 :
1008 34 : ConnectionPtr connection = Connection::create( description );
1009 34 : if( !connection || !connection->connect( ))
1010 0 : continue;
1011 :
1012 34 : return _connect( node, connection );
1013 0 : }
1014 :
1015 0 : LBDEBUG << "Node " << node
1016 0 : << " unreachable, all connections failed to connect" <<std::endl;
1017 0 : return CONNECT_UNREACHABLE;
1018 : }
1019 :
1020 0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
1021 : {
1022 0 : return ( _connect( node, connection ) == CONNECT_OK );
1023 : }
1024 :
1025 34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
1026 : {
1027 34 : LBASSERT( connection );
1028 34 : LBASSERT( node->getNodeID() != getNodeID( ));
1029 :
1030 68 : if( !node || !isListening() || !connection->isConnected() ||
1031 34 : !node->isClosed( ))
1032 : {
1033 0 : return CONNECT_BAD_STATE;
1034 : }
1035 :
1036 34 : _addConnection( connection );
1037 :
1038 : // send connect command to peer
1039 34 : lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
1040 : #ifdef COLLAGE_BIGENDIAN
1041 : uint32_t cmd = CMD_NODE_CONNECT_BE;
1042 : lunchbox::byteswap( cmd );
1043 : #else
1044 34 : const uint32_t cmd = CMD_NODE_CONNECT;
1045 : #endif
1046 : OCommand( Connections( 1, connection ), cmd )
1047 34 : << getNodeID() << request << getType() << serialize();
1048 :
1049 34 : bool connected = false;
1050 : try
1051 : {
1052 34 : connected = request.wait( 10000 /*ms*/ );
1053 : }
1054 0 : catch( const lunchbox::FutureTimeout& )
1055 : {
1056 0 : LBWARN << "Node connection handshake timeout - " << node
1057 0 : << " not a Collage node?" << std::endl;
1058 0 : request.unregister();
1059 0 : return CONNECT_TIMEOUT;
1060 : }
1061 :
1062 : // In simultaneous connect case, depending on the connection type
1063 : // (e.g. RDMA), a check on the connection state of the node is required
1064 34 : if( !connected || !node->isConnected( ))
1065 0 : return CONNECT_TRY_AGAIN;
1066 :
1067 34 : LBASSERT( node->getNodeID() != 0 );
1068 34 : LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
1069 34 : LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
1070 34 : return CONNECT_OK;
1071 : }
1072 :
1073 40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
1074 : {
1075 40 : LBASSERTINFO( id.isUUID(), id );
1076 40 : if( !id.isUUID( ))
1077 : {
1078 0 : LBWARN << "Invalid object id " << id << std::endl;
1079 0 : return 0;
1080 : }
1081 :
1082 40 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
1083 40 : if( masterNodeID == 0 )
1084 : {
1085 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1086 0 : return 0;
1087 : }
1088 :
1089 40 : NodePtr master = connect( masterNodeID );
1090 40 : if( master && !master->isClosed( ))
1091 40 : return master;
1092 :
1093 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1094 0 : << " for object " << id << std::endl;
1095 0 : return 0;
1096 : }
1097 :
1098 0 : bool LocalNode::launch( NodePtr node, const std::string& command )
1099 : {
1100 0 : size_t lastPos = 0;
1101 0 : std::string cmd;
1102 :
1103 0 : const std::string& quote = node->getLaunchQuote();
1104 : std::string launchOptions =
1105 0 : std::string( " --co-launch " ) + quote + node->serialize() +
1106 0 : node->getWorkDir() + CO_SEPARATOR + std::to_string( getType( )) +
1107 0 : CO_SEPARATOR + serialize() + quote +
1108 0 : " --co-globals " + quote + co::Global::toString() + quote;
1109 :
1110 0 : for( size_t percentPos = command.find( '%' );
1111 : percentPos != std::string::npos;
1112 0 : percentPos = command.find( '%', percentPos + 1 ))
1113 : {
1114 0 : std::string replacement;
1115 0 : switch( command[percentPos+1] )
1116 : {
1117 : case 'h':
1118 0 : replacement = node->getHostname();
1119 0 : if( replacement.empty( ))
1120 0 : replacement = "127.0.0.1";
1121 0 : break;
1122 :
1123 : case 'n':
1124 0 : replacement = node->getNodeID().getString();
1125 0 : break;
1126 :
1127 : case 'd':
1128 0 : replacement = node->getWorkDir();
1129 0 : break;
1130 :
1131 : case 'q':
1132 0 : replacement = quote;
1133 0 : break;
1134 :
1135 : case 'o':
1136 0 : replacement = launchOptions;
1137 0 : launchOptions.clear();
1138 0 : break;
1139 :
1140 : default:
1141 0 : LBWARN << "Unknown token " << command[percentPos+1] << std::endl;
1142 0 : replacement = command.substr( percentPos, percentPos+1 );
1143 : }
1144 :
1145 0 : cmd += command.substr( lastPos, percentPos-lastPos ) + replacement;
1146 0 : lastPos = percentPos + 2;
1147 0 : }
1148 :
1149 0 : cmd += command.substr( lastPos ) + launchOptions;
1150 :
1151 0 : LBDEBUG << "Launching: " << cmd << std::endl;
1152 0 : if( lunchbox::Launcher::run( cmd ))
1153 0 : return true;
1154 :
1155 0 : LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
1156 0 : return false;
1157 : }
1158 :
1159 0 : NodePtr LocalNode::syncLaunch( const uint128_t& nodeID, const int64_t timeout )
1160 : {
1161 0 : const lunchbox::Clock clock;
1162 :
1163 0 : do
1164 : {
1165 0 : co::NodePtr node = getNode( nodeID );
1166 0 : if( node && node->isConnected( ))
1167 0 : return node;
1168 :
1169 0 : lunchbox::sleep( 100 /*ms*/ );
1170 : }
1171 0 : while( timeout == static_cast< int32_t >( LB_TIMEOUT_INDEFINITE ) ||
1172 0 : clock.getTime64() < timeout );
1173 :
1174 0 : return nullptr;
1175 : }
1176 :
1177 0 : bool LocalNode::_setupPeer( const std::string& setupOpts )
1178 : {
1179 0 : size_t nextPos = setupOpts.find( CO_SEPARATOR );
1180 0 : if( nextPos == std::string::npos )
1181 : {
1182 0 : LBERROR << "Could not parse working directory: " << setupOpts
1183 0 : << std::endl;
1184 0 : return false;
1185 : }
1186 :
1187 0 : const std::string workDir = setupOpts.substr( 0, nextPos );
1188 0 : std::string description = setupOpts.substr( nextPos + 1 );
1189 :
1190 0 : if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
1191 0 : LBWARN << "Can't change working directory to " << workDir << ": "
1192 0 : << lunchbox::sysError << std::endl;
1193 :
1194 0 : nextPos = description.find( CO_SEPARATOR );
1195 0 : if( nextPos == std::string::npos )
1196 : {
1197 0 : LBERROR << "Could not parse server node type: " << description
1198 0 : << " is left from " << setupOpts << std::endl;
1199 0 : return false;
1200 : }
1201 :
1202 0 : const std::string nodeType = description.substr( 0, nextPos );
1203 0 : description = description.substr( nextPos + 1 );
1204 :
1205 0 : co::NodePtr peer = createNode( std::stoi( nodeType ));
1206 0 : if( !peer->deserialize( description ))
1207 0 : LBWARN << "Can't parse peer data" << std::endl;
1208 :
1209 0 : LBASSERTINFO( description.empty(), description );
1210 0 : if( !connect( peer ))
1211 : {
1212 0 : LBERROR << "Can't connect peer node using " << *peer << std::endl;
1213 0 : return false;
1214 : }
1215 0 : return true;
1216 : }
1217 :
1218 34 : NodePtr LocalNode::createNode( const uint32_t type )
1219 : {
1220 34 : LBASSERTINFO( type == NODETYPE_NODE, type );
1221 34 : return new Node( type );
1222 : }
1223 :
1224 0 : NodePtr LocalNode::getNode( const NodeID& id ) const
1225 : {
1226 0 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1227 0 : NodeHash::const_iterator i = _impl->nodes->find( id );
1228 0 : if( i == _impl->nodes->end() || !i->second->isReachable( ))
1229 0 : return 0;
1230 0 : return i->second;
1231 : }
1232 :
1233 114 : Nodes LocalNode::getNodes( const bool addSelf ) const
1234 : {
1235 114 : Nodes nodes;
1236 228 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1237 334 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1238 : {
1239 220 : NodePtr node = i->second;
1240 220 : if( node->isReachable() && ( addSelf || node != this ))
1241 220 : nodes.push_back( i->second );
1242 220 : }
1243 228 : return nodes;
1244 : }
1245 :
1246 146 : CommandQueue* LocalNode::getCommandThreadQueue()
1247 : {
1248 146 : return _impl->commandThread->getWorkerQueue();
1249 : }
1250 :
1251 8 : bool LocalNode::inCommandThread() const
1252 : {
1253 8 : return _impl->commandThread->isCurrent();
1254 : }
1255 :
1256 72428 : int64_t LocalNode::getTime64() const
1257 : {
1258 72428 : return _impl->clock.getTime64();
1259 : }
1260 :
1261 0 : ssize_t LocalNode::getCounter( const Counter counter ) const
1262 : {
1263 0 : return _impl->counters[ counter ];
1264 : }
1265 :
1266 196 : void LocalNode::flushCommands()
1267 : {
1268 196 : _impl->incoming.interrupt();
1269 196 : }
1270 :
1271 : //----------------------------------------------------------------------
1272 : // receiver thread functions
1273 : //----------------------------------------------------------------------
// Main loop of the receiver thread. It publishes this node via zeroconf,
// then handles connection set events until the node stops listening. On
// exit it:
//  - joins the command thread,
//  - removes its own pipe connection and disconnects,
//  - closes every remaining connection and its node,
//  - clears the object store and pending commands and flushes the buffer
//    caches.
void LocalNode::_runReceiverThread()
{
    LB_TS_THREAD( _rcvThread );
    _initService();

    // Number of error events in a row; reset whenever data arrives.
    int nErrors = 0;
    while( isListening( ))
    {
        const ConnectionSet::Event result = _impl->incoming.select();
        switch( result )
        {
            case ConnectionSet::EVENT_CONNECT:
                _handleConnect();
                break;

            case ConnectionSet::EVENT_DATA:
                nErrors = 0;
                _handleData();
                break;

            case ConnectionSet::EVENT_DISCONNECT:
            case ConnectionSet::EVENT_INVALID_HANDLE:
                _handleDisconnect();
                break;

            case ConnectionSet::EVENT_TIMEOUT:
                LBINFO << "select timeout" << std::endl;
                break;

            case ConnectionSet::EVENT_ERROR:
                ++nErrors;
                LBWARN << "Connection error during select" << std::endl;
                if( nErrors > 100 )
                {
                    // NOTE(review): nErrors is not reset here, so every
                    // further error event also disconnects until data
                    // arrives again.
                    LBWARN << "Too many errors in a row, capping connection"
                           << std::endl;
                    _handleDisconnect();
                }
                break;

            case ConnectionSet::EVENT_SELECT_ERROR:
                LBWARN << "Error during select" << std::endl;
                ++nErrors;
                if( nErrors > 10 )
                {
                    LBWARN << "Too many errors in a row" << std::endl;
                    LBUNIMPLEMENTED;
                }
                break;

            case ConnectionSet::EVENT_INTERRUPT:
                // woken up explicitly (e.g. by flushCommands()): retry the
                // commands that could not be dispatched before
                _redispatchCommands();
                break;

            default:
                LBUNIMPLEMENTED;
        }
    }

    if( !_impl->pendingCommands.empty( ))
        LBWARN << _impl->pendingCommands.size()
               << " commands pending while leaving command thread" << std::endl;

    _impl->pendingCommands.clear();
    LBCHECK( _impl->commandThread->join( ));

    // Our own connection is a PipeConnection. Its sibling end is the one
    // removed from the receiver's connection set and connection->node map.
    ConnectionPtr connection = getConnection();
    PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    connection = pipe->getSibling();
    _removeConnection( connection );
    _impl->connectionNodes.erase( connection );
    _disconnect();

    // Close all remaining connections and their nodes. The loop observes
    // the set through the reference and relies on _removeConnection()
    // shrinking it.
    // NOTE(review): operator[] inserts a null entry for connections without
    // a node; find() would avoid that.
    const Connections& connections = _impl->incoming.getConnections();
    while( !connections.empty( ))
    {
        connection = connections.back();
        NodePtr node = _impl->connectionNodes[ connection ];

        if( node )
            _closeNode( node );
        _removeConnection( connection );
    }

    _impl->objectStore->clear();
    _impl->pendingCommands.clear();
    _impl->smallBuffers.flush();
    _impl->bigBuffers.flush();

    LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
            << std::endl;
}
1366 :
1367 34 : void LocalNode::_handleConnect()
1368 : {
1369 34 : ConnectionPtr connection = _impl->incoming.getConnection();
1370 68 : ConnectionPtr newConn = connection->acceptSync();
1371 34 : connection->acceptNB();
1372 :
1373 34 : if( newConn )
1374 34 : _addConnection( newConn );
1375 : else
1376 34 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1377 34 : }
1378 :
// Handle a disconnect or invalid-handle event on the current connection.
// Remaining data is drained and dispatched first. If a node owns the
// connection:
//  - a local CMD_NODE_REMOVE_NODE command is emitted for it;
//  - the node is closed if this was its primary connection, or the
//    multicast connection is detached from it otherwise.
// Finally the connection is removed from the set.
void LocalNode::_handleDisconnect()
{
    while( _handleData( )) ; // read remaining data off connection

    ConnectionPtr connection = _impl->incoming.getConnection();
    ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );

    if( i != _impl->connectionNodes.end( ))
    {
        NodePtr node = i->second;

        // NOTE(review): presumably the extra reference is released by the
        // CMD_NODE_REMOVE_NODE handler - confirm.
        node->ref(); // extend lifetime to give cmd handler a chance

        // local command dispatching; the temporary OCommand presumably
        // sends when destroyed at the end of this statement, i.e. before the
        // node is closed below
        OCommand( this, this, CMD_NODE_REMOVE_NODE )
            << node.get() << uint32_t( LB_UNDEFINED_UINT32 );

        if( node->getConnection() == connection )
            _closeNode( node );
        else if( connection->isMulticast( ))
            node->_removeMulticast( connection );
    }

    _removeConnection( connection );
}
1404 :
1405 72213 : bool LocalNode::_handleData()
1406 : {
1407 72213 : _impl->smallBuffers.compact();
1408 72213 : _impl->bigBuffers.compact();
1409 :
1410 72213 : ConnectionPtr connection = _impl->incoming.getConnection();
1411 72213 : LBASSERT( connection );
1412 :
1413 144426 : BufferPtr buffer = _readHead( connection );
1414 72213 : if( !buffer ) // fluke signal
1415 68 : return false;
1416 :
1417 144290 : ICommand command = _setupCommand( connection, buffer );
1418 72145 : const bool gotCommand = _readTail( command, buffer, connection );
1419 72145 : LBASSERT( gotCommand );
1420 :
1421 : // start next receive
1422 144290 : BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1423 72145 : connection->recvNB( nextBuffer, COMMAND_MINSIZE );
1424 :
1425 72145 : if( gotCommand )
1426 : {
1427 72145 : _dispatchCommand( command );
1428 72145 : return true;
1429 : }
1430 :
1431 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1432 72213 : return false;
1433 : }
1434 :
1435 72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
1436 : {
1437 72213 : BufferPtr buffer;
1438 72213 : const bool gotSize = connection->recvSync( buffer, false );
1439 :
1440 72213 : if( !buffer ) // fluke signal
1441 : {
1442 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1443 0 : << std::endl;
1444 0 : _impl->incoming.setDirty();
1445 0 : return 0;
1446 : }
1447 :
1448 72213 : if( gotSize )
1449 72145 : return buffer;
1450 :
1451 : // Some systems signal data on dead connections.
1452 68 : buffer->setSize( 0 );
1453 68 : connection->recvNB( buffer, COMMAND_MINSIZE );
1454 68 : return 0;
1455 : }
1456 :
// Build the ICommand for a received buffer.
// - For a connection that already belongs to a node, the command is set up
//   with byte swapping when that node's endianness differs from ours, and the
//   node's last-receive time is updated.
// - Connections without a node may only carry the pre-node handshake
//   commands (connect, connect reply, ID). Their *_BE variants are what
//   big-endian senders emit (see the COLLAGE_BIGENDIAN branch in
//   _connect()), and the command is rebuilt with swapping where the sender's
//   byte order differs from ours.
// Returns an invalid ICommand for any other command on a node-less
// connection.
// NOTE(review): _handleData() passes that invalid command on to _readTail()
// without checking it.
ICommand LocalNode::_setupCommand( ConnectionPtr connection,
                                   ConstBufferPtr buffer )
{
    NodePtr node;
    ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    if( i != _impl->connectionNodes.end( ))
        node = i->second;
    LBVERB << "Handle data from " << node << std::endl;

    // swap if the sending node's byte order differs from ours
#ifdef COLLAGE_BIGENDIAN
    const bool swapping = node ? !node->isBigEndian() : false;
#else
    const bool swapping = node ? node->isBigEndian() : false;
#endif
    ICommand command( this, node, buffer, swapping );

    if( node )
    {
        node->_setLastReceive( getTime64( ));
        return command;
    }

    // No node yet: only handshake commands are acceptable here.
    uint32_t cmd = command.getCommand();
#ifdef COLLAGE_BIGENDIAN
    lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
#endif
    switch( cmd )
    {
        case CMD_NODE_CONNECT:
        case CMD_NODE_CONNECT_REPLY:
        case CMD_NODE_ID:
#ifdef COLLAGE_BIGENDIAN
            command = ICommand( this, node, buffer, true );
#endif
            break;

        case CMD_NODE_CONNECT_BE:
        case CMD_NODE_CONNECT_REPLY_BE:
        case CMD_NODE_ID_BE:
#ifndef COLLAGE_BIGENDIAN
            command = ICommand( this, node, buffer, true );
#endif
            break;

        default:
            LBUNIMPLEMENTED;
            return ICommand();
    }

    command.setCommand( cmd ); // reset correctly swapped version
    return command;
}
1509 :
1510 72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
1511 : ConnectionPtr connection )
1512 : {
1513 72145 : const uint64_t needed = command.getSize();
1514 72145 : if( needed <= buffer->getSize( ))
1515 62134 : return true;
1516 :
1517 10011 : if( needed > buffer->getMaxSize( ))
1518 : {
1519 0 : LBASSERT( needed > COMMAND_ALLOCSIZE );
1520 0 : LBASSERTINFO( needed < LB_BIT48,
1521 : "Out-of-sync network stream: " << command << "?" );
1522 : // not enough space for remaining data, alloc and copy to new buffer
1523 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
1524 0 : newBuffer->replace( *buffer );
1525 0 : buffer = newBuffer;
1526 :
1527 0 : command = ICommand( this, command.getRemoteNode(), buffer,
1528 0 : command.isSwapping( ));
1529 : }
1530 :
1531 : // read remaining data
1532 10011 : connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
1533 10011 : return connection->recvSync( buffer );
1534 : }
1535 :
1536 35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
1537 : {
1538 35 : LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
1539 : BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
1540 : _impl->bigBuffers.alloc( size ) :
1541 35 : _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1542 35 : return buffer;
1543 : }
1544 :
1545 72194 : void LocalNode::_dispatchCommand( ICommand& command )
1546 : {
1547 72194 : LBASSERTINFO( command.isValid(), command );
1548 :
1549 72194 : if( dispatchCommand( command ))
1550 72194 : _redispatchCommands();
1551 : else
1552 : {
1553 0 : _redispatchCommands();
1554 0 : _impl->pendingCommands.push_back( command );
1555 : }
1556 72194 : }
1557 :
1558 72231 : bool LocalNode::dispatchCommand( ICommand& command )
1559 : {
1560 72231 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1561 72231 : LBASSERTINFO( command.isValid(), command );
1562 :
1563 72231 : const uint32_t type = command.getType();
1564 72231 : switch( type )
1565 : {
1566 : case COMMANDTYPE_NODE:
1567 71726 : LBCHECK( Dispatcher::dispatchCommand( command ));
1568 71726 : return true;
1569 :
1570 : case COMMANDTYPE_OBJECT:
1571 505 : return _impl->objectStore->dispatchObjectCommand( command );
1572 :
1573 : default:
1574 0 : LBABORT( "Unknown command type " << type << " for " << command );
1575 0 : return true;
1576 : }
1577 : }
1578 :
1579 72499 : void LocalNode::_redispatchCommands()
1580 : {
1581 72499 : bool changes = true;
1582 144998 : while( changes && !_impl->pendingCommands.empty( ))
1583 : {
1584 0 : changes = false;
1585 :
1586 0 : for( CommandList::iterator i = _impl->pendingCommands.begin();
1587 0 : i != _impl->pendingCommands.end(); ++i )
1588 : {
1589 0 : ICommand& command = *i;
1590 0 : LBASSERT( command.isValid( ));
1591 :
1592 0 : if( dispatchCommand( command ))
1593 : {
1594 0 : _impl->pendingCommands.erase( i );
1595 0 : changes = true;
1596 0 : break;
1597 : }
1598 : }
1599 : }
1600 :
1601 : #ifndef NDEBUG
1602 72499 : if( !_impl->pendingCommands.empty( ))
1603 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1604 0 : << std::endl;
1605 72499 : LBASSERT( _impl->pendingCommands.size() < 200 );
1606 : #endif
1607 72500 : }
1608 :
1609 50 : void LocalNode::_initService()
1610 : {
1611 50 : LB_TS_SCOPED( _rcvThread );
1612 50 : _impl->service->withdraw(); // go silent during k/v update
1613 :
1614 97 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1615 50 : if( descs.empty( ))
1616 53 : return;
1617 :
1618 94 : std::ostringstream out;
1619 47 : out << getType();
1620 47 : _impl->service->set( "co_type", out.str( ));
1621 :
1622 47 : out.str("");
1623 47 : out << descs.size();
1624 47 : _impl->service->set( "co_numPorts", out.str( ));
1625 :
1626 94 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
1627 : {
1628 47 : ConnectionDescriptionPtr desc = *i;
1629 47 : out.str("");
1630 47 : out << "co_port" << i - descs.begin();
1631 47 : _impl->service->set( out.str(), desc->toString( ));
1632 47 : }
1633 :
1634 94 : _impl->service->announce( descs.front()->port, getNodeID().getString( ));
1635 : }
1636 :
1637 49 : void LocalNode::_exitService()
1638 : {
1639 49 : _impl->service->withdraw();
1640 49 : }
1641 :
1642 1 : Zeroconf LocalNode::getZeroconf()
1643 : {
1644 1 : lunchbox::ScopedWrite mutex( _impl->service );
1645 1 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
1646 1 : return Zeroconf( _impl->service.data );
1647 : }
1648 :
1649 :
1650 : //----------------------------------------------------------------------
1651 : // command thread functions
1652 : //----------------------------------------------------------------------
1653 50 : bool LocalNode::_startCommandThread( const int32_t threadID )
1654 : {
1655 50 : _impl->commandThread->threadID = threadID;
1656 50 : return _impl->commandThread->start();
1657 : }
1658 :
1659 51608 : bool LocalNode::_notifyCommandThreadIdle()
1660 : {
1661 51608 : return _impl->objectStore->notifyCommandThreadIdle();
1662 : }
1663 :
1664 20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
1665 : {
1666 20001 : const uint32_t requestID = command.get< uint32_t >();
1667 20001 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
1668 :
1669 20001 : serveRequest( requestID );
1670 20001 : return true;
1671 : }
1672 :
// Receiver-thread handler for the stop command. It withdraws the zeroconf
// announcement, marks the node as closing (ending the receiver loop, which
// runs while isListening()), and then dispatches the same command again as
// CMD_NODE_STOP_CMD to stop the command thread.
bool LocalNode::_cmdStopRcv( ICommand& command )
{
    LB_TS_THREAD( _rcvThread );
    LBASSERT( isListening( ));

    _exitService();
    _setClosing(); // causes rcv thread exit

    command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    _dispatchCommand( command );
    return true;
}
1685 :
1686 49 : bool LocalNode::_cmdStopCmd( ICommand& )
1687 : {
1688 49 : LB_TS_THREAD( _cmdThread );
1689 49 : LBASSERTINFO( isClosing(), *this );
1690 :
1691 49 : _setClosed();
1692 49 : return true;
1693 : }
1694 :
1695 0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
1696 : {
1697 0 : const int32_t affinity = command.get< int32_t >();
1698 :
1699 0 : lunchbox::Thread::setAffinity( affinity );
1700 0 : return true;
1701 : }
1702 :
// Receiver-thread handler for an incoming CMD_NODE_CONNECT on a connection
// that has no node yet.
// - The connect is refused (a reply with a null NodeID, which
//   _cmdConnectReply() treats as refusal) if the node is already reachable
//   or no node object can be created.
// - Otherwise the peer is registered for this connection and in the node
//   table, and our own node data is sent back as the reply. The peer becomes
//   connected only when its CMD_NODE_CONNECT_ACK arrives (_cmdConnectAck()).
bool LocalNode::_cmdConnect( ICommand& command )
{
    LBASSERTINFO( !command.getRemoteNode(), command );
    LBASSERT( _impl->inReceiverThread( ));

    const NodeID& nodeID = command.get< NodeID >();
    const uint32_t requestID = command.get< uint32_t >();
    const uint32_t nodeType = command.get< uint32_t >();
    std::string data = command.get< std::string >();

    LBVERB << "handle connect " << command << " req " << requestID << " type "
           << nodeType << " data " << data << std::endl;

    ConnectionPtr connection = _impl->incoming.getConnection();

    LBASSERT( connection );
    LBASSERT( nodeID != getNodeID() );
    LBASSERT( _impl->connectionNodes.find( connection ) ==
              _impl->connectionNodes.end( ));

    NodePtr peer;
    // reply command id, byte-swapped when we are big endian
#ifdef COLLAGE_BIGENDIAN
    uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    lunchbox::byteswap( cmd );
#else
    const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
#endif

    // No locking needed, only recv thread modifies
    NodeHashCIter i = _impl->nodes->find( nodeID );
    if( i != _impl->nodes->end( ))
    {
        // a known but unreachable node object is reused below
        peer = i->second;
        if( peer->isReachable( ))
        {
            // Node exists, probably simultaneous connect from peer
            LBINFO << "Already got node " << nodeID << ", refusing connect"
                   << std::endl;

            // refuse connection
            OCommand( Connections( 1, connection ), cmd )
                << NodeID() << requestID;

            // NOTE: There is no close() here. The reply command above has to be
            // received by the peer first, before closing the connection.
            _removeConnection( connection );
            return true;
        }
    }

    // create and add connected node
    if( !peer )
        peer = createNode( nodeType );
    if( !peer )
    {
        LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
                << std::endl;

        // refuse connection
        OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;

        // NOTE: There is no close() here. The reply command above has to be
        // received by the peer first, before closing the connection.
        _removeConnection( connection );
        return true;
    }

    if( !peer->deserialize( data ))
        LBWARN << "Error during node initialization" << std::endl;
    LBASSERTINFO( data.empty(), data );
    LBASSERTINFO( peer->getNodeID() == nodeID,
                  peer->getNodeID() << "!=" << nodeID );
    LBASSERT( peer->getType() == nodeType );

    _impl->connectionNodes[ connection ] = peer;
    {
        // readers use ScopedFastRead (e.g. getNode()), so writes are locked
        lunchbox::ScopedFastWrite mutex( _impl->nodes );
        _impl->nodes.data[ peer->getNodeID() ] = peer;
    }
    LBVERB << "Added node " << nodeID << std::endl;

    // send our information as reply
    OCommand( Connections( 1, connection ), cmd )
        << getNodeID() << requestID << getType() << serialize();

    return true;
}
1790 :
// Receiver-thread handler for the peer's reply to our CMD_NODE_CONNECT
// (sent by _connect( node, connection )).
// - A null node ID means the peer refused the connect; the request registered
//   in _connect() is served with false.
// - If the node is already reachable (simultaneous connect), the connection
//   is dropped and the request is served with false.
// - Otherwise the node is taken from the request data (or created),
//   initialized from the reply data, acknowledged with CMD_NODE_CONNECT_ACK,
//   marked connected and registered. The request is then served with true
//   and the connect is announced through notifyConnect().
bool LocalNode::_cmdConnectReply( ICommand& command )
{
    LBASSERT( !command.getRemoteNode( ));
    LBASSERT( _impl->inReceiverThread( ));

    ConnectionPtr connection = _impl->incoming.getConnection();
    LBASSERT( _impl->connectionNodes.find( connection ) ==
              _impl->connectionNodes.end( ));

    const NodeID& nodeID = command.get< NodeID >();
    const uint32_t requestID = command.get< uint32_t >();

    // connection refused
    if( nodeID == 0 )
    {
        LBINFO << "Connection refused, node already connected by peer"
               << std::endl;

        _removeConnection( connection );
        serveRequest( requestID, false );
        return true;
    }

    const uint32_t nodeType = command.get< uint32_t >();
    std::string data = command.get< std::string >();

    LBVERB << "handle connect reply " << command << " req " << requestID
           << " type " << nodeType << " data " << data << std::endl;

    // No locking needed, only recv thread modifies
    NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    NodePtr peer;
    if( i != _impl->nodes->end( ))
        peer = i->second;

    if( peer && peer->isReachable( )) // simultaneous connect
    {
        LBINFO << "Closing simultaneous connection from " << peer << " on "
               << connection << std::endl;

        _removeConnection( connection );
        _closeNode( peer );
        serveRequest( requestID, false );
        return true;
    }

    // create and add node
    if( !peer )
    {
        // the node object passed to registerRequest() in _connect()
        if( requestID != LB_UNDEFINED_UINT32 )
            peer = reinterpret_cast< Node* >( getRequestData( requestID ));
        else
            peer = createNode( nodeType );
    }
    if( !peer )
    {
        // NOTE(review): the request (if any) is not served here. Presumably
        // this path is reached when the request was already unregistered,
        // e.g. after a handshake timeout - confirm.
        LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
               << std::endl;
        _removeConnection( connection );
        return true;
    }

    LBASSERTINFO( peer->getType() == nodeType,
                  peer->getType() << " != " << nodeType );
    LBASSERT( peer->isClosed( ));

    if( !peer->deserialize( data ))
        LBWARN << "Error during node initialization" << std::endl;
    LBASSERT( data.empty( ));
    LBASSERT( peer->getNodeID() == nodeID );

    // send ACK to peer
    // cppcheck-suppress unusedScopedObject
    OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );

    peer->_connect( connection );
    _impl->connectionNodes[ connection ] = peer;
    {
        lunchbox::ScopedFastWrite mutex( _impl->nodes );
        _impl->nodes.data[ peer->getNodeID() ] = peer;
    }
    _connectMulticast( peer );
    LBVERB << "Added node " << nodeID << std::endl;

    // wakes up the request.wait() in _connect( node, connection )
    serveRequest( requestID, true );

    notifyConnect( peer );
    return true;
}
1880 :
1881 34 : bool LocalNode::_cmdConnectAck( ICommand& command )
1882 : {
1883 34 : NodePtr node = command.getRemoteNode();
1884 34 : LBASSERT( node );
1885 34 : LBASSERT( _impl->inReceiverThread( ));
1886 34 : LBVERB << "handle connect ack" << std::endl;
1887 :
1888 34 : node->_connect( _impl->incoming.getConnection( ));
1889 34 : _connectMulticast( node );
1890 34 : notifyConnect( node );
1891 34 : return true;
1892 : }
1893 :
1894 0 : bool LocalNode::_cmdID( ICommand& command )
1895 : {
1896 0 : LBASSERT( _impl->inReceiverThread( ));
1897 :
1898 0 : const NodeID& nodeID = command.get< NodeID >();
1899 0 : uint32_t nodeType = command.get< uint32_t >();
1900 0 : std::string data = command.get< std::string >();
1901 :
1902 0 : if( command.getRemoteNode( ))
1903 : {
1904 0 : LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
1905 0 : LBASSERT( command.getRemoteNode()->_getMulticast( ));
1906 0 : return true;
1907 : }
1908 :
1909 0 : LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
1910 :
1911 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1912 0 : LBASSERT( connection->isMulticast( ));
1913 0 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1914 : _impl->connectionNodes.end( ));
1915 :
1916 0 : NodePtr node;
1917 0 : if( nodeID == getNodeID() ) // 'self' multicast connection
1918 0 : node = this;
1919 : else
1920 : {
1921 : // No locking needed, only recv thread writes
1922 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1923 :
1924 0 : if( i == _impl->nodes->end( ))
1925 : {
1926 : // unknown node: create and add unconnected node
1927 0 : node = createNode( nodeType );
1928 :
1929 0 : if( !node->deserialize( data ))
1930 0 : LBWARN << "Error during node initialization" << std::endl;
1931 0 : LBASSERTINFO( data.empty(), data );
1932 :
1933 : {
1934 0 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1935 0 : _impl->nodes.data[ nodeID ] = node;
1936 : }
1937 0 : LBVERB << "Added node " << nodeID << " with multicast "
1938 0 : << connection << std::endl;
1939 : }
1940 : else
1941 0 : node = i->second;
1942 : }
1943 0 : LBASSERT( node );
1944 0 : LBASSERTINFO( node->getNodeID() == nodeID,
1945 : node->getNodeID() << "!=" << nodeID );
1946 :
1947 0 : _connectMulticast( node, connection );
1948 0 : _impl->connectionNodes[ connection ] = node;
1949 0 : LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
1950 0 : << " to " << getNodeID() << std::endl;
1951 0 : return true;
1952 : }
1953 :
1954 8 : bool LocalNode::_cmdDisconnect( ICommand& command )
1955 : {
1956 8 : LBASSERT( _impl->inReceiverThread( ));
1957 :
1958 8 : const uint32_t requestID = command.get< uint32_t >();
1959 :
1960 8 : NodePtr node = static_cast<Node*>( getRequestData( requestID ));
1961 8 : LBASSERT( node );
1962 :
1963 8 : _closeNode( node );
1964 8 : LBASSERT( node->isClosed( ));
1965 8 : serveRequest( requestID );
1966 8 : return true;
1967 : }
1968 :
1969 0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
1970 : {
1971 0 : const NodeID& nodeID = command.get< NodeID >();
1972 0 : const uint32_t requestID = command.get< uint32_t >();
1973 :
1974 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1975 0 : << " nodeID " << nodeID << std::endl;
1976 :
1977 0 : NodePtr node = getNode( nodeID );
1978 0 : NodePtr toNode = command.getRemoteNode();
1979 :
1980 0 : uint32_t nodeType = NODETYPE_INVALID;
1981 0 : std::string nodeData;
1982 0 : if( node )
1983 : {
1984 0 : nodeType = node->getType();
1985 0 : nodeData = node->serialize();
1986 0 : LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
1987 0 : << toNode << std::endl;
1988 : }
1989 :
1990 : toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
1991 0 : << nodeID << requestID << nodeType << nodeData;
1992 0 : return true;
1993 : }
1994 :
1995 0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
1996 : {
1997 0 : LBASSERT( _impl->inReceiverThread( ));
1998 :
1999 0 : const NodeID& nodeID = command.get< NodeID >();
2000 0 : const uint32_t requestID = command.get< uint32_t >();
2001 0 : const uint32_t nodeType = command.get< uint32_t >();
2002 0 : std::string nodeData = command.get< std::string >();
2003 :
2004 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
2005 0 : << " type " << nodeType << " data " << nodeData << std::endl;
2006 :
2007 : // No locking needed, only recv thread writes
2008 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
2009 0 : if( i != _impl->nodes->end( ))
2010 : {
2011 : // Requested node connected to us in the meantime
2012 0 : NodePtr node = i->second;
2013 :
2014 0 : node->ref( this );
2015 0 : serveRequest( requestID, node.get( ));
2016 0 : return true;
2017 : }
2018 :
2019 0 : if( nodeType == NODETYPE_INVALID )
2020 : {
2021 0 : serveRequest( requestID, (void*)0 );
2022 0 : return true;
2023 : }
2024 :
2025 : // new node: create and add unconnected node
2026 0 : NodePtr node = createNode( nodeType );
2027 0 : if( node )
2028 : {
2029 0 : LBASSERT( node );
2030 :
2031 0 : if( !node->deserialize( nodeData ))
2032 0 : LBWARN << "Failed to initialize node data" << std::endl;
2033 0 : LBASSERT( nodeData.empty( ));
2034 0 : node->ref( this );
2035 : }
2036 : else
2037 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
2038 :
2039 0 : serveRequest( requestID, node.get( ));
2040 0 : return true;
2041 : }
2042 :
2043 0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
2044 : {
2045 0 : LBASSERT( inCommandThread( ));
2046 0 : if( !_impl->sendToken ) // enqueue command if no token available
2047 : {
2048 0 : const uint32_t timeout = Global::getTimeout();
2049 0 : if( timeout == LB_TIMEOUT_INDEFINITE ||
2050 0 : ( getTime64() - _impl->lastSendToken <= timeout ))
2051 : {
2052 0 : _impl->sendTokenQueue.push_back( command );
2053 0 : return true;
2054 : }
2055 :
2056 : // timeout! - clear old requests
2057 0 : _impl->sendTokenQueue.clear();
2058 : // 'generate' new token - release is robust
2059 : }
2060 :
2061 0 : _impl->sendToken = false;
2062 :
2063 0 : const uint32_t requestID = command.get< uint32_t >();
2064 0 : command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
2065 0 : << requestID;
2066 0 : return true;
2067 : }
2068 :
2069 0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
2070 : {
2071 0 : const uint32_t requestID = command.get< uint32_t >();
2072 0 : serveRequest( requestID );
2073 0 : return true;
2074 : }
2075 :
2076 0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
2077 : {
2078 0 : LBASSERT( inCommandThread( ));
2079 0 : _impl->lastSendToken = getTime64();
2080 :
2081 0 : if( _impl->sendToken )
2082 0 : return true; // double release due to timeout
2083 0 : if( _impl->sendTokenQueue.empty( ))
2084 : {
2085 0 : _impl->sendToken = true;
2086 0 : return true;
2087 : }
2088 :
2089 0 : ICommand& request = _impl->sendTokenQueue.front();
2090 :
2091 0 : const uint32_t requestID = request.get< uint32_t >();
2092 0 : request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
2093 0 : << requestID;
2094 0 : _impl->sendTokenQueue.pop_front();
2095 0 : return true;
2096 : }
2097 :
2098 0 : bool LocalNode::_cmdAddListener( ICommand& command )
2099 : {
2100 : Connection* rawConnection =
2101 0 : reinterpret_cast< Connection* >( command.get< uint64_t >( ));
2102 0 : std::string data = command.get< std::string >();
2103 :
2104 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2105 0 : command.getRemoteNode()->_addConnectionDescription( description );
2106 :
2107 0 : if( command.getRemoteNode() != this )
2108 0 : return true;
2109 :
2110 0 : ConnectionPtr connection = rawConnection;
2111 0 : connection->unref();
2112 0 : LBASSERT( connection );
2113 :
2114 0 : _impl->connectionNodes[ connection ] = this;
2115 0 : if( connection->isMulticast( ))
2116 0 : _addMulticast( this, connection );
2117 :
2118 0 : connection->acceptNB();
2119 0 : _impl->incoming.addConnection( connection );
2120 :
2121 0 : _initService(); // update zeroconf
2122 0 : return true;
2123 : }
2124 :
2125 0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
2126 : {
2127 0 : const uint32_t requestID = command.get< uint32_t >();
2128 0 : Connection* rawConnection = command.get< Connection* >();
2129 0 : std::string data = command.get< std::string >();
2130 :
2131 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2132 0 : LBCHECK(
2133 : command.getRemoteNode()->_removeConnectionDescription( description ));
2134 :
2135 0 : if( command.getRemoteNode() != this )
2136 0 : return true;
2137 :
2138 0 : _initService(); // update zeroconf
2139 :
2140 0 : ConnectionPtr connection = rawConnection;
2141 0 : connection->unref( this );
2142 0 : LBASSERT( connection );
2143 :
2144 0 : if( connection->isMulticast( ))
2145 0 : _removeMulticast( connection );
2146 :
2147 0 : _impl->incoming.removeConnection( connection );
2148 0 : LBASSERT( _impl->connectionNodes.find( connection ) !=
2149 : _impl->connectionNodes.end( ));
2150 0 : _impl->connectionNodes.erase( connection );
2151 0 : serveRequest( requestID );
2152 0 : return true;
2153 : }
2154 :
2155 0 : bool LocalNode::_cmdPing( ICommand& command )
2156 : {
2157 0 : LBASSERT( inCommandThread( ));
2158 0 : command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
2159 0 : return true;
2160 : }
2161 :
2162 2 : bool LocalNode::_cmdCommand( ICommand& command )
2163 : {
2164 2 : const uint128_t& commandID = command.get< uint128_t >();
2165 2 : CommandHandler func;
2166 : {
2167 2 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2168 2 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2169 2 : if( i == _impl->commandHandlers->end( ))
2170 0 : return false;
2171 :
2172 2 : CommandQueue* queue = i->second.second;
2173 2 : if( queue )
2174 : {
2175 : command.setDispatchFunction( CmdFunc( this,
2176 1 : &LocalNode::_cmdCommandAsync ));
2177 1 : queue->push( command );
2178 1 : return true;
2179 : }
2180 : // else
2181 :
2182 1 : func = i->second.first;
2183 : }
2184 :
2185 2 : CustomICommand customCmd( command );
2186 3 : return func( customCmd );
2187 : }
2188 :
2189 1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
2190 : {
2191 1 : const uint128_t& commandID = command.get< uint128_t >();
2192 1 : CommandHandler func;
2193 : {
2194 1 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2195 1 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2196 1 : LBASSERT( i != _impl->commandHandlers->end( ));
2197 1 : if( i == _impl->commandHandlers->end( ))
2198 0 : return true; // deregistered between dispatch and now
2199 1 : func = i->second.first;
2200 : }
2201 2 : CustomICommand customCmd( command );
2202 2 : return func( customCmd );
2203 : }
2204 :
2205 34 : bool LocalNode::_cmdAddConnection( ICommand& command )
2206 : {
2207 34 : LBASSERT( _impl->inReceiverThread( ));
2208 :
2209 34 : ConnectionPtr connection = command.get< ConnectionPtr >();
2210 34 : _addConnection( connection );
2211 34 : connection->unref(); // ref'd by _addConnection
2212 34 : return true;
2213 : }
2214 :
2215 66 : }
|