Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/futureFunction.h>
46 : #include <lunchbox/hash.h>
47 : #include <lunchbox/lockable.h>
48 : #include <lunchbox/request.h>
49 : #include <lunchbox/rng.h>
50 : #include <lunchbox/servus.h>
51 : #include <lunchbox/sleep.h>
52 :
53 : #include <boost/bind.hpp>
54 : #include <boost/date_time/posix_time/posix_time.hpp>
55 : #include <boost/lexical_cast.hpp>
56 :
57 : #include <list>
58 :
59 : namespace bp = boost::posix_time;
60 :
61 : namespace co
62 : {
63 : namespace
64 : {
65 22 : lunchbox::a_int32_t _threadIDs;
66 :
67 : typedef CommandFunc< LocalNode > CmdFunc;
68 : typedef std::list< ICommand > CommandList;
69 : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
70 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
71 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
72 : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
73 : typedef NodeHash::const_iterator NodeHashCIter;
74 : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
75 : typedef HandlerHash::const_iterator HandlerHashCIter;
76 : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
77 : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
78 : typedef CommandHash::const_iterator CommandHashCIter;
79 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
80 : }
81 :
82 : namespace detail
83 : {
84 106 : class ReceiverThread : public lunchbox::Thread
85 : {
86 : public:
87 55 : explicit ReceiverThread( co::LocalNode* localNode )
88 55 : : _localNode( localNode ) {}
89 :
90 50 : bool init() override
91 : {
92 50 : const int32_t threadID = ++_threadIDs - 1;
93 100 : setName( std::string( "Rcv" ) +
94 50 : boost::lexical_cast< std::string >( threadID ));
95 50 : return _localNode->_startCommandThread( threadID );
96 : }
97 :
98 50 : void run() override { _localNode->_runReceiverThread(); }
99 :
100 : private:
101 : co::LocalNode* const _localNode;
102 : };
103 :
104 106 : class CommandThread : public Worker
105 : {
106 : public:
107 55 : explicit CommandThread( co::LocalNode* localNode )
108 : : Worker( Global::getCommandQueueLimit( ))
109 : , threadID( 0 )
110 55 : , _localNode( localNode )
111 55 : {}
112 :
113 : int32_t threadID;
114 :
115 : protected:
116 50 : bool init() override
117 : {
118 100 : setName( std::string( "Cmd" ) +
119 50 : boost::lexical_cast< std::string >( threadID ));
120 50 : return true;
121 : }
122 :
123 103335 : bool stopRunning() override { return _localNode->isClosed(); }
124 51606 : bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
125 :
126 : private:
127 : co::LocalNode* const _localNode;
128 : };
129 :
130 : class LocalNode
131 : {
132 : public:
133 55 : LocalNode()
134 : : smallBuffers( 200 )
135 : , bigBuffers( 20 )
136 : , sendToken( true )
137 : , lastSendToken( 0 )
138 : , objectStore( 0 )
139 : , receiverThread( 0 )
140 : , commandThread( 0 )
141 55 : , service( "_collage._tcp" )
142 55 : {}
143 :
144 53 : ~LocalNode()
145 53 : {
146 53 : LBASSERT( incoming.isEmpty( ));
147 53 : LBASSERT( connectionNodes.empty( ));
148 53 : LBASSERT( pendingCommands.empty( ));
149 53 : LBASSERT( nodes->empty( ));
150 :
151 53 : delete objectStore;
152 53 : objectStore = 0;
153 53 : LBASSERT( !commandThread->isRunning( ));
154 53 : delete commandThread;
155 53 : commandThread = 0;
156 :
157 53 : LBASSERT( !receiverThread->isRunning( ));
158 53 : delete receiverThread;
159 53 : receiverThread = 0;
160 53 : }
161 :
162 280 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
163 :
164 : /** Commands re-scheduled for dispatch. */
165 : CommandList pendingCommands;
166 :
167 : /** The command buffer 'allocator' for small packets */
168 : co::BufferCache smallBuffers;
169 :
170 : /** The command buffer 'allocator' for big packets */
171 : co::BufferCache bigBuffers;
172 :
173 : bool sendToken; //!< send token availability.
174 : uint64_t lastSendToken; //!< last used time for timeout detection
175 : std::deque< co::ICommand > sendTokenQueue; //!< pending requests
176 :
177 : /** Manager of distributed object */
178 : ObjectStore* objectStore;
179 :
180 : /** Needed for thread-safety during nodeID-based connect() */
181 : lunchbox::Lock connectLock;
182 :
183 : /** The node for each connection. */
184 : ConnectionNodeHash connectionNodes; // read and write: recv only
185 :
186 : /** The connected nodes. */
187 : lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
188 :
189 : /** The connection set of all connections from/to this node. */
190 : co::ConnectionSet incoming;
191 :
192 : /** The process-global clock. */
193 : lunchbox::Clock clock;
194 :
195 : /** The registered push handlers. */
196 : lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
197 :
198 : /** The registered custom command handlers. */
199 : lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
200 :
201 : ReceiverThread* receiverThread;
202 : CommandThread* commandThread;
203 :
204 : lunchbox::Lockable< servus::Servus > service;
205 :
206 : // Performance counters:
207 : a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
208 : };
209 : }
210 :
211 55 : LocalNode::LocalNode( const uint32_t type )
212 : : Node( type )
213 55 : , _impl( new detail::LocalNode )
214 : {
215 55 : _impl->receiverThread = new detail::ReceiverThread( this );
216 55 : _impl->commandThread = new detail::CommandThread( this );
217 55 : _impl->objectStore = new ObjectStore( this, _impl->counters );
218 :
219 55 : CommandQueue* queue = getCommandThreadQueue();
220 : registerCommand( CMD_NODE_CONNECT,
221 55 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
222 : registerCommand( CMD_NODE_CONNECT_BE,
223 55 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
224 : registerCommand( CMD_NODE_CONNECT_REPLY,
225 55 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
226 : registerCommand( CMD_NODE_CONNECT_REPLY_BE,
227 55 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
228 : registerCommand( CMD_NODE_ID,
229 55 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
230 : registerCommand( CMD_NODE_ID_BE,
231 55 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
232 : registerCommand( CMD_NODE_ACK_REQUEST,
233 55 : CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
234 : registerCommand( CMD_NODE_STOP_RCV,
235 55 : CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
236 : registerCommand( CMD_NODE_STOP_CMD,
237 55 : CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
238 : registerCommand( CMD_NODE_SET_AFFINITY_RCV,
239 55 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
240 : registerCommand( CMD_NODE_SET_AFFINITY_CMD,
241 55 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
242 : registerCommand( CMD_NODE_CONNECT_ACK,
243 55 : CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
244 : registerCommand( CMD_NODE_DISCONNECT,
245 55 : CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
246 : registerCommand( CMD_NODE_GET_NODE_DATA,
247 55 : CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
248 : registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
249 55 : CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
250 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
251 55 : CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
252 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
253 55 : CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
254 : registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
255 55 : CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
256 : registerCommand( CMD_NODE_ADD_LISTENER,
257 55 : CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
258 : registerCommand( CMD_NODE_REMOVE_LISTENER,
259 55 : CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
260 : registerCommand( CMD_NODE_PING,
261 55 : CmdFunc( this, &LocalNode::_cmdPing ), queue );
262 : registerCommand( CMD_NODE_PING_REPLY,
263 55 : CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
264 : registerCommand( CMD_NODE_COMMAND,
265 55 : CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
266 : registerCommand( CMD_NODE_ADD_CONNECTION,
267 55 : CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
268 55 : }
269 :
270 148 : LocalNode::~LocalNode( )
271 : {
272 53 : LBASSERT( !hasPendingRequests( ));
273 53 : delete _impl;
274 95 : }
275 :
276 1 : bool LocalNode::initLocal( const int argc, char** argv )
277 : {
278 : // We do not use getopt_long because it really does not work due to the
279 : // following aspects:
280 : // - reordering of arguments
281 : // - different behavior of GNU and BSD implementations
282 : // - incomplete man pages
283 1 : for( int i=1; i<argc; ++i )
284 : {
285 0 : if( std::string( "--eq-listen" ) == argv[i] )
286 0 : LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
287 0 : if( std::string( "--eq-listen" ) == argv[i] ||
288 0 : std::string( "--co-listen" ) == argv[i] )
289 : {
290 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
291 : {
292 0 : std::string data = argv[++i];
293 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
294 0 : desc->port = Global::getDefaultPort();
295 :
296 0 : if( desc->fromString( data ))
297 : {
298 0 : addConnectionDescription( desc );
299 0 : LBASSERTINFO( data.empty(), data );
300 : }
301 : else
302 0 : LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
303 : }
304 : else
305 : {
306 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
307 : }
308 : }
309 0 : else if ( std::string( "--co-globals" ) == argv[i] )
310 : {
311 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
312 : {
313 0 : const std::string data = argv[++i];
314 0 : if( !Global::fromString( data ))
315 : {
316 0 : LBWARN << "Invalid global variables string: " << data
317 0 : << ", using default global variables." << std::endl;
318 0 : }
319 : }
320 : else
321 : {
322 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
323 : }
324 : }
325 : }
326 :
327 1 : if( !listen( ))
328 : {
329 0 : LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
330 0 : << std::endl;
331 0 : return false;
332 : }
333 1 : return true;
334 : }
335 :
336 50 : bool LocalNode::listen()
337 : {
338 50 : LBVERB << "Listener data: " << serialize() << std::endl;
339 50 : if( !isClosed() || !_connectSelf( ))
340 0 : return false;
341 :
342 50 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
343 291 : for( ConnectionDescriptionsCIter i = descriptions.begin();
344 194 : i != descriptions.end(); ++i )
345 : {
346 47 : ConnectionDescriptionPtr description = *i;
347 94 : ConnectionPtr connection = Connection::create( description );
348 :
349 47 : if( !connection || !connection->listen( ))
350 : {
351 0 : LBWARN << "Can't create listener connection: " << description
352 0 : << std::endl;
353 0 : return false;
354 : }
355 :
356 47 : _impl->connectionNodes[ connection ] = this;
357 47 : if( connection->isMulticast( ))
358 0 : _addMulticast( this, connection );
359 :
360 47 : connection->acceptNB();
361 47 : _impl->incoming.addConnection( connection );
362 :
363 94 : LBVERB << "Added node " << getNodeID() << " using " << connection
364 141 : << std::endl;
365 47 : }
366 :
367 100 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
368 150 : << std::endl;
369 :
370 50 : _setListening();
371 50 : _impl->receiverThread->start();
372 :
373 50 : LBDEBUG << *this << std::endl;
374 50 : return true;
375 : }
376 :
377 49 : bool LocalNode::close()
378 : {
379 49 : if( !isListening() )
380 0 : return false;
381 :
382 49 : send( CMD_NODE_STOP_RCV );
383 :
384 49 : LBCHECK( _impl->receiverThread->join( ));
385 49 : _cleanup();
386 :
387 147 : LBDEBUG << _impl->incoming.getSize() << " connections open after close"
388 147 : << std::endl;
389 : #ifndef NDEBUG
390 49 : const Connections& connections = _impl->incoming.getConnections();
391 147 : for( Connections::const_iterator i = connections.begin();
392 98 : i != connections.end(); ++i )
393 : {
394 0 : LBDEBUG << " " << *i << std::endl;
395 : }
396 : #endif
397 :
398 49 : LBASSERTINFO( !hasPendingRequests(),
399 : *static_cast< lunchbox::RequestHandler* >( this ));
400 49 : return true;
401 : }
402 :
403 0 : void LocalNode::setAffinity( const int32_t affinity )
404 : {
405 0 : send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
406 0 : send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
407 :
408 0 : lunchbox::Thread::setAffinity( affinity );
409 0 : }
410 :
411 34 : void LocalNode::addConnection( ConnectionPtr connection )
412 : {
413 34 : connection->ref(); // unref in _cmdAddConnection
414 34 : send( CMD_NODE_ADD_CONNECTION ) << connection;
415 34 : }
416 :
417 0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
418 : {
419 0 : LBASSERT( isListening( ));
420 :
421 0 : ConnectionPtr connection = Connection::create( desc );
422 0 : if( connection && connection->listen( ))
423 : {
424 0 : addListener( connection );
425 0 : return connection;
426 : }
427 :
428 0 : return 0;
429 : }
430 :
431 0 : void LocalNode::addListener( ConnectionPtr connection )
432 : {
433 0 : LBASSERT( isListening( ));
434 0 : LBASSERT( connection->isListening( ));
435 0 : if( !isListening() || !connection->isListening( ))
436 0 : return;
437 :
438 0 : connection->ref( this ); // unref in self handler
439 :
440 : // Update everybody's description list of me, add the listener to myself in
441 : // my handler
442 0 : Nodes nodes;
443 0 : getNodes( nodes );
444 :
445 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
446 0 : (*i)->send( CMD_NODE_ADD_LISTENER )
447 0 : << (uint64_t)(connection.get( ))
448 0 : << connection->getDescription()->toString();
449 : }
450 :
451 0 : void LocalNode::removeListeners( const Connections& connections )
452 : {
453 0 : std::vector< lunchbox::Request< void > > requests;
454 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
455 : {
456 0 : ConnectionPtr connection = *i;
457 0 : requests.push_back( _removeListener( connection ));
458 0 : }
459 0 : }
460 :
461 0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
462 : {
463 0 : LBASSERT( isListening( ));
464 0 : LBASSERTINFO( !conn->isConnected(), conn );
465 :
466 0 : conn->ref( this );
467 0 : const lunchbox::Request< void > request = registerRequest< void >();
468 0 : Nodes nodes;
469 0 : getNodes( nodes );
470 :
471 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
472 0 : (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
473 0 : << conn->getDescription()->toString();
474 0 : return request;
475 : }
476 :
477 152 : void LocalNode::_addConnection( ConnectionPtr connection )
478 : {
479 152 : if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
480 : {
481 34 : addConnection( connection );
482 186 : return;
483 : }
484 :
485 118 : BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
486 118 : connection->recvNB( buffer, COMMAND_MINSIZE );
487 118 : _impl->incoming.addConnection( connection );
488 : }
489 :
490 224 : void LocalNode::_removeConnection( ConnectionPtr connection )
491 : {
492 224 : LBASSERT( connection );
493 :
494 224 : _impl->incoming.removeConnection( connection );
495 224 : connection->resetRecvData();
496 224 : if( !connection->isClosed( ))
497 130 : connection->close(); // cancel pending IO's
498 224 : }
499 :
500 49 : void LocalNode::_cleanup()
501 : {
502 49 : LBVERB << "Clean up stopped node" << std::endl;
503 49 : LBASSERTINFO( isClosed(), *this );
504 :
505 49 : if( !_impl->connectionNodes.empty( ))
506 141 : LBDEBUG << _impl->connectionNodes.size()
507 141 : << " open connections during cleanup" << std::endl;
508 : #ifndef NDEBUG
509 288 : for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
510 192 : i != _impl->connectionNodes.end(); ++i )
511 : {
512 47 : NodePtr node = i->second;
513 47 : LBDEBUG << " " << i->first << " : " << node << std::endl;
514 47 : }
515 : #endif
516 :
517 49 : _impl->connectionNodes.clear();
518 :
519 49 : if( !_impl->nodes->empty( ))
520 6 : LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
521 6 : << std::endl;
522 :
523 : #ifndef NDEBUG
524 51 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
525 : {
526 2 : NodePtr node = i->second;
527 2 : LBDEBUG << " " << node << std::endl;
528 2 : }
529 : #endif
530 :
531 49 : _impl->nodes->clear();
532 49 : }
533 :
534 115 : void LocalNode::_closeNode( NodePtr node )
535 : {
536 115 : ConnectionPtr connection = node->getConnection();
537 230 : ConnectionPtr mcConnection = node->_getMulticast();
538 :
539 115 : node->_disconnect();
540 :
541 115 : if( connection )
542 : {
543 68 : LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
544 : _impl->connectionNodes.end(), connection );
545 :
546 68 : _removeConnection( connection );
547 68 : _impl->connectionNodes.erase( connection );
548 : }
549 :
550 115 : if( mcConnection )
551 : {
552 0 : _removeConnection( mcConnection );
553 0 : _impl->connectionNodes.erase( mcConnection );
554 : }
555 :
556 115 : _impl->objectStore->removeInstanceData( node->getNodeID( ));
557 :
558 230 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
559 115 : _impl->nodes->erase( node->getNodeID( ));
560 115 : notifyDisconnect( node );
561 230 : LBDEBUG << node << " disconnected from " << *this << std::endl;
562 115 : }
563 :
564 50 : bool LocalNode::_connectSelf()
565 : {
566 : // setup local connection to myself
567 50 : PipeConnectionPtr connection = new PipeConnection;
568 50 : if( !connection->connect( ))
569 : {
570 0 : LBERROR << "Could not create local connection to receiver thread."
571 0 : << std::endl;
572 0 : return false;
573 : }
574 :
575 50 : Node::_connect( connection->getSibling( ));
576 50 : _setClosed(); // reset state after _connect set it to connected
577 :
578 : // add to connection set
579 50 : LBASSERT( connection->getDescription( ));
580 50 : LBASSERT( _impl->connectionNodes.find( connection ) ==
581 : _impl->connectionNodes.end( ));
582 :
583 50 : _impl->connectionNodes[ connection ] = this;
584 50 : _impl->nodes.data[ getNodeID() ] = this;
585 50 : _addConnection( connection );
586 :
587 100 : LBVERB << "Added node " << getNodeID() << " using " << connection
588 150 : << std::endl;
589 50 : return true;
590 : }
591 :
592 8 : bool LocalNode::disconnect( NodePtr node )
593 : {
594 8 : if( !node || !isListening() )
595 0 : return false;
596 :
597 8 : if( !node->isConnected( ))
598 0 : return true;
599 :
600 8 : LBASSERT( !inCommandThread( ));
601 8 : lunchbox::Request< void > request = registerRequest< void >( node.get( ));
602 8 : send( CMD_NODE_DISCONNECT ) << request;
603 :
604 8 : request.wait();
605 8 : _impl->objectStore->removeNode( node );
606 8 : return true;
607 : }
608 :
609 20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
610 : {
611 20001 : if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
612 20001 : return;
613 :
614 20001 : if( node == this ) // OPT
615 0 : serveRequest( requestID );
616 : else
617 20001 : node->send( CMD_NODE_ACK_REQUEST ) << requestID;
618 : }
619 :
620 0 : void LocalNode::ping( NodePtr peer )
621 : {
622 0 : LBASSERT( !_impl->inReceiverThread( ));
623 0 : peer->send( CMD_NODE_PING );
624 0 : }
625 :
626 0 : bool LocalNode::pingIdleNodes()
627 : {
628 0 : LBASSERT( !_impl->inReceiverThread( ) );
629 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
630 0 : Nodes nodes;
631 0 : getNodes( nodes, false );
632 :
633 0 : bool pinged = false;
634 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
635 : {
636 0 : NodePtr node = *i;
637 0 : if( getTime64() - node->getLastReceiveTime() > timeout )
638 : {
639 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
640 0 : << node->getLastReceiveTime() << std::endl;
641 0 : node->send( CMD_NODE_PING );
642 0 : pinged = true;
643 : }
644 0 : }
645 0 : return pinged;
646 : }
647 :
648 : //----------------------------------------------------------------------
649 : // Object functionality
650 : //----------------------------------------------------------------------
651 0 : void LocalNode::disableInstanceCache()
652 : {
653 0 : _impl->objectStore->disableInstanceCache();
654 0 : }
655 :
656 0 : void LocalNode::expireInstanceData( const int64_t age )
657 : {
658 0 : _impl->objectStore->expireInstanceData( age );
659 0 : }
660 :
661 0 : void LocalNode::enableSendOnRegister()
662 : {
663 0 : _impl->objectStore->enableSendOnRegister();
664 0 : }
665 :
666 0 : void LocalNode::disableSendOnRegister()
667 : {
668 0 : _impl->objectStore->disableSendOnRegister();
669 0 : }
670 :
671 21 : bool LocalNode::registerObject( Object* object )
672 : {
673 21 : return _impl->objectStore->register_( object );
674 : }
675 :
676 21 : void LocalNode::deregisterObject( Object* object )
677 : {
678 21 : _impl->objectStore->deregister( object );
679 21 : }
680 :
681 40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
682 : NodePtr master, const uint128_t& version )
683 : {
684 : const uint32_t request = _impl->objectStore->mapNB( object, id, version,
685 40 : master );
686 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
687 40 : _impl->objectStore, request );
688 40 : return f_bool_t( new FuturebImpl( func ));
689 : }
690 :
691 0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
692 : const uint128_t& version )
693 : {
694 0 : return _impl->objectStore->mapNB( object, id, version, 0 );
695 : }
696 :
697 2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
698 : const uint128_t& version, NodePtr master )
699 : {
700 2 : return _impl->objectStore->mapNB( object, id, version, master );
701 : }
702 :
703 :
704 2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
705 : {
706 2 : return _impl->objectStore->mapSync( requestID );
707 : }
708 :
709 4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
710 : const uint32_t instanceID )
711 : {
712 4 : return _impl->objectStore->sync( object, master, id, instanceID );
713 : }
714 :
715 42 : void LocalNode::unmapObject( Object* object )
716 : {
717 42 : _impl->objectStore->unmap( object );
718 42 : }
719 :
720 0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
721 : {
722 0 : _impl->objectStore->swap( oldObject, newObject );
723 0 : }
724 :
725 0 : void LocalNode::objectPush( const uint128_t& groupID,
726 : const uint128_t& objectType,
727 : const uint128_t& objectID, DataIStream& istream )
728 : {
729 0 : lunchbox::ScopedRead mutex( _impl->pushHandlers );
730 0 : HandlerHashCIter i = _impl->pushHandlers->find( groupID );
731 0 : if( i != _impl->pushHandlers->end( ))
732 0 : i->second( groupID, objectType, objectID, istream );
733 : else
734 0 : LBWARN << "No custom handler for push group " << groupID
735 0 : << " registered" << std::endl;
736 :
737 0 : if( istream.wasUsed() && istream.hasData( ))
738 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
739 0 : << objectType << " object " << objectID << std::endl;
740 0 : }
741 :
742 0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
743 : const PushHandler& handler )
744 : {
745 0 : lunchbox::ScopedWrite mutex( _impl->pushHandlers );
746 0 : (*_impl->pushHandlers)[ groupID ] = handler;
747 0 : }
748 :
749 2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
750 : const CommandHandler& func,
751 : CommandQueue* queue )
752 : {
753 2 : lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
754 2 : if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
755 : {
756 0 : LBWARN << "Already got a registered handler for custom command "
757 0 : << command << std::endl;
758 0 : return false;
759 : }
760 :
761 : _impl->commandHandlers->insert( std::make_pair( command,
762 2 : std::make_pair( func, queue )));
763 2 : return true;
764 : }
765 :
766 0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
767 : {
768 0 : LBASSERT( !inCommandThread( ));
769 0 : LBASSERT( !_impl->inReceiverThread( ));
770 :
771 0 : lunchbox::Request< void > request = registerRequest< void >();
772 0 : node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
773 :
774 : try
775 : {
776 0 : request.wait( Global::getTimeout( ));
777 : }
778 0 : catch( const lunchbox::FutureTimeout& )
779 : {
780 0 : LBERROR << "Timeout while acquiring send token " << request.getID()
781 0 : << std::endl;
782 0 : request.unregister();
783 0 : return 0;
784 : }
785 0 : return new co::SendToken( node );
786 : }
787 :
788 0 : void LocalNode::releaseSendToken( SendToken token )
789 : {
790 0 : LBASSERT( !_impl->inReceiverThread( ));
791 0 : if( token )
792 0 : token->release();
793 0 : }
794 :
795 : //----------------------------------------------------------------------
796 : // Connecting a node
797 : //----------------------------------------------------------------------
798 : namespace
799 : {
800 : enum ConnectResult
801 : {
802 : CONNECT_OK,
803 : CONNECT_TRY_AGAIN,
804 : CONNECT_BAD_STATE,
805 : CONNECT_TIMEOUT,
806 : CONNECT_UNREACHABLE
807 : };
808 : }
809 :
810 73 : NodePtr LocalNode::connect( const NodeID& nodeID )
811 : {
812 73 : LBASSERT( nodeID != 0 );
813 73 : LBASSERT( isListening( ));
814 :
815 : // Make sure that only one connection request based on the node identifier
816 : // is pending at a given time. Otherwise a node with the same id might be
817 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
818 : // mutex is to register connecting nodes with this local node, and handle
819 : // all cases correctly, which is far more complex. Node connections only
820 : // happen a lot during initialization, and are therefore not time-critical.
821 73 : lunchbox::ScopedWrite mutex( _impl->connectLock );
822 :
823 146 : Nodes nodes;
824 73 : getNodes( nodes );
825 :
826 76 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
827 : {
828 76 : NodePtr peer = *i;
829 76 : if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
830 73 : return peer;
831 3 : }
832 :
833 0 : LBDEBUG << "Connecting node " << nodeID << std::endl;
834 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
835 : {
836 0 : NodePtr peer = *i;
837 0 : NodePtr node = _connect( nodeID, peer );
838 0 : if( node )
839 0 : return node;
840 0 : }
841 :
842 0 : NodePtr node = _connectFromZeroconf( nodeID );
843 0 : if( node )
844 0 : return node;
845 :
846 : // check again if node connected by itself by now
847 0 : nodes.clear();
848 0 : getNodes( nodes );
849 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
850 : {
851 0 : node = *i;
852 0 : if( node->getNodeID() == nodeID && node->isReachable( ))
853 0 : return node;
854 : }
855 :
856 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
857 73 : return 0;
858 : }
859 :
860 0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
861 : {
862 0 : LBASSERT( nodeID != 0 );
863 :
864 0 : NodePtr node;
865 : {
866 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
867 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
868 0 : if( i != _impl->nodes->end( ))
869 0 : node = i->second;
870 : }
871 :
872 0 : LBASSERT( getNodeID() != nodeID );
873 0 : if( !node )
874 : {
875 0 : lunchbox::Request< void* > request = registerRequest< void* >();
876 0 : peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
877 0 : node = reinterpret_cast< Node* >( request.wait( ));
878 0 : if( !node )
879 : {
880 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
881 0 : << std::endl;
882 0 : return 0;
883 : }
884 0 : node->unref( this ); // ref'd before serveRequest()
885 : }
886 :
887 0 : if( node->isReachable( ))
888 0 : return node;
889 :
890 0 : size_t tries = 10;
891 0 : while( --tries )
892 : {
893 0 : switch( _connect( node ))
894 : {
895 : case CONNECT_OK:
896 0 : return node;
897 : case CONNECT_TRY_AGAIN:
898 : {
899 0 : lunchbox::RNG rng;
900 : // collision avoidance
901 0 : lunchbox::sleep( rng.get< uint8_t >( ));
902 0 : break;
903 : }
904 : case CONNECT_BAD_STATE:
905 0 : LBWARN << "Internal connect error" << std::endl;
906 : // no break;
907 : case CONNECT_TIMEOUT:
908 0 : return 0;
909 :
910 : case CONNECT_UNREACHABLE:
911 0 : break; // maybe peer talks to us
912 : }
913 :
914 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
915 : // connect failed - check for simultaneous connect from peer
916 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
917 0 : if( i != _impl->nodes->end( ))
918 0 : node = i->second;
919 0 : }
920 :
921 0 : return node->isReachable() ? node : 0;
922 : }
923 :
924 0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
925 : {
926 0 : lunchbox::ScopedWrite mutex( _impl->service );
927 :
928 : const Strings& instances =
929 0 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
930 0 : for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
931 : {
932 0 : const std::string& instance = *i;
933 0 : const NodeID candidate( instance );
934 0 : if( candidate != nodeID )
935 0 : continue;
936 :
937 0 : const std::string& typeStr = _impl->service->get( instance, "co_type" );
938 0 : if( typeStr.empty( ))
939 0 : return 0;
940 :
941 0 : std::istringstream in( typeStr );
942 0 : uint32_t type = 0;
943 0 : in >> type;
944 :
945 0 : NodePtr node = createNode( type );
946 0 : if( !node )
947 : {
948 0 : LBINFO << "Can't create node of type " << type << std::endl;
949 0 : continue;
950 : }
951 :
952 : const std::string& numStr = _impl->service->get( instance,
953 0 : "co_numPorts" );
954 0 : uint32_t num = 0;
955 :
956 0 : in.clear();
957 0 : in.str( numStr );
958 0 : in >> num;
959 0 : LBASSERT( num > 0 );
960 0 : for( size_t j = 0; j < num; ++j )
961 : {
962 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
963 0 : std::ostringstream out;
964 0 : out << "co_port" << j;
965 :
966 0 : std::string descStr = _impl->service->get( instance, out.str( ));
967 0 : LBASSERT( !descStr.empty( ));
968 0 : LBCHECK( desc->fromString( descStr ));
969 0 : LBASSERT( descStr.empty( ));
970 0 : node->addConnectionDescription( desc );
971 0 : }
972 0 : mutex.leave();
973 0 : if( _connect( node ))
974 0 : return node;
975 0 : }
976 0 : return 0;
977 : }
978 :
979 34 : bool LocalNode::connect( NodePtr node )
980 : {
981 34 : lunchbox::ScopedWrite mutex( _impl->connectLock );
982 34 : return ( _connect( node ) == CONNECT_OK );
983 : }
984 :
985 34 : uint32_t LocalNode::_connect( NodePtr node )
986 : {
987 34 : LBASSERTINFO( isListening(), *this );
988 34 : if( node->isReachable( ))
989 0 : return CONNECT_OK;
990 :
991 34 : LBASSERT( node->isClosed( ));
992 34 : LBDEBUG << "Connecting " << node << std::endl;
993 :
994 : // try connecting using the given descriptions
995 34 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
996 102 : for( ConnectionDescriptionsCIter i = cds.begin();
997 68 : i != cds.end(); ++i )
998 : {
999 34 : ConnectionDescriptionPtr description = *i;
1000 34 : if( description->type >= CONNECTIONTYPE_MULTICAST )
1001 0 : continue; // Don't use multicast for primary connections
1002 :
1003 34 : ConnectionPtr connection = Connection::create( description );
1004 34 : if( !connection || !connection->connect( ))
1005 0 : continue;
1006 :
1007 34 : return _connect( node, connection );
1008 0 : }
1009 :
1010 0 : LBDEBUG << "Node " << node
1011 0 : << " unreachable, all connections failed to connect" <<std::endl;
1012 0 : return CONNECT_UNREACHABLE;
1013 : }
1014 :
1015 0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
1016 : {
1017 0 : return ( _connect( node, connection ) == CONNECT_OK );
1018 : }
1019 :
1020 34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
1021 : {
1022 34 : LBASSERT( connection );
1023 34 : LBASSERT( node->getNodeID() != getNodeID( ));
1024 :
1025 68 : if( !node || !isListening() || !connection->isConnected() ||
1026 34 : !node->isClosed( ))
1027 : {
1028 0 : return CONNECT_BAD_STATE;
1029 : }
1030 :
1031 34 : _addConnection( connection );
1032 :
1033 : // send connect command to peer
1034 34 : lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
1035 : #ifdef COLLAGE_BIGENDIAN
1036 : uint32_t cmd = CMD_NODE_CONNECT_BE;
1037 : lunchbox::byteswap( cmd );
1038 : #else
1039 34 : const uint32_t cmd = CMD_NODE_CONNECT;
1040 : #endif
1041 : OCommand( Connections( 1, connection ), cmd )
1042 34 : << getNodeID() << request << getType() << serialize();
1043 :
1044 34 : bool connected = false;
1045 : try
1046 : {
1047 34 : connected = request.wait( 10000 /*ms*/ );
1048 : }
1049 0 : catch( const lunchbox::FutureTimeout& )
1050 : {
1051 0 : LBWARN << "Node connection handshake timeout - " << node
1052 0 : << " not a Collage node?" << std::endl;
1053 0 : request.unregister();
1054 0 : return CONNECT_TIMEOUT;
1055 : }
1056 :
1057 : // In simultaneous connect case, depending on the connection type
1058 : // (e.g. RDMA), a check on the connection state of the node is required
1059 34 : if( !connected || !node->isConnected( ))
1060 0 : return CONNECT_TRY_AGAIN;
1061 :
1062 34 : LBASSERT( node->getNodeID() != 0 );
1063 34 : LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
1064 34 : LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
1065 34 : return CONNECT_OK;
1066 : }
1067 :
1068 40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
1069 : {
1070 40 : LBASSERTINFO( id.isUUID(), id );
1071 40 : if( !id.isUUID( ))
1072 : {
1073 0 : LBWARN << "Invalid object id " << id << std::endl;
1074 0 : return 0;
1075 : }
1076 :
1077 40 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
1078 40 : if( masterNodeID == 0 )
1079 : {
1080 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1081 0 : return 0;
1082 : }
1083 :
1084 40 : NodePtr master = connect( masterNodeID );
1085 40 : if( master && !master->isClosed( ))
1086 40 : return master;
1087 :
1088 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1089 0 : << " for object " << id << std::endl;
1090 0 : return 0;
1091 : }
1092 :
1093 34 : NodePtr LocalNode::createNode( const uint32_t type )
1094 : {
1095 34 : LBASSERTINFO( type == NODETYPE_NODE, type );
1096 34 : return new Node( type );
1097 : }
1098 :
1099 0 : NodePtr LocalNode::getNode( const NodeID& id ) const
1100 : {
1101 0 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1102 0 : NodeHash::const_iterator i = _impl->nodes->find( id );
1103 0 : if( i == _impl->nodes->end() || !i->second->isReachable( ))
1104 0 : return 0;
1105 0 : return i->second;
1106 : }
1107 :
1108 113 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
1109 : {
1110 113 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1111 332 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1112 : {
1113 219 : NodePtr node = i->second;
1114 219 : if( node->isReachable() && ( addSelf || node != this ))
1115 219 : nodes.push_back( i->second );
1116 332 : }
1117 113 : }
1118 :
1119 146 : CommandQueue* LocalNode::getCommandThreadQueue()
1120 : {
1121 146 : return _impl->commandThread->getWorkerQueue();
1122 : }
1123 :
1124 8 : bool LocalNode::inCommandThread() const
1125 : {
1126 8 : return _impl->commandThread->isCurrent();
1127 : }
1128 :
1129 72429 : int64_t LocalNode::getTime64() const
1130 : {
1131 72429 : return _impl->clock.getTime64();
1132 : }
1133 :
1134 0 : ssize_t LocalNode::getCounter( const Counter counter ) const
1135 : {
1136 0 : return _impl->counters[ counter ];
1137 : }
1138 :
1139 196 : void LocalNode::flushCommands()
1140 : {
1141 196 : _impl->incoming.interrupt();
1142 196 : }
1143 :
1144 : //----------------------------------------------------------------------
1145 : // receiver thread functions
1146 : //----------------------------------------------------------------------
1147 50 : void LocalNode::_runReceiverThread()
1148 : {
1149 50 : LB_TS_THREAD( _rcvThread );
1150 50 : _initService();
1151 :
1152 50 : int nErrors = 0;
1153 72650 : while( isListening( ))
1154 : {
1155 72551 : const ConnectionSet::Event result = _impl->incoming.select();
1156 72550 : switch( result )
1157 : {
1158 : case ConnectionSet::EVENT_CONNECT:
1159 34 : _handleConnect();
1160 34 : break;
1161 :
1162 : case ConnectionSet::EVENT_DATA:
1163 72179 : nErrors = 0;
1164 72179 : _handleData();
1165 72179 : break;
1166 :
1167 : case ConnectionSet::EVENT_DISCONNECT:
1168 : case ConnectionSet::EVENT_INVALID_HANDLE:
1169 34 : _handleDisconnect();
1170 34 : break;
1171 :
1172 : case ConnectionSet::EVENT_TIMEOUT:
1173 0 : LBINFO << "select timeout" << std::endl;
1174 0 : break;
1175 :
1176 : case ConnectionSet::EVENT_ERROR:
1177 0 : ++nErrors;
1178 0 : LBWARN << "Connection error during select" << std::endl;
1179 0 : if( nErrors > 100 )
1180 : {
1181 0 : LBWARN << "Too many errors in a row, capping connection"
1182 0 : << std::endl;
1183 0 : _handleDisconnect();
1184 : }
1185 0 : break;
1186 :
1187 : case ConnectionSet::EVENT_SELECT_ERROR:
1188 0 : LBWARN << "Error during select" << std::endl;
1189 0 : ++nErrors;
1190 0 : if( nErrors > 10 )
1191 : {
1192 0 : LBWARN << "Too many errors in a row" << std::endl;
1193 0 : LBUNIMPLEMENTED;
1194 : }
1195 0 : break;
1196 :
1197 : case ConnectionSet::EVENT_INTERRUPT:
1198 303 : _redispatchCommands();
1199 303 : break;
1200 :
1201 : default:
1202 0 : LBUNIMPLEMENTED;
1203 : }
1204 : }
1205 :
1206 49 : if( !_impl->pendingCommands.empty( ))
1207 0 : LBWARN << _impl->pendingCommands.size()
1208 0 : << " commands pending while leaving command thread" << std::endl;
1209 :
1210 49 : _impl->pendingCommands.clear();
1211 49 : LBCHECK( _impl->commandThread->join( ));
1212 :
1213 49 : ConnectionPtr connection = getConnection();
1214 98 : PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
1215 49 : connection = pipe->getSibling();
1216 49 : _removeConnection( connection );
1217 49 : _impl->connectionNodes.erase( connection );
1218 49 : _disconnect();
1219 :
1220 49 : const Connections& connections = _impl->incoming.getConnections();
1221 171 : while( !connections.empty( ))
1222 : {
1223 73 : connection = connections.back();
1224 73 : NodePtr node = _impl->connectionNodes[ connection ];
1225 :
1226 73 : if( node )
1227 73 : _closeNode( node );
1228 73 : _removeConnection( connection );
1229 73 : }
1230 :
1231 49 : _impl->objectStore->clear();
1232 49 : _impl->pendingCommands.clear();
1233 49 : _impl->smallBuffers.flush();
1234 49 : _impl->bigBuffers.flush();
1235 :
1236 245 : LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
1237 245 : << std::endl;
1238 49 : }
1239 :
1240 34 : void LocalNode::_handleConnect()
1241 : {
1242 34 : ConnectionPtr connection = _impl->incoming.getConnection();
1243 68 : ConnectionPtr newConn = connection->acceptSync();
1244 34 : connection->acceptNB();
1245 :
1246 34 : if( newConn )
1247 34 : _addConnection( newConn );
1248 : else
1249 34 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1250 34 : }
1251 :
1252 34 : void LocalNode::_handleDisconnect()
1253 : {
1254 34 : while( _handleData( )) ; // read remaining data off connection
1255 :
1256 34 : ConnectionPtr connection = _impl->incoming.getConnection();
1257 34 : ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
1258 :
1259 34 : if( i != _impl->connectionNodes.end( ))
1260 : {
1261 34 : NodePtr node = i->second;
1262 :
1263 34 : node->ref(); // extend lifetime to give cmd handler a chance
1264 :
1265 : // local command dispatching
1266 : OCommand( this, this, CMD_NODE_REMOVE_NODE )
1267 34 : << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
1268 :
1269 34 : if( node->getConnection() == connection )
1270 34 : _closeNode( node );
1271 0 : else if( connection->isMulticast( ))
1272 0 : node->_removeMulticast( connection );
1273 : }
1274 :
1275 34 : _removeConnection( connection );
1276 34 : }
1277 :
1278 72213 : bool LocalNode::_handleData()
1279 : {
1280 72213 : _impl->smallBuffers.compact();
1281 72213 : _impl->bigBuffers.compact();
1282 :
1283 72213 : ConnectionPtr connection = _impl->incoming.getConnection();
1284 72213 : LBASSERT( connection );
1285 :
1286 144426 : BufferPtr buffer = _readHead( connection );
1287 72213 : if( !buffer ) // fluke signal
1288 68 : return false;
1289 :
1290 144290 : ICommand command = _setupCommand( connection, buffer );
1291 72145 : const bool gotCommand = _readTail( command, buffer, connection );
1292 72145 : LBASSERT( gotCommand );
1293 :
1294 : // start next receive
1295 144290 : BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1296 72145 : connection->recvNB( nextBuffer, COMMAND_MINSIZE );
1297 :
1298 72145 : if( gotCommand )
1299 : {
1300 72145 : _dispatchCommand( command );
1301 72145 : return true;
1302 : }
1303 :
1304 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1305 72213 : return false;
1306 : }
1307 :
1308 72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
1309 : {
1310 72213 : BufferPtr buffer;
1311 72213 : const bool gotSize = connection->recvSync( buffer, false );
1312 :
1313 72213 : if( !buffer ) // fluke signal
1314 : {
1315 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1316 0 : << std::endl;
1317 0 : _impl->incoming.setDirty();
1318 0 : return 0;
1319 : }
1320 :
1321 72213 : if( gotSize )
1322 72145 : return buffer;
1323 :
1324 : // Some systems signal data on dead connections.
1325 68 : buffer->setSize( 0 );
1326 68 : connection->recvNB( buffer, COMMAND_MINSIZE );
1327 68 : return 0;
1328 : }
1329 :
1330 72145 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
1331 : ConstBufferPtr buffer )
1332 : {
1333 72145 : NodePtr node;
1334 72145 : ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
1335 72145 : if( i != _impl->connectionNodes.end( ))
1336 72077 : node = i->second;
1337 72145 : LBVERB << "Handle data from " << node << std::endl;
1338 :
1339 : #ifdef COLLAGE_BIGENDIAN
1340 : const bool swapping = node ? !node->isBigEndian() : false;
1341 : #else
1342 72145 : const bool swapping = node ? node->isBigEndian() : false;
1343 : #endif
1344 144290 : ICommand command( this, node, buffer, swapping );
1345 :
1346 72145 : if( node )
1347 : {
1348 72077 : node->_setLastReceive( getTime64( ));
1349 72077 : return command;
1350 : }
1351 :
1352 68 : uint32_t cmd = command.getCommand();
1353 : #ifdef COLLAGE_BIGENDIAN
1354 : lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
1355 : #endif
1356 68 : switch( cmd )
1357 : {
1358 : case CMD_NODE_CONNECT:
1359 : case CMD_NODE_CONNECT_REPLY:
1360 : case CMD_NODE_ID:
1361 : #ifdef COLLAGE_BIGENDIAN
1362 : command = ICommand( this, node, buffer, true );
1363 : #endif
1364 68 : break;
1365 :
1366 : case CMD_NODE_CONNECT_BE:
1367 : case CMD_NODE_CONNECT_REPLY_BE:
1368 : case CMD_NODE_ID_BE:
1369 : #ifndef COLLAGE_BIGENDIAN
1370 0 : command = ICommand( this, node, buffer, true );
1371 : #endif
1372 0 : break;
1373 :
1374 : default:
1375 0 : LBUNIMPLEMENTED;
1376 0 : return ICommand();
1377 : }
1378 :
1379 68 : command.setCommand( cmd ); // reset correctly swapped version
1380 72213 : return command;
1381 : }
1382 :
1383 72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
1384 : ConnectionPtr connection )
1385 : {
1386 72145 : const uint64_t needed = command.getSize();
1387 72145 : if( needed <= buffer->getSize( ))
1388 62134 : return true;
1389 :
1390 10011 : if( needed > buffer->getMaxSize( ))
1391 : {
1392 0 : LBASSERT( needed > COMMAND_ALLOCSIZE );
1393 0 : LBASSERTINFO( needed < LB_BIT48,
1394 : "Out-of-sync network stream: " << command << "?" );
1395 : // not enough space for remaining data, alloc and copy to new buffer
1396 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
1397 0 : newBuffer->replace( *buffer );
1398 0 : buffer = newBuffer;
1399 :
1400 0 : command = ICommand( this, command.getRemoteNode(), buffer,
1401 0 : command.isSwapping( ));
1402 : }
1403 :
1404 : // read remaining data
1405 10011 : connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
1406 10011 : return connection->recvSync( buffer );
1407 : }
1408 :
1409 35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
1410 : {
1411 35 : LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
1412 : BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
1413 : _impl->bigBuffers.alloc( size ) :
1414 35 : _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1415 35 : return buffer;
1416 : }
1417 :
1418 72194 : void LocalNode::_dispatchCommand( ICommand& command )
1419 : {
1420 72194 : LBASSERTINFO( command.isValid(), command );
1421 :
1422 72194 : if( dispatchCommand( command ))
1423 72194 : _redispatchCommands();
1424 : else
1425 : {
1426 0 : _redispatchCommands();
1427 0 : _impl->pendingCommands.push_back( command );
1428 : }
1429 72194 : }
1430 :
1431 72231 : bool LocalNode::dispatchCommand( ICommand& command )
1432 : {
1433 72231 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1434 72231 : LBASSERTINFO( command.isValid(), command );
1435 :
1436 72231 : const uint32_t type = command.getType();
1437 72231 : switch( type )
1438 : {
1439 : case COMMANDTYPE_NODE:
1440 71726 : LBCHECK( Dispatcher::dispatchCommand( command ));
1441 71726 : return true;
1442 :
1443 : case COMMANDTYPE_OBJECT:
1444 505 : return _impl->objectStore->dispatchObjectCommand( command );
1445 :
1446 : default:
1447 0 : LBABORT( "Unknown command type " << type << " for " << command );
1448 0 : return true;
1449 : }
1450 : }
1451 :
1452 72497 : void LocalNode::_redispatchCommands()
1453 : {
1454 72497 : bool changes = true;
1455 144994 : while( changes && !_impl->pendingCommands.empty( ))
1456 : {
1457 0 : changes = false;
1458 :
1459 0 : for( CommandList::iterator i = _impl->pendingCommands.begin();
1460 0 : i != _impl->pendingCommands.end(); ++i )
1461 : {
1462 0 : ICommand& command = *i;
1463 0 : LBASSERT( command.isValid( ));
1464 :
1465 0 : if( dispatchCommand( command ))
1466 : {
1467 0 : _impl->pendingCommands.erase( i );
1468 0 : changes = true;
1469 0 : break;
1470 : }
1471 : }
1472 : }
1473 :
1474 : #ifndef NDEBUG
1475 72497 : if( !_impl->pendingCommands.empty( ))
1476 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1477 0 : << std::endl;
1478 72497 : LBASSERT( _impl->pendingCommands.size() < 200 );
1479 : #endif
1480 72497 : }
1481 :
1482 50 : void LocalNode::_initService()
1483 : {
1484 50 : LB_TS_SCOPED( _rcvThread );
1485 50 : _impl->service->withdraw(); // go silent during k/v update
1486 :
1487 97 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1488 50 : if( descs.empty( ))
1489 53 : return;
1490 :
1491 94 : std::ostringstream out;
1492 47 : out << getType();
1493 47 : _impl->service->set( "co_type", out.str( ));
1494 :
1495 47 : out.str("");
1496 47 : out << descs.size();
1497 47 : _impl->service->set( "co_numPorts", out.str( ));
1498 :
1499 94 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
1500 : {
1501 47 : ConnectionDescriptionPtr desc = *i;
1502 47 : out.str("");
1503 47 : out << "co_port" << i - descs.begin();
1504 47 : _impl->service->set( out.str(), desc->toString( ));
1505 47 : }
1506 :
1507 94 : _impl->service->announce( descs.front()->port, getNodeID().getString( ));
1508 : }
1509 :
1510 49 : void LocalNode::_exitService()
1511 : {
1512 49 : _impl->service->withdraw();
1513 49 : }
1514 :
1515 1 : Zeroconf LocalNode::getZeroconf()
1516 : {
1517 1 : lunchbox::ScopedWrite mutex( _impl->service );
1518 1 : _impl->service->discover( servus::Servus::IF_ALL, 500 );
1519 1 : return Zeroconf( _impl->service.data );
1520 : }
1521 :
1522 :
1523 : //----------------------------------------------------------------------
1524 : // command thread functions
1525 : //----------------------------------------------------------------------
1526 50 : bool LocalNode::_startCommandThread( const int32_t threadID )
1527 : {
1528 50 : _impl->commandThread->threadID = threadID;
1529 50 : return _impl->commandThread->start();
1530 : }
1531 :
1532 51606 : bool LocalNode::_notifyCommandThreadIdle()
1533 : {
1534 51606 : return _impl->objectStore->notifyCommandThreadIdle();
1535 : }
1536 :
1537 20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
1538 : {
1539 20001 : const uint32_t requestID = command.get< uint32_t >();
1540 20001 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
1541 :
1542 20001 : serveRequest( requestID );
1543 20001 : return true;
1544 : }
1545 :
1546 49 : bool LocalNode::_cmdStopRcv( ICommand& command )
1547 : {
1548 49 : LB_TS_THREAD( _rcvThread );
1549 49 : LBASSERT( isListening( ));
1550 :
1551 49 : _exitService();
1552 49 : _setClosing(); // causes rcv thread exit
1553 :
1554 49 : command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
1555 49 : _dispatchCommand( command );
1556 49 : return true;
1557 : }
1558 :
1559 49 : bool LocalNode::_cmdStopCmd( ICommand& )
1560 : {
1561 49 : LB_TS_THREAD( _cmdThread );
1562 49 : LBASSERTINFO( isClosing(), *this );
1563 :
1564 49 : _setClosed();
1565 49 : return true;
1566 : }
1567 :
1568 0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
1569 : {
1570 0 : const int32_t affinity = command.get< int32_t >();
1571 :
1572 0 : lunchbox::Thread::setAffinity( affinity );
1573 0 : return true;
1574 : }
1575 :
1576 34 : bool LocalNode::_cmdConnect( ICommand& command )
1577 : {
1578 34 : LBASSERTINFO( !command.getRemoteNode(), command );
1579 34 : LBASSERT( _impl->inReceiverThread( ));
1580 :
1581 34 : const NodeID& nodeID = command.get< NodeID >();
1582 34 : const uint32_t requestID = command.get< uint32_t >();
1583 34 : const uint32_t nodeType = command.get< uint32_t >();
1584 34 : std::string data = command.get< std::string >();
1585 :
1586 34 : LBVERB << "handle connect " << command << " req " << requestID << " type "
1587 34 : << nodeType << " data " << data << std::endl;
1588 :
1589 68 : ConnectionPtr connection = _impl->incoming.getConnection();
1590 :
1591 34 : LBASSERT( connection );
1592 34 : LBASSERT( nodeID != getNodeID() );
1593 34 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1594 : _impl->connectionNodes.end( ));
1595 :
1596 68 : NodePtr peer;
1597 : #ifdef COLLAGE_BIGENDIAN
1598 : uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
1599 : lunchbox::byteswap( cmd );
1600 : #else
1601 34 : const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
1602 : #endif
1603 :
1604 : // No locking needed, only recv thread modifies
1605 34 : NodeHashCIter i = _impl->nodes->find( nodeID );
1606 34 : if( i != _impl->nodes->end( ))
1607 : {
1608 0 : peer = i->second;
1609 0 : if( peer->isReachable( ))
1610 : {
1611 : // Node exists, probably simultaneous connect from peer
1612 0 : LBINFO << "Already got node " << nodeID << ", refusing connect"
1613 0 : << std::endl;
1614 :
1615 : // refuse connection
1616 : OCommand( Connections( 1, connection ), cmd )
1617 0 : << NodeID() << requestID;
1618 :
1619 : // NOTE: There is no close() here. The reply command above has to be
1620 : // received by the peer first, before closing the connection.
1621 0 : _removeConnection( connection );
1622 0 : return true;
1623 : }
1624 : }
1625 :
1626 : // create and add connected node
1627 34 : if( !peer )
1628 34 : peer = createNode( nodeType );
1629 34 : if( !peer )
1630 : {
1631 0 : LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
1632 0 : << std::endl;
1633 :
1634 : // refuse connection
1635 0 : OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
1636 :
1637 : // NOTE: There is no close() here. The reply command above has to be
1638 : // received by the peer first, before closing the connection.
1639 0 : _removeConnection( connection );
1640 0 : return true;
1641 : }
1642 :
1643 34 : if( !peer->deserialize( data ))
1644 0 : LBWARN << "Error during node initialization" << std::endl;
1645 34 : LBASSERTINFO( data.empty(), data );
1646 34 : LBASSERTINFO( peer->getNodeID() == nodeID,
1647 : peer->getNodeID() << "!=" << nodeID );
1648 34 : LBASSERT( peer->getType() == nodeType );
1649 :
1650 34 : _impl->connectionNodes[ connection ] = peer;
1651 : {
1652 34 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1653 34 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1654 : }
1655 34 : LBVERB << "Added node " << nodeID << std::endl;
1656 :
1657 : // send our information as reply
1658 : OCommand( Connections( 1, connection ), cmd )
1659 34 : << getNodeID() << requestID << getType() << serialize();
1660 :
1661 68 : return true;
1662 : }
1663 :
1664 34 : bool LocalNode::_cmdConnectReply( ICommand& command )
1665 : {
1666 34 : LBASSERT( !command.getRemoteNode( ));
1667 34 : LBASSERT( _impl->inReceiverThread( ));
1668 :
1669 34 : ConnectionPtr connection = _impl->incoming.getConnection();
1670 34 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1671 : _impl->connectionNodes.end( ));
1672 :
1673 34 : const NodeID& nodeID = command.get< NodeID >();
1674 34 : const uint32_t requestID = command.get< uint32_t >();
1675 :
1676 : // connection refused
1677 34 : if( nodeID == 0 )
1678 : {
1679 0 : LBINFO << "Connection refused, node already connected by peer"
1680 0 : << std::endl;
1681 :
1682 0 : _removeConnection( connection );
1683 0 : serveRequest( requestID, false );
1684 0 : return true;
1685 : }
1686 :
1687 34 : const uint32_t nodeType = command.get< uint32_t >();
1688 68 : std::string data = command.get< std::string >();
1689 :
1690 34 : LBVERB << "handle connect reply " << command << " req " << requestID
1691 34 : << " type " << nodeType << " data " << data << std::endl;
1692 :
1693 : // No locking needed, only recv thread modifies
1694 34 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1695 68 : NodePtr peer;
1696 34 : if( i != _impl->nodes->end( ))
1697 0 : peer = i->second;
1698 :
1699 34 : if( peer && peer->isReachable( )) // simultaneous connect
1700 : {
1701 0 : LBINFO << "Closing simultaneous connection from " << peer << " on "
1702 0 : << connection << std::endl;
1703 :
1704 0 : _removeConnection( connection );
1705 0 : _closeNode( peer );
1706 0 : serveRequest( requestID, false );
1707 0 : return true;
1708 : }
1709 :
1710 : // create and add node
1711 34 : if( !peer )
1712 : {
1713 34 : if( requestID != LB_UNDEFINED_UINT32 )
1714 34 : peer = reinterpret_cast< Node* >( getRequestData( requestID ));
1715 : else
1716 0 : peer = createNode( nodeType );
1717 : }
1718 34 : if( !peer )
1719 : {
1720 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1721 0 : << std::endl;
1722 0 : _removeConnection( connection );
1723 0 : return true;
1724 : }
1725 :
1726 34 : LBASSERTINFO( peer->getType() == nodeType,
1727 : peer->getType() << " != " << nodeType );
1728 34 : LBASSERT( peer->isClosed( ));
1729 :
1730 34 : if( !peer->deserialize( data ))
1731 0 : LBWARN << "Error during node initialization" << std::endl;
1732 34 : LBASSERT( data.empty( ));
1733 34 : LBASSERT( peer->getNodeID() == nodeID );
1734 :
1735 : // send ACK to peer
1736 : // cppcheck-suppress unusedScopedObject
1737 34 : OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
1738 :
1739 34 : peer->_connect( connection );
1740 34 : _impl->connectionNodes[ connection ] = peer;
1741 : {
1742 34 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1743 34 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1744 : }
1745 34 : _connectMulticast( peer );
1746 34 : LBVERB << "Added node " << nodeID << std::endl;
1747 :
1748 34 : serveRequest( requestID, true );
1749 :
1750 34 : notifyConnect( peer );
1751 68 : return true;
1752 : }
1753 :
1754 34 : bool LocalNode::_cmdConnectAck( ICommand& command )
1755 : {
1756 34 : NodePtr node = command.getRemoteNode();
1757 34 : LBASSERT( node );
1758 34 : LBASSERT( _impl->inReceiverThread( ));
1759 34 : LBVERB << "handle connect ack" << std::endl;
1760 :
1761 34 : node->_connect( _impl->incoming.getConnection( ));
1762 34 : _connectMulticast( node );
1763 34 : notifyConnect( node );
1764 34 : return true;
1765 : }
1766 :
1767 0 : bool LocalNode::_cmdID( ICommand& command )
1768 : {
1769 0 : LBASSERT( _impl->inReceiverThread( ));
1770 :
1771 0 : const NodeID& nodeID = command.get< NodeID >();
1772 0 : uint32_t nodeType = command.get< uint32_t >();
1773 0 : std::string data = command.get< std::string >();
1774 :
1775 0 : if( command.getRemoteNode( ))
1776 : {
1777 0 : LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
1778 0 : LBASSERT( command.getRemoteNode()->_getMulticast( ));
1779 0 : return true;
1780 : }
1781 :
1782 0 : LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
1783 :
1784 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1785 0 : LBASSERT( connection->isMulticast( ));
1786 0 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1787 : _impl->connectionNodes.end( ));
1788 :
1789 0 : NodePtr node;
1790 0 : if( nodeID == getNodeID() ) // 'self' multicast connection
1791 0 : node = this;
1792 : else
1793 : {
1794 : // No locking needed, only recv thread writes
1795 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1796 :
1797 0 : if( i == _impl->nodes->end( ))
1798 : {
1799 : // unknown node: create and add unconnected node
1800 0 : node = createNode( nodeType );
1801 :
1802 0 : if( !node->deserialize( data ))
1803 0 : LBWARN << "Error during node initialization" << std::endl;
1804 0 : LBASSERTINFO( data.empty(), data );
1805 :
1806 : {
1807 0 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1808 0 : _impl->nodes.data[ nodeID ] = node;
1809 : }
1810 0 : LBVERB << "Added node " << nodeID << " with multicast "
1811 0 : << connection << std::endl;
1812 : }
1813 : else
1814 0 : node = i->second;
1815 : }
1816 0 : LBASSERT( node );
1817 0 : LBASSERTINFO( node->getNodeID() == nodeID,
1818 : node->getNodeID() << "!=" << nodeID );
1819 :
1820 0 : _connectMulticast( node, connection );
1821 0 : _impl->connectionNodes[ connection ] = node;
1822 0 : LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
1823 0 : << " to " << getNodeID() << std::endl;
1824 0 : return true;
1825 : }
1826 :
1827 8 : bool LocalNode::_cmdDisconnect( ICommand& command )
1828 : {
1829 8 : LBASSERT( _impl->inReceiverThread( ));
1830 :
1831 8 : const uint32_t requestID = command.get< uint32_t >();
1832 :
1833 8 : NodePtr node = static_cast<Node*>( getRequestData( requestID ));
1834 8 : LBASSERT( node );
1835 :
1836 8 : _closeNode( node );
1837 8 : LBASSERT( node->isClosed( ));
1838 8 : serveRequest( requestID );
1839 8 : return true;
1840 : }
1841 :
1842 0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
1843 : {
1844 0 : const NodeID& nodeID = command.get< NodeID >();
1845 0 : const uint32_t requestID = command.get< uint32_t >();
1846 :
1847 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1848 0 : << " nodeID " << nodeID << std::endl;
1849 :
1850 0 : NodePtr node = getNode( nodeID );
1851 0 : NodePtr toNode = command.getRemoteNode();
1852 :
1853 0 : uint32_t nodeType = NODETYPE_INVALID;
1854 0 : std::string nodeData;
1855 0 : if( node )
1856 : {
1857 0 : nodeType = node->getType();
1858 0 : nodeData = node->serialize();
1859 0 : LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
1860 0 : << toNode << std::endl;
1861 : }
1862 :
1863 : toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
1864 0 : << nodeID << requestID << nodeType << nodeData;
1865 0 : return true;
1866 : }
1867 :
1868 0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
1869 : {
1870 0 : LBASSERT( _impl->inReceiverThread( ));
1871 :
1872 0 : const NodeID& nodeID = command.get< NodeID >();
1873 0 : const uint32_t requestID = command.get< uint32_t >();
1874 0 : const uint32_t nodeType = command.get< uint32_t >();
1875 0 : std::string nodeData = command.get< std::string >();
1876 :
1877 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
1878 0 : << " type " << nodeType << " data " << nodeData << std::endl;
1879 :
1880 : // No locking needed, only recv thread writes
1881 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1882 0 : if( i != _impl->nodes->end( ))
1883 : {
1884 : // Requested node connected to us in the meantime
1885 0 : NodePtr node = i->second;
1886 :
1887 0 : node->ref( this );
1888 0 : serveRequest( requestID, node.get( ));
1889 0 : return true;
1890 : }
1891 :
1892 0 : if( nodeType == NODETYPE_INVALID )
1893 : {
1894 0 : serveRequest( requestID, (void*)0 );
1895 0 : return true;
1896 : }
1897 :
1898 : // new node: create and add unconnected node
1899 0 : NodePtr node = createNode( nodeType );
1900 0 : if( node )
1901 : {
1902 0 : LBASSERT( node );
1903 :
1904 0 : if( !node->deserialize( nodeData ))
1905 0 : LBWARN << "Failed to initialize node data" << std::endl;
1906 0 : LBASSERT( nodeData.empty( ));
1907 0 : node->ref( this );
1908 : }
1909 : else
1910 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
1911 :
1912 0 : serveRequest( requestID, node.get( ));
1913 0 : return true;
1914 : }
1915 :
1916 0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
1917 : {
1918 0 : LBASSERT( inCommandThread( ));
1919 0 : if( !_impl->sendToken ) // enqueue command if no token available
1920 : {
1921 0 : const uint32_t timeout = Global::getTimeout();
1922 0 : if( timeout == LB_TIMEOUT_INDEFINITE ||
1923 0 : ( getTime64() - _impl->lastSendToken <= timeout ))
1924 : {
1925 0 : _impl->sendTokenQueue.push_back( command );
1926 0 : return true;
1927 : }
1928 :
1929 : // timeout! - clear old requests
1930 0 : _impl->sendTokenQueue.clear();
1931 : // 'generate' new token - release is robust
1932 : }
1933 :
1934 0 : _impl->sendToken = false;
1935 :
1936 0 : const uint32_t requestID = command.get< uint32_t >();
1937 0 : command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1938 0 : << requestID;
1939 0 : return true;
1940 : }
1941 :
1942 0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
1943 : {
1944 0 : const uint32_t requestID = command.get< uint32_t >();
1945 0 : serveRequest( requestID );
1946 0 : return true;
1947 : }
1948 :
1949 0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
1950 : {
1951 0 : LBASSERT( inCommandThread( ));
1952 0 : _impl->lastSendToken = getTime64();
1953 :
1954 0 : if( _impl->sendToken )
1955 0 : return true; // double release due to timeout
1956 0 : if( _impl->sendTokenQueue.empty( ))
1957 : {
1958 0 : _impl->sendToken = true;
1959 0 : return true;
1960 : }
1961 :
1962 0 : ICommand& request = _impl->sendTokenQueue.front();
1963 :
1964 0 : const uint32_t requestID = request.get< uint32_t >();
1965 0 : request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1966 0 : << requestID;
1967 0 : _impl->sendTokenQueue.pop_front();
1968 0 : return true;
1969 : }
1970 :
1971 0 : bool LocalNode::_cmdAddListener( ICommand& command )
1972 : {
1973 : Connection* rawConnection =
1974 0 : reinterpret_cast< Connection* >( command.get< uint64_t >( ));
1975 0 : std::string data = command.get< std::string >();
1976 :
1977 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
1978 0 : command.getRemoteNode()->_addConnectionDescription( description );
1979 :
1980 0 : if( command.getRemoteNode() != this )
1981 0 : return true;
1982 :
1983 0 : ConnectionPtr connection = rawConnection;
1984 0 : connection->unref();
1985 0 : LBASSERT( connection );
1986 :
1987 0 : _impl->connectionNodes[ connection ] = this;
1988 0 : if( connection->isMulticast( ))
1989 0 : _addMulticast( this, connection );
1990 :
1991 0 : connection->acceptNB();
1992 0 : _impl->incoming.addConnection( connection );
1993 :
1994 0 : _initService(); // update zeroconf
1995 0 : return true;
1996 : }
1997 :
1998 0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
1999 : {
2000 0 : const uint32_t requestID = command.get< uint32_t >();
2001 0 : Connection* rawConnection = command.get< Connection* >();
2002 0 : std::string data = command.get< std::string >();
2003 :
2004 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2005 0 : LBCHECK(
2006 : command.getRemoteNode()->_removeConnectionDescription( description ));
2007 :
2008 0 : if( command.getRemoteNode() != this )
2009 0 : return true;
2010 :
2011 0 : _initService(); // update zeroconf
2012 :
2013 0 : ConnectionPtr connection = rawConnection;
2014 0 : connection->unref( this );
2015 0 : LBASSERT( connection );
2016 :
2017 0 : if( connection->isMulticast( ))
2018 0 : _removeMulticast( connection );
2019 :
2020 0 : _impl->incoming.removeConnection( connection );
2021 0 : LBASSERT( _impl->connectionNodes.find( connection ) !=
2022 : _impl->connectionNodes.end( ));
2023 0 : _impl->connectionNodes.erase( connection );
2024 0 : serveRequest( requestID );
2025 0 : return true;
2026 : }
2027 :
2028 0 : bool LocalNode::_cmdPing( ICommand& command )
2029 : {
2030 0 : LBASSERT( inCommandThread( ));
2031 0 : command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
2032 0 : return true;
2033 : }
2034 :
2035 2 : bool LocalNode::_cmdCommand( ICommand& command )
2036 : {
2037 2 : const uint128_t& commandID = command.get< uint128_t >();
2038 2 : CommandHandler func;
2039 : {
2040 2 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2041 2 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2042 2 : if( i == _impl->commandHandlers->end( ))
2043 0 : return false;
2044 :
2045 2 : CommandQueue* queue = i->second.second;
2046 2 : if( queue )
2047 : {
2048 : command.setDispatchFunction( CmdFunc( this,
2049 1 : &LocalNode::_cmdCommandAsync ));
2050 1 : queue->push( command );
2051 1 : return true;
2052 : }
2053 : // else
2054 :
2055 1 : func = i->second.first;
2056 : }
2057 :
2058 2 : CustomICommand customCmd( command );
2059 3 : return func( customCmd );
2060 : }
2061 :
2062 1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
2063 : {
2064 1 : const uint128_t& commandID = command.get< uint128_t >();
2065 1 : CommandHandler func;
2066 : {
2067 1 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2068 1 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2069 1 : LBASSERT( i != _impl->commandHandlers->end( ));
2070 1 : if( i == _impl->commandHandlers->end( ))
2071 0 : return true; // deregistered between dispatch and now
2072 1 : func = i->second.first;
2073 : }
2074 2 : CustomICommand customCmd( command );
2075 2 : return func( customCmd );
2076 : }
2077 :
2078 34 : bool LocalNode::_cmdAddConnection( ICommand& command )
2079 : {
2080 34 : LBASSERT( _impl->inReceiverThread( ));
2081 :
2082 34 : ConnectionPtr connection = command.get< ConnectionPtr >();
2083 34 : _addConnection( connection );
2084 34 : connection->unref(); // ref'd by _addConnection
2085 34 : return true;
2086 : }
2087 :
2088 66 : }
|