Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/futureFunction.h>
46 : #include <lunchbox/hash.h>
47 : #include <lunchbox/lockable.h>
48 : #include <lunchbox/request.h>
49 : #include <lunchbox/rng.h>
50 : #include <lunchbox/servus.h>
51 : #include <lunchbox/sleep.h>
52 :
53 : #include <boost/bind.hpp>
54 : #include <boost/date_time/posix_time/posix_time.hpp>
55 : #include <boost/lexical_cast.hpp>
56 :
57 : #include <list>
58 :
59 : namespace bp = boost::posix_time;
60 :
61 : namespace co
62 : {
63 : namespace
64 : {
65 21 : lunchbox::a_int32_t _threadIDs;
66 :
67 : typedef CommandFunc< LocalNode > CmdFunc;
68 : typedef std::list< ICommand > CommandList;
69 : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
70 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
71 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
72 : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
73 : typedef NodeHash::const_iterator NodeHashCIter;
74 : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
75 : typedef HandlerHash::const_iterator HandlerHashCIter;
76 : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
77 : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
78 : typedef CommandHash::const_iterator CommandHashCIter;
79 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
80 : }
81 :
82 : namespace detail
83 : {
84 98 : class ReceiverThread : public lunchbox::Thread
85 : {
86 : public:
87 51 : explicit ReceiverThread( co::LocalNode* localNode )
88 51 : : _localNode( localNode ) {}
89 :
90 46 : bool init() override
91 : {
92 46 : const int32_t threadID = ++_threadIDs - 1;
93 92 : setName( std::string( "Rcv" ) +
94 46 : boost::lexical_cast< std::string >( threadID ));
95 46 : return _localNode->_startCommandThread( threadID );
96 : }
97 :
98 46 : void run() override { _localNode->_runReceiverThread(); }
99 :
100 : private:
101 : co::LocalNode* const _localNode;
102 : };
103 :
104 98 : class CommandThread : public Worker
105 : {
106 : public:
107 51 : explicit CommandThread( co::LocalNode* localNode )
108 : : Worker( Global::getCommandQueueLimit( ))
109 : , threadID( 0 )
110 51 : , _localNode( localNode )
111 51 : {}
112 :
113 : int32_t threadID;
114 :
115 : protected:
116 46 : bool init() override
117 : {
118 92 : setName( std::string( "Cmd" ) +
119 46 : boost::lexical_cast< std::string >( threadID ));
120 46 : return true;
121 : }
122 :
123 102919 : bool stopRunning() override { return _localNode->isClosed(); }
124 51173 : bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
125 :
126 : private:
127 : co::LocalNode* const _localNode;
128 : };
129 :
130 : class LocalNode
131 : {
132 : public:
133 51 : LocalNode()
134 : : smallBuffers( 200 )
135 : , bigBuffers( 20 )
136 : , sendToken( true )
137 : , lastSendToken( 0 )
138 : , objectStore( 0 )
139 : , receiverThread( 0 )
140 : , commandThread( 0 )
141 51 : , service( "_collage._tcp" )
142 51 : {}
143 :
144 49 : ~LocalNode()
145 49 : {
146 49 : LBASSERT( incoming.isEmpty( ));
147 49 : LBASSERT( connectionNodes.empty( ));
148 49 : LBASSERT( pendingCommands.empty( ));
149 49 : LBASSERT( nodes->empty( ));
150 :
151 49 : delete objectStore;
152 49 : objectStore = 0;
153 49 : LBASSERT( !commandThread->isRunning( ));
154 49 : delete commandThread;
155 49 : commandThread = 0;
156 :
157 49 : LBASSERT( !receiverThread->isRunning( ));
158 49 : delete receiverThread;
159 49 : receiverThread = 0;
160 49 : }
161 :
162 271 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
163 :
164 : /** Commands re-scheduled for dispatch. */
165 : CommandList pendingCommands;
166 :
167 : /** The command buffer 'allocator' for small packets */
168 : co::BufferCache smallBuffers;
169 :
170 : /** The command buffer 'allocator' for big packets */
171 : co::BufferCache bigBuffers;
172 :
173 : bool sendToken; //!< send token availability.
174 : uint64_t lastSendToken; //!< last used time for timeout detection
175 : std::deque< co::ICommand > sendTokenQueue; //!< pending requests
176 :
177 : /** Manager of distributed object */
178 : ObjectStore* objectStore;
179 :
180 : /** Needed for thread-safety during nodeID-based connect() */
181 : lunchbox::Lock connectLock;
182 :
183 : /** The node for each connection. */
184 : ConnectionNodeHash connectionNodes; // read and write: recv only
185 :
186 : /** The connected nodes. */
187 : lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
188 :
189 : /** The connection set of all connections from/to this node. */
190 : co::ConnectionSet incoming;
191 :
192 : /** The process-global clock. */
193 : lunchbox::Clock clock;
194 :
195 : /** The registered push handlers. */
196 : lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
197 :
198 : /** The registered custom command handlers. */
199 : lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
200 :
201 : ReceiverThread* receiverThread;
202 : CommandThread* commandThread;
203 :
204 : lunchbox::Lockable< lunchbox::Servus > service;
205 :
206 : // Performance counters:
207 : a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
208 : };
209 : }
210 :
211 51 : LocalNode::LocalNode( const uint32_t type )
212 : : Node( type )
213 51 : , _impl( new detail::LocalNode )
214 : {
215 51 : _impl->receiverThread = new detail::ReceiverThread( this );
216 51 : _impl->commandThread = new detail::CommandThread( this );
217 51 : _impl->objectStore = new ObjectStore( this, _impl->counters );
218 :
219 51 : CommandQueue* queue = getCommandThreadQueue();
220 : registerCommand( CMD_NODE_CONNECT,
221 51 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
222 : registerCommand( CMD_NODE_CONNECT_BE,
223 51 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
224 : registerCommand( CMD_NODE_CONNECT_REPLY,
225 51 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
226 : registerCommand( CMD_NODE_CONNECT_REPLY_BE,
227 51 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
228 : registerCommand( CMD_NODE_ID,
229 51 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
230 : registerCommand( CMD_NODE_ID_BE,
231 51 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
232 : registerCommand( CMD_NODE_ACK_REQUEST,
233 51 : CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
234 : registerCommand( CMD_NODE_STOP_RCV,
235 51 : CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
236 : registerCommand( CMD_NODE_STOP_CMD,
237 51 : CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
238 : registerCommand( CMD_NODE_SET_AFFINITY_RCV,
239 51 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
240 : registerCommand( CMD_NODE_SET_AFFINITY_CMD,
241 51 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
242 : registerCommand( CMD_NODE_CONNECT_ACK,
243 51 : CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
244 : registerCommand( CMD_NODE_DISCONNECT,
245 51 : CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
246 : registerCommand( CMD_NODE_GET_NODE_DATA,
247 51 : CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
248 : registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
249 51 : CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
250 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
251 51 : CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
252 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
253 51 : CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
254 : registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
255 51 : CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
256 : registerCommand( CMD_NODE_ADD_LISTENER,
257 51 : CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
258 : registerCommand( CMD_NODE_REMOVE_LISTENER,
259 51 : CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
260 : registerCommand( CMD_NODE_PING,
261 51 : CmdFunc( this, &LocalNode::_cmdPing ), queue );
262 : registerCommand( CMD_NODE_PING_REPLY,
263 51 : CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
264 : registerCommand( CMD_NODE_COMMAND,
265 51 : CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
266 : registerCommand( CMD_NODE_ADD_CONNECTION,
267 51 : CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
268 51 : }
269 :
270 137 : LocalNode::~LocalNode( )
271 : {
272 49 : LBASSERT( !hasPendingRequests( ));
273 49 : delete _impl;
274 88 : }
275 :
276 1 : bool LocalNode::initLocal( const int argc, char** argv )
277 : {
278 : // We do not use getopt_long because it really does not work due to the
279 : // following aspects:
280 : // - reordering of arguments
281 : // - different behavior of GNU and BSD implementations
282 : // - incomplete man pages
283 1 : for( int i=1; i<argc; ++i )
284 : {
285 0 : if( std::string( "--eq-listen" ) == argv[i] )
286 0 : LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
287 0 : if( std::string( "--eq-listen" ) == argv[i] ||
288 0 : std::string( "--co-listen" ) == argv[i] )
289 : {
290 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
291 : {
292 0 : std::string data = argv[++i];
293 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
294 0 : desc->port = Global::getDefaultPort();
295 :
296 0 : if( desc->fromString( data ))
297 : {
298 0 : addConnectionDescription( desc );
299 0 : LBASSERTINFO( data.empty(), data );
300 : }
301 : else
302 0 : LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
303 : }
304 : else
305 : {
306 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
307 : }
308 : }
309 0 : else if ( std::string( "--co-globals" ) == argv[i] )
310 : {
311 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
312 : {
313 0 : const std::string data = argv[++i];
314 0 : if( !Global::fromString( data ))
315 : {
316 0 : LBWARN << "Invalid global variables string: " << data
317 0 : << ", using default global variables." << std::endl;
318 0 : }
319 : }
320 : else
321 : {
322 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
323 : }
324 : }
325 : }
326 :
327 1 : if( !listen( ))
328 : {
329 0 : LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
330 0 : << std::endl;
331 0 : return false;
332 : }
333 1 : return true;
334 : }
335 :
336 46 : bool LocalNode::listen()
337 : {
338 46 : LBVERB << "Listener data: " << serialize() << std::endl;
339 46 : if( !isClosed() || !_connectSelf( ))
340 0 : return false;
341 :
342 46 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
343 270 : for( ConnectionDescriptionsCIter i = descriptions.begin();
344 180 : i != descriptions.end(); ++i )
345 : {
346 44 : ConnectionDescriptionPtr description = *i;
347 88 : ConnectionPtr connection = Connection::create( description );
348 :
349 44 : if( !connection || !connection->listen( ))
350 : {
351 0 : LBWARN << "Can't create listener connection: " << description
352 0 : << std::endl;
353 0 : return false;
354 : }
355 :
356 44 : _impl->connectionNodes[ connection ] = this;
357 44 : if( connection->isMulticast( ))
358 0 : _addMulticast( this, connection );
359 :
360 44 : connection->acceptNB();
361 44 : _impl->incoming.addConnection( connection );
362 :
363 88 : LBVERB << "Added node " << getNodeID() << " using " << connection
364 132 : << std::endl;
365 44 : }
366 :
367 92 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
368 138 : << std::endl;
369 :
370 46 : _setListening();
371 46 : _impl->receiverThread->start();
372 :
373 46 : LBDEBUG << *this << std::endl;
374 46 : return true;
375 : }
376 :
377 45 : bool LocalNode::close()
378 : {
379 45 : if( !isListening() )
380 0 : return false;
381 :
382 45 : send( CMD_NODE_STOP_RCV );
383 :
384 45 : LBCHECK( _impl->receiverThread->join( ));
385 45 : _cleanup();
386 :
387 135 : LBDEBUG << _impl->incoming.getSize() << " connections open after close"
388 135 : << std::endl;
389 : #ifndef NDEBUG
390 45 : const Connections& connections = _impl->incoming.getConnections();
391 135 : for( Connections::const_iterator i = connections.begin();
392 90 : i != connections.end(); ++i )
393 : {
394 0 : LBDEBUG << " " << *i << std::endl;
395 : }
396 : #endif
397 :
398 45 : LBASSERTINFO( !hasPendingRequests(),
399 : *static_cast< lunchbox::RequestHandler* >( this ));
400 45 : return true;
401 : }
402 :
403 0 : void LocalNode::setAffinity( const int32_t affinity )
404 : {
405 0 : send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
406 0 : send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
407 :
408 0 : lunchbox::Thread::setAffinity( affinity );
409 0 : }
410 :
411 33 : void LocalNode::addConnection( ConnectionPtr connection )
412 : {
413 33 : connection->ref(); // unref in _cmdAddConnection
414 33 : send( CMD_NODE_ADD_CONNECTION ) << connection;
415 33 : }
416 :
417 0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
418 : {
419 0 : LBASSERT( isListening( ));
420 :
421 0 : ConnectionPtr connection = Connection::create( desc );
422 0 : if( connection && connection->listen( ))
423 : {
424 0 : addListener( connection );
425 0 : return connection;
426 : }
427 :
428 0 : return 0;
429 : }
430 :
431 0 : void LocalNode::addListener( ConnectionPtr connection )
432 : {
433 0 : LBASSERT( isListening( ));
434 0 : LBASSERT( connection->isListening( ));
435 0 : if( !isListening() || !connection->isListening( ))
436 0 : return;
437 :
438 0 : connection->ref( this ); // unref in self handler
439 :
440 : // Update everybody's description list of me, add the listener to myself in
441 : // my handler
442 0 : Nodes nodes;
443 0 : getNodes( nodes );
444 :
445 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
446 0 : (*i)->send( CMD_NODE_ADD_LISTENER )
447 0 : << (uint64_t)(connection.get( ))
448 0 : << connection->getDescription()->toString();
449 : }
450 :
451 0 : void LocalNode::removeListeners( const Connections& connections )
452 : {
453 0 : std::vector< lunchbox::Request< void > > requests;
454 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
455 : {
456 0 : ConnectionPtr connection = *i;
457 0 : requests.push_back( _removeListener( connection ));
458 0 : }
459 0 : }
460 :
461 0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
462 : {
463 0 : LBASSERT( isListening( ));
464 0 : LBASSERTINFO( !conn->isConnected(), conn );
465 :
466 0 : conn->ref( this );
467 0 : const lunchbox::Request< void > request = registerRequest< void >();
468 0 : Nodes nodes;
469 0 : getNodes( nodes );
470 :
471 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
472 0 : (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
473 0 : << conn->getDescription()->toString();
474 0 : return request;
475 : }
476 :
477 145 : void LocalNode::_addConnection( ConnectionPtr connection )
478 : {
479 145 : if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
480 : {
481 33 : addConnection( connection );
482 178 : return;
483 : }
484 :
485 112 : BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
486 112 : connection->recvNB( buffer, COMMAND_MINSIZE );
487 112 : _impl->incoming.addConnection( connection );
488 : }
489 :
490 214 : void LocalNode::_removeConnection( ConnectionPtr connection )
491 : {
492 214 : LBASSERT( connection );
493 :
494 214 : _impl->incoming.removeConnection( connection );
495 214 : connection->resetRecvData();
496 214 : if( !connection->isClosed( ))
497 122 : connection->close(); // cancel pending IO's
498 214 : }
499 :
500 45 : void LocalNode::_cleanup()
501 : {
502 45 : LBVERB << "Clean up stopped node" << std::endl;
503 45 : LBASSERTINFO( isClosed(), *this );
504 :
505 45 : if( !_impl->connectionNodes.empty( ))
506 132 : LBDEBUG << _impl->connectionNodes.size()
507 132 : << " open connections during cleanup" << std::endl;
508 : #ifndef NDEBUG
509 267 : for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
510 178 : i != _impl->connectionNodes.end(); ++i )
511 : {
512 44 : NodePtr node = i->second;
513 44 : LBDEBUG << " " << i->first << " : " << node << std::endl;
514 44 : }
515 : #endif
516 :
517 45 : _impl->connectionNodes.clear();
518 :
519 45 : if( !_impl->nodes->empty( ))
520 3 : LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
521 3 : << std::endl;
522 :
523 : #ifndef NDEBUG
524 46 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
525 : {
526 1 : NodePtr node = i->second;
527 1 : LBDEBUG << " " << node << std::endl;
528 1 : }
529 : #endif
530 :
531 45 : _impl->nodes->clear();
532 45 : }
533 :
534 110 : void LocalNode::_closeNode( NodePtr node )
535 : {
536 110 : ConnectionPtr connection = node->getConnection();
537 220 : ConnectionPtr mcConnection = node->_getMulticast();
538 :
539 110 : node->_disconnect();
540 :
541 110 : if( connection )
542 : {
543 66 : LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
544 : _impl->connectionNodes.end(), connection );
545 :
546 66 : _removeConnection( connection );
547 66 : _impl->connectionNodes.erase( connection );
548 : }
549 :
550 110 : if( mcConnection )
551 : {
552 0 : _removeConnection( mcConnection );
553 0 : _impl->connectionNodes.erase( mcConnection );
554 : }
555 :
556 110 : _impl->objectStore->removeInstanceData( node->getNodeID( ));
557 :
558 220 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
559 110 : _impl->nodes->erase( node->getNodeID( ));
560 110 : notifyDisconnect( node );
561 220 : LBDEBUG << node << " disconnected from " << *this << std::endl;
562 110 : }
563 :
564 46 : bool LocalNode::_connectSelf()
565 : {
566 : // setup local connection to myself
567 46 : PipeConnectionPtr connection = new PipeConnection;
568 46 : if( !connection->connect( ))
569 : {
570 0 : LBERROR << "Could not create local connection to receiver thread."
571 0 : << std::endl;
572 0 : return false;
573 : }
574 :
575 46 : Node::_connect( connection->getSibling( ));
576 46 : _setClosed(); // reset state after _connect set it to connected
577 :
578 : // add to connection set
579 46 : LBASSERT( connection->getDescription( ));
580 46 : LBASSERT( _impl->connectionNodes.find( connection ) ==
581 : _impl->connectionNodes.end( ));
582 :
583 46 : _impl->connectionNodes[ connection ] = this;
584 46 : _impl->nodes.data[ getNodeID() ] = this;
585 46 : _addConnection( connection );
586 :
587 92 : LBVERB << "Added node " << getNodeID() << " using " << connection
588 138 : << std::endl;
589 46 : return true;
590 : }
591 :
592 7 : bool LocalNode::disconnect( NodePtr node )
593 : {
594 7 : if( !node || !isListening() )
595 0 : return false;
596 :
597 7 : if( !node->isConnected( ))
598 0 : return true;
599 :
600 7 : LBASSERT( !inCommandThread( ));
601 7 : lunchbox::Request< void > request = registerRequest< void >( node.get( ));
602 7 : send( CMD_NODE_DISCONNECT ) << request;
603 :
604 7 : request.wait();
605 7 : _impl->objectStore->removeNode( node );
606 7 : return true;
607 : }
608 :
609 20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
610 : {
611 20001 : if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
612 20001 : return;
613 :
614 20001 : if( node == this ) // OPT
615 0 : serveRequest( requestID );
616 : else
617 20001 : node->send( CMD_NODE_ACK_REQUEST ) << requestID;
618 : }
619 :
620 0 : void LocalNode::ping( NodePtr peer )
621 : {
622 0 : LBASSERT( !_impl->inReceiverThread( ));
623 0 : peer->send( CMD_NODE_PING );
624 0 : }
625 :
626 0 : bool LocalNode::pingIdleNodes()
627 : {
628 0 : LBASSERT( !_impl->inReceiverThread( ) );
629 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
630 0 : Nodes nodes;
631 0 : getNodes( nodes, false );
632 :
633 0 : bool pinged = false;
634 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
635 : {
636 0 : NodePtr node = *i;
637 0 : if( getTime64() - node->getLastReceiveTime() > timeout )
638 : {
639 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
640 0 : << node->getLastReceiveTime() << std::endl;
641 0 : node->send( CMD_NODE_PING );
642 0 : pinged = true;
643 : }
644 0 : }
645 0 : return pinged;
646 : }
647 :
648 : //----------------------------------------------------------------------
649 : // Object functionality
650 : //----------------------------------------------------------------------
651 0 : void LocalNode::disableInstanceCache()
652 : {
653 0 : _impl->objectStore->disableInstanceCache();
654 0 : }
655 :
656 0 : void LocalNode::expireInstanceData( const int64_t age )
657 : {
658 0 : _impl->objectStore->expireInstanceData( age );
659 0 : }
660 :
661 0 : void LocalNode::enableSendOnRegister()
662 : {
663 0 : _impl->objectStore->enableSendOnRegister();
664 0 : }
665 :
666 0 : void LocalNode::disableSendOnRegister()
667 : {
668 0 : _impl->objectStore->disableSendOnRegister();
669 0 : }
670 :
671 20 : bool LocalNode::registerObject( Object* object )
672 : {
673 20 : return _impl->objectStore->register_( object );
674 : }
675 :
676 20 : void LocalNode::deregisterObject( Object* object )
677 : {
678 20 : _impl->objectStore->deregister( object );
679 20 : }
680 :
681 39 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
682 : NodePtr master, const uint128_t& version )
683 : {
684 : const uint32_t request = _impl->objectStore->mapNB( object, id, version,
685 39 : master );
686 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
687 39 : _impl->objectStore, request );
688 39 : return f_bool_t( new FuturebImpl( func ));
689 : }
690 :
691 0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
692 : const uint128_t& version )
693 : {
694 0 : return _impl->objectStore->mapNB( object, id, version, 0 );
695 : }
696 :
697 2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
698 : const uint128_t& version, NodePtr master )
699 : {
700 2 : return _impl->objectStore->mapNB( object, id, version, master );
701 : }
702 :
703 :
704 2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
705 : {
706 2 : return _impl->objectStore->mapSync( requestID );
707 : }
708 :
709 4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
710 : const uint32_t instanceID )
711 : {
712 4 : return _impl->objectStore->sync( object, master, id, instanceID );
713 : }
714 :
715 41 : void LocalNode::unmapObject( Object* object )
716 : {
717 41 : _impl->objectStore->unmap( object );
718 41 : }
719 :
720 0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
721 : {
722 0 : _impl->objectStore->swap( oldObject, newObject );
723 0 : }
724 :
725 0 : void LocalNode::objectPush( const uint128_t& groupID,
726 : const uint128_t& objectType,
727 : const uint128_t& objectID, DataIStream& istream )
728 : {
729 0 : lunchbox::ScopedRead mutex( _impl->pushHandlers );
730 0 : HandlerHashCIter i = _impl->pushHandlers->find( groupID );
731 0 : if( i != _impl->pushHandlers->end( ))
732 0 : i->second( groupID, objectType, objectID, istream );
733 : else
734 0 : LBWARN << "No custom handler for push group " << groupID
735 0 : << " registered" << std::endl;
736 :
737 0 : if( istream.wasUsed() && istream.hasData( ))
738 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
739 0 : << objectType << " object " << objectID << std::endl;
740 0 : }
741 :
742 0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
743 : const PushHandler& handler )
744 : {
745 0 : lunchbox::ScopedWrite mutex( _impl->pushHandlers );
746 0 : (*_impl->pushHandlers)[ groupID ] = handler;
747 0 : }
748 :
749 2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
750 : const CommandHandler& func,
751 : CommandQueue* queue )
752 : {
753 2 : lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
754 2 : if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
755 : {
756 0 : LBWARN << "Already got a registered handler for custom command "
757 0 : << command << std::endl;
758 0 : return false;
759 : }
760 :
761 : _impl->commandHandlers->insert( std::make_pair( command,
762 2 : std::make_pair( func, queue )));
763 2 : return true;
764 : }
765 :
766 0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
767 : {
768 0 : LBASSERT( !inCommandThread( ));
769 0 : LBASSERT( !_impl->inReceiverThread( ));
770 :
771 0 : lunchbox::Request< void > request = registerRequest< void >();
772 0 : node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
773 :
774 : try
775 : {
776 0 : request.wait( Global::getTimeout( ));
777 : }
778 0 : catch( const lunchbox::FutureTimeout& )
779 : {
780 0 : LBERROR << "Timeout while acquiring send token " << request.getID()
781 0 : << std::endl;
782 0 : request.unregister();
783 0 : return 0;
784 : }
785 0 : return new co::SendToken( node );
786 : }
787 :
788 0 : void LocalNode::releaseSendToken( SendToken token )
789 : {
790 0 : LBASSERT( !_impl->inReceiverThread( ));
791 0 : if( token )
792 0 : token->release();
793 0 : }
794 :
795 : //----------------------------------------------------------------------
796 : // Connecting a node
797 : //----------------------------------------------------------------------
798 : namespace
799 : {
800 : enum ConnectResult
801 : {
802 : CONNECT_OK,
803 : CONNECT_TRY_AGAIN,
804 : CONNECT_BAD_STATE,
805 : CONNECT_TIMEOUT,
806 : CONNECT_UNREACHABLE
807 : };
808 : }
809 :
810 72 : NodePtr LocalNode::connect( const NodeID& nodeID )
811 : {
812 72 : LBASSERT( nodeID != 0 );
813 73 : LBASSERT( isListening( ));
814 :
815 : // Make sure that only one connection request based on the node identifier
816 : // is pending at a given time. Otherwise a node with the same id might be
817 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
818 : // mutex is to register connecting nodes with this local node, and handle
819 : // all cases correctly, which is far more complex. Node connections only
820 : // happen a lot during initialization, and are therefore not time-critical.
821 73 : lunchbox::ScopedWrite mutex( _impl->connectLock );
822 :
823 146 : Nodes nodes;
824 73 : getNodes( nodes );
825 :
826 76 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
827 : {
828 76 : NodePtr peer = *i;
829 76 : if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
830 73 : return peer;
831 3 : }
832 :
833 0 : LBDEBUG << "Connecting node " << nodeID << std::endl;
834 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
835 : {
836 0 : NodePtr peer = *i;
837 0 : NodePtr node = _connect( nodeID, peer );
838 0 : if( node )
839 0 : return node;
840 0 : }
841 :
842 0 : NodePtr node = _connectFromZeroconf( nodeID );
843 0 : if( node )
844 0 : return node;
845 :
846 : // check again if node connected by itself by now
847 0 : nodes.clear();
848 0 : getNodes( nodes );
849 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
850 : {
851 0 : node = *i;
852 0 : if( node->getNodeID() == nodeID && node->isReachable( ))
853 0 : return node;
854 : }
855 :
856 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
857 73 : return 0;
858 : }
859 :
860 0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
861 : {
862 0 : LBASSERT( nodeID != 0 );
863 :
864 0 : NodePtr node;
865 : {
866 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
867 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
868 0 : if( i != _impl->nodes->end( ))
869 0 : node = i->second;
870 : }
871 :
872 0 : LBASSERT( getNodeID() != nodeID );
873 0 : if( !node )
874 : {
875 0 : lunchbox::Request< void* > request = registerRequest< void* >();
876 0 : peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
877 0 : node = reinterpret_cast< Node* >( request.wait( ));
878 0 : if( !node )
879 : {
880 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
881 0 : << std::endl;
882 0 : return 0;
883 : }
884 0 : node->unref( this ); // ref'd before serveRequest()
885 : }
886 :
887 0 : if( node->isReachable( ))
888 0 : return node;
889 :
890 0 : size_t tries = 10;
891 0 : while( --tries )
892 : {
893 0 : switch( _connect( node ))
894 : {
895 : case CONNECT_OK:
896 0 : return node;
897 : case CONNECT_TRY_AGAIN:
898 : {
899 0 : lunchbox::RNG rng;
900 : // collision avoidance
901 0 : lunchbox::sleep( rng.get< uint8_t >( ));
902 0 : break;
903 : }
904 : case CONNECT_BAD_STATE:
905 0 : LBWARN << "Internal connect error" << std::endl;
906 : // no break;
907 : case CONNECT_TIMEOUT:
908 0 : return 0;
909 :
910 : case CONNECT_UNREACHABLE:
911 0 : break; // maybe peer talks to us
912 : }
913 :
914 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
915 : // connect failed - check for simultaneous connect from peer
916 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
917 0 : if( i != _impl->nodes->end( ))
918 0 : node = i->second;
919 0 : }
920 :
921 0 : return node->isReachable() ? node : 0;
922 : }
923 :
924 0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
925 : {
926 0 : lunchbox::ScopedWrite mutex( _impl->service );
927 :
928 : const Strings& instances =
929 0 : _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
930 0 : for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
931 : {
932 0 : const std::string& instance = *i;
933 0 : const NodeID candidate( instance );
934 0 : if( candidate != nodeID )
935 0 : continue;
936 :
937 0 : const std::string& typeStr = _impl->service->get( instance, "co_type" );
938 0 : if( typeStr.empty( ))
939 0 : return 0;
940 :
941 0 : std::istringstream in( typeStr );
942 0 : uint32_t type = 0;
943 0 : in >> type;
944 :
945 0 : NodePtr node = createNode( type );
946 0 : if( !node )
947 : {
948 0 : LBINFO << "Can't create node of type " << type << std::endl;
949 0 : continue;
950 : }
951 :
952 : const std::string& numStr = _impl->service->get( instance,
953 0 : "co_numPorts" );
954 0 : uint32_t num = 0;
955 :
956 0 : in.clear();
957 0 : in.str( numStr );
958 0 : in >> num;
959 0 : LBASSERT( num > 0 );
960 0 : for( size_t j = 0; j < num; ++j )
961 : {
962 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
963 0 : std::ostringstream out;
964 0 : out << "co_port" << j;
965 :
966 0 : std::string descStr = _impl->service->get( instance, out.str( ));
967 0 : LBASSERT( !descStr.empty( ));
968 0 : LBCHECK( desc->fromString( descStr ));
969 0 : LBASSERT( descStr.empty( ));
970 0 : node->addConnectionDescription( desc );
971 0 : }
972 0 : mutex.leave();
973 0 : if( _connect( node ))
974 0 : return node;
975 0 : }
976 0 : return 0;
977 : }
978 :
979 33 : bool LocalNode::connect( NodePtr node )
980 : {
981 33 : lunchbox::ScopedWrite mutex( _impl->connectLock );
982 33 : return ( _connect( node ) == CONNECT_OK );
983 : }
984 :
985 33 : uint32_t LocalNode::_connect( NodePtr node )
986 : {
987 33 : LBASSERTINFO( isListening(), *this );
988 33 : if( node->isReachable( ))
989 0 : return CONNECT_OK;
990 :
991 33 : LBASSERT( node->isClosed( ));
992 33 : LBDEBUG << "Connecting " << node << std::endl;
993 :
994 : // try connecting using the given descriptions
995 33 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
996 99 : for( ConnectionDescriptionsCIter i = cds.begin();
997 66 : i != cds.end(); ++i )
998 : {
999 33 : ConnectionDescriptionPtr description = *i;
1000 33 : if( description->type >= CONNECTIONTYPE_MULTICAST )
1001 0 : continue; // Don't use multicast for primary connections
1002 :
1003 33 : ConnectionPtr connection = Connection::create( description );
1004 33 : if( !connection || !connection->connect( ))
1005 0 : continue;
1006 :
1007 33 : return _connect( node, connection );
1008 0 : }
1009 :
1010 0 : LBDEBUG << "Node " << node
1011 0 : << " unreachable, all connections failed to connect" <<std::endl;
1012 0 : return CONNECT_UNREACHABLE;
1013 : }
1014 :
1015 0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
1016 : {
1017 0 : return ( _connect( node, connection ) == CONNECT_OK );
1018 : }
1019 :
1020 33 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
1021 : {
1022 33 : LBASSERT( connection );
1023 33 : LBASSERT( node->getNodeID() != getNodeID( ));
1024 :
1025 66 : if( !node || !isListening() || !connection->isConnected() ||
1026 33 : !node->isClosed( ))
1027 : {
1028 0 : return CONNECT_BAD_STATE;
1029 : }
1030 :
1031 33 : _addConnection( connection );
1032 :
1033 : // send connect command to peer
1034 33 : lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
1035 : #ifdef COMMON_BIGENDIAN
1036 : uint32_t cmd = CMD_NODE_CONNECT_BE;
1037 : lunchbox::byteswap( cmd );
1038 : #else
1039 33 : const uint32_t cmd = CMD_NODE_CONNECT;
1040 : #endif
1041 : OCommand( Connections( 1, connection ), cmd )
1042 33 : << getNodeID() << request << getType() << serialize();
1043 :
1044 33 : bool connected = false;
1045 : try
1046 : {
1047 33 : connected = request.wait( 10000 /*ms*/ );
1048 : }
1049 0 : catch( const lunchbox::FutureTimeout& )
1050 : {
1051 0 : LBWARN << "Node connection handshake timeout - " << node
1052 0 : << " not a Collage node?" << std::endl;
1053 0 : request.unregister();
1054 0 : return CONNECT_TIMEOUT;
1055 : }
1056 :
1057 : // In simultaneous connect case, depending on the connection type
1058 : // (e.g. RDMA), a check on the connection state of the node is required
1059 33 : if( !connected || !node->isConnected( ))
1060 0 : return CONNECT_TRY_AGAIN;
1061 :
1062 33 : LBASSERT( node->getNodeID() != 0 );
1063 33 : LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
1064 33 : LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
1065 33 : return CONNECT_OK;
1066 : }
1067 :
1068 39 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
1069 : {
1070 39 : LBASSERTINFO( id.isUUID(), id );
1071 39 : if( !id.isUUID( ))
1072 : {
1073 0 : LBWARN << "Invalid object id " << id << std::endl;
1074 0 : return 0;
1075 : }
1076 :
1077 39 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
1078 39 : if( masterNodeID == 0 )
1079 : {
1080 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1081 0 : return 0;
1082 : }
1083 :
1084 39 : NodePtr master = connect( masterNodeID );
1085 39 : if( master && !master->isClosed( ))
1086 39 : return master;
1087 :
1088 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1089 0 : << " for object " << id << std::endl;
1090 0 : return 0;
1091 : }
1092 :
1093 33 : NodePtr LocalNode::createNode( const uint32_t type )
1094 : {
1095 33 : LBASSERTINFO( type == NODETYPE_NODE, type );
1096 33 : return new Node( type );
1097 : }
1098 :
1099 0 : NodePtr LocalNode::getNode( const NodeID& id ) const
1100 : {
1101 0 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1102 0 : NodeHash::const_iterator i = _impl->nodes->find( id );
1103 0 : if( i == _impl->nodes->end() || !i->second->isReachable( ))
1104 0 : return 0;
1105 0 : return i->second;
1106 : }
1107 :
1108 112 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
1109 : {
1110 112 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1111 328 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1112 : {
1113 216 : NodePtr node = i->second;
1114 216 : if( node->isReachable() && ( addSelf || node != this ))
1115 216 : nodes.push_back( i->second );
1116 328 : }
1117 112 : }
1118 :
1119 138 : CommandQueue* LocalNode::getCommandThreadQueue()
1120 : {
1121 138 : return _impl->commandThread->getWorkerQueue();
1122 : }
1123 :
1124 7 : bool LocalNode::inCommandThread() const
1125 : {
1126 7 : return _impl->commandThread->isCurrent();
1127 : }
1128 :
1129 72410 : int64_t LocalNode::getTime64() const
1130 : {
1131 72410 : return _impl->clock.getTime64();
1132 : }
1133 :
1134 0 : ssize_t LocalNode::getCounter( const Counter counter ) const
1135 : {
1136 0 : return _impl->counters[ counter ];
1137 : }
1138 :
1139 193 : void LocalNode::flushCommands()
1140 : {
1141 193 : _impl->incoming.interrupt();
1142 193 : }
1143 :
1144 : //----------------------------------------------------------------------
1145 : // receiver thread functions
1146 : //----------------------------------------------------------------------
1147 46 : void LocalNode::_runReceiverThread()
1148 : {
1149 46 : LB_TS_THREAD( _rcvThread );
1150 46 : _initService();
1151 :
1152 46 : int nErrors = 0;
1153 72611 : while( isListening( ))
1154 : {
1155 72520 : const ConnectionSet::Event result = _impl->incoming.select();
1156 72519 : switch( result )
1157 : {
1158 : case ConnectionSet::EVENT_CONNECT:
1159 33 : _handleConnect();
1160 33 : break;
1161 :
1162 : case ConnectionSet::EVENT_DATA:
1163 72157 : _handleData();
1164 72157 : break;
1165 :
1166 : case ConnectionSet::EVENT_DISCONNECT:
1167 : case ConnectionSet::EVENT_INVALID_HANDLE:
1168 33 : _handleDisconnect();
1169 33 : break;
1170 :
1171 : case ConnectionSet::EVENT_TIMEOUT:
1172 0 : LBINFO << "select timeout" << std::endl;
1173 0 : break;
1174 :
1175 : case ConnectionSet::EVENT_ERROR:
1176 0 : ++nErrors;
1177 0 : LBWARN << "Connection error during select" << std::endl;
1178 0 : if( nErrors > 100 )
1179 : {
1180 0 : LBWARN << "Too many errors in a row, capping connection"
1181 0 : << std::endl;
1182 0 : _handleDisconnect();
1183 : }
1184 0 : break;
1185 :
1186 : case ConnectionSet::EVENT_SELECT_ERROR:
1187 0 : LBWARN << "Error during select" << std::endl;
1188 0 : ++nErrors;
1189 0 : if( nErrors > 10 )
1190 : {
1191 0 : LBWARN << "Too many errors in a row" << std::endl;
1192 0 : LBUNIMPLEMENTED;
1193 : }
1194 0 : break;
1195 :
1196 : case ConnectionSet::EVENT_INTERRUPT:
1197 296 : _redispatchCommands();
1198 296 : break;
1199 :
1200 : default:
1201 0 : LBUNIMPLEMENTED;
1202 : }
1203 72519 : if( result != ConnectionSet::EVENT_ERROR &&
1204 : result != ConnectionSet::EVENT_SELECT_ERROR )
1205 : {
1206 72519 : nErrors = 0;
1207 : }
1208 : }
1209 :
1210 45 : if( !_impl->pendingCommands.empty( ))
1211 0 : LBWARN << _impl->pendingCommands.size()
1212 0 : << " commands pending while leaving command thread" << std::endl;
1213 :
1214 45 : _impl->pendingCommands.clear();
1215 45 : LBCHECK( _impl->commandThread->join( ));
1216 :
1217 45 : ConnectionPtr connection = getConnection();
1218 90 : PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
1219 45 : connection = pipe->getSibling();
1220 45 : _removeConnection( connection );
1221 45 : _impl->connectionNodes.erase( connection );
1222 45 : _disconnect();
1223 :
1224 45 : const Connections& connections = _impl->incoming.getConnections();
1225 160 : while( !connections.empty( ))
1226 : {
1227 70 : connection = connections.back();
1228 70 : NodePtr node = _impl->connectionNodes[ connection ];
1229 :
1230 70 : if( node )
1231 70 : _closeNode( node );
1232 70 : _removeConnection( connection );
1233 70 : }
1234 :
1235 45 : _impl->objectStore->clear();
1236 45 : _impl->pendingCommands.clear();
1237 45 : _impl->smallBuffers.flush();
1238 45 : _impl->bigBuffers.flush();
1239 :
1240 225 : LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
1241 225 : << std::endl;
1242 45 : }
1243 :
1244 33 : void LocalNode::_handleConnect()
1245 : {
1246 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1247 66 : ConnectionPtr newConn = connection->acceptSync();
1248 33 : connection->acceptNB();
1249 :
1250 33 : if( newConn )
1251 33 : _addConnection( newConn );
1252 : else
1253 33 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1254 33 : }
1255 :
1256 33 : void LocalNode::_handleDisconnect()
1257 : {
1258 33 : while( _handleData( )) ; // read remaining data off connection
1259 :
1260 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1261 33 : ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
1262 :
1263 33 : if( i != _impl->connectionNodes.end( ))
1264 : {
1265 33 : NodePtr node = i->second;
1266 :
1267 33 : node->ref(); // extend lifetime to give cmd handler a chance
1268 :
1269 : // local command dispatching
1270 : OCommand( this, this, CMD_NODE_REMOVE_NODE )
1271 33 : << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
1272 :
1273 33 : if( node->getConnection() == connection )
1274 33 : _closeNode( node );
1275 0 : else if( connection->isMulticast( ))
1276 0 : node->_removeMulticast( connection );
1277 : }
1278 :
1279 33 : _removeConnection( connection );
1280 33 : }
1281 :
1282 72190 : bool LocalNode::_handleData()
1283 : {
1284 72190 : _impl->smallBuffers.compact();
1285 72190 : _impl->bigBuffers.compact();
1286 :
1287 72190 : ConnectionPtr connection = _impl->incoming.getConnection();
1288 72190 : LBASSERT( connection );
1289 :
1290 144380 : BufferPtr buffer = _readHead( connection );
1291 72190 : if( !buffer ) // fluke signal
1292 66 : return false;
1293 :
1294 144248 : ICommand command = _setupCommand( connection, buffer );
1295 72124 : const bool gotCommand = _readTail( command, buffer, connection );
1296 72124 : LBASSERT( gotCommand );
1297 :
1298 : // start next receive
1299 144248 : BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1300 72124 : connection->recvNB( nextBuffer, COMMAND_MINSIZE );
1301 :
1302 72124 : if( gotCommand )
1303 : {
1304 72124 : _dispatchCommand( command );
1305 72124 : return true;
1306 : }
1307 :
1308 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1309 72190 : return false;
1310 : }
1311 :
1312 72190 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
1313 : {
1314 72190 : BufferPtr buffer;
1315 72190 : const bool gotSize = connection->recvSync( buffer, false );
1316 :
1317 72190 : if( !buffer ) // fluke signal
1318 : {
1319 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1320 0 : << std::endl;
1321 0 : _impl->incoming.setDirty();
1322 0 : return 0;
1323 : }
1324 :
1325 72190 : if( gotSize )
1326 72124 : return buffer;
1327 :
1328 : // Some systems signal data on dead connections.
1329 66 : buffer->setSize( 0 );
1330 66 : connection->recvNB( buffer, COMMAND_MINSIZE );
1331 66 : return 0;
1332 : }
1333 :
1334 72124 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
1335 : ConstBufferPtr buffer )
1336 : {
1337 72124 : NodePtr node;
1338 72124 : ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
1339 72124 : if( i != _impl->connectionNodes.end( ))
1340 72058 : node = i->second;
1341 72124 : LBVERB << "Handle data from " << node << std::endl;
1342 :
1343 : #ifdef COMMON_BIGENDIAN
1344 : const bool swapping = node ? !node->isBigEndian() : false;
1345 : #else
1346 72124 : const bool swapping = node ? node->isBigEndian() : false;
1347 : #endif
1348 144248 : ICommand command( this, node, buffer, swapping );
1349 :
1350 72124 : if( node )
1351 : {
1352 72058 : node->_setLastReceive( getTime64( ));
1353 72058 : return command;
1354 : }
1355 :
1356 66 : uint32_t cmd = command.getCommand();
1357 : #ifdef COMMON_BIGENDIAN
1358 : lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
1359 : #endif
1360 66 : switch( cmd )
1361 : {
1362 : case CMD_NODE_CONNECT:
1363 : case CMD_NODE_CONNECT_REPLY:
1364 : case CMD_NODE_ID:
1365 : #ifdef COMMON_BIGENDIAN
1366 : command = ICommand( this, node, buffer, true );
1367 : #endif
1368 66 : break;
1369 :
1370 : case CMD_NODE_CONNECT_BE:
1371 : case CMD_NODE_CONNECT_REPLY_BE:
1372 : case CMD_NODE_ID_BE:
1373 : #ifndef COMMON_BIGENDIAN
1374 0 : command = ICommand( this, node, buffer, true );
1375 : #endif
1376 0 : break;
1377 :
1378 : default:
1379 0 : LBUNIMPLEMENTED;
1380 0 : return ICommand();
1381 : }
1382 :
1383 66 : command.setCommand( cmd ); // reset correctly swapped version
1384 72190 : return command;
1385 : }
1386 :
1387 72124 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
1388 : ConnectionPtr connection )
1389 : {
1390 72124 : const uint64_t needed = command.getSize();
1391 72124 : if( needed <= buffer->getSize( ))
1392 62113 : return true;
1393 :
1394 10011 : if( needed > buffer->getMaxSize( ))
1395 : {
1396 0 : LBASSERT( needed > COMMAND_ALLOCSIZE );
1397 0 : LBASSERTINFO( needed < LB_BIT48,
1398 : "Out-of-sync network stream: " << command << "?" );
1399 : // not enough space for remaining data, alloc and copy to new buffer
1400 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
1401 0 : newBuffer->replace( *buffer );
1402 0 : buffer = newBuffer;
1403 :
1404 0 : command = ICommand( this, command.getRemoteNode(), buffer,
1405 0 : command.isSwapping( ));
1406 : }
1407 :
1408 : // read remaining data
1409 10011 : connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
1410 10011 : return connection->recvSync( buffer );
1411 : }
1412 :
1413 34 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
1414 : {
1415 34 : LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
1416 : BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
1417 : _impl->bigBuffers.alloc( size ) :
1418 34 : _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1419 34 : return buffer;
1420 : }
1421 :
1422 72169 : void LocalNode::_dispatchCommand( ICommand& command )
1423 : {
1424 72169 : LBASSERTINFO( command.isValid(), command );
1425 :
1426 72169 : if( dispatchCommand( command ))
1427 72169 : _redispatchCommands();
1428 : else
1429 : {
1430 0 : _redispatchCommands();
1431 0 : _impl->pendingCommands.push_back( command );
1432 : }
1433 72169 : }
1434 :
1435 72205 : bool LocalNode::dispatchCommand( ICommand& command )
1436 : {
1437 72205 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1438 72205 : LBASSERTINFO( command.isValid(), command );
1439 :
1440 72205 : const uint32_t type = command.getType();
1441 72205 : switch( type )
1442 : {
1443 : case COMMANDTYPE_NODE:
1444 71697 : LBCHECK( Dispatcher::dispatchCommand( command ));
1445 71697 : return true;
1446 :
1447 : case COMMANDTYPE_OBJECT:
1448 508 : return _impl->objectStore->dispatchObjectCommand( command );
1449 :
1450 : default:
1451 0 : LBABORT( "Unknown command type " << type << " for " << command );
1452 0 : return true;
1453 : }
1454 : }
1455 :
1456 72465 : void LocalNode::_redispatchCommands()
1457 : {
1458 72465 : bool changes = true;
1459 144930 : while( changes && !_impl->pendingCommands.empty( ))
1460 : {
1461 0 : changes = false;
1462 :
1463 0 : for( CommandList::iterator i = _impl->pendingCommands.begin();
1464 0 : i != _impl->pendingCommands.end(); ++i )
1465 : {
1466 0 : ICommand& command = *i;
1467 0 : LBASSERT( command.isValid( ));
1468 :
1469 0 : if( dispatchCommand( command ))
1470 : {
1471 0 : _impl->pendingCommands.erase( i );
1472 0 : changes = true;
1473 0 : break;
1474 : }
1475 : }
1476 : }
1477 :
1478 : #ifndef NDEBUG
1479 72465 : if( !_impl->pendingCommands.empty( ))
1480 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1481 0 : << std::endl;
1482 72465 : LBASSERT( _impl->pendingCommands.size() < 200 );
1483 : #endif
1484 72465 : }
1485 :
1486 46 : void LocalNode::_initService()
1487 : {
1488 46 : LB_TS_SCOPED( _rcvThread );
1489 46 : _impl->service->withdraw(); // go silent during k/v update
1490 :
1491 90 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1492 46 : if( descs.empty( ))
1493 48 : return;
1494 :
1495 88 : std::ostringstream out;
1496 44 : out << getType();
1497 44 : _impl->service->set( "co_type", out.str( ));
1498 :
1499 44 : out.str("");
1500 44 : out << descs.size();
1501 44 : _impl->service->set( "co_numPorts", out.str( ));
1502 :
1503 88 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
1504 : {
1505 44 : ConnectionDescriptionPtr desc = *i;
1506 44 : out.str("");
1507 44 : out << "co_port" << i - descs.begin();
1508 44 : _impl->service->set( out.str(), desc->toString( ));
1509 44 : }
1510 :
1511 88 : _impl->service->announce( descs.front()->port, getNodeID().getString( ));
1512 : }
1513 :
1514 45 : void LocalNode::_exitService()
1515 : {
1516 45 : _impl->service->withdraw();
1517 45 : }
1518 :
1519 0 : Zeroconf LocalNode::getZeroconf()
1520 : {
1521 0 : lunchbox::ScopedWrite mutex( _impl->service );
1522 0 : _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
1523 0 : return Zeroconf( _impl->service.data );
1524 : }
1525 :
1526 :
1527 : //----------------------------------------------------------------------
1528 : // command thread functions
1529 : //----------------------------------------------------------------------
1530 46 : bool LocalNode::_startCommandThread( const int32_t threadID )
1531 : {
1532 46 : _impl->commandThread->threadID = threadID;
1533 46 : return _impl->commandThread->start();
1534 : }
1535 :
1536 51173 : bool LocalNode::_notifyCommandThreadIdle()
1537 : {
1538 51173 : return _impl->objectStore->notifyCommandThreadIdle();
1539 : }
1540 :
1541 20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
1542 : {
1543 20001 : const uint32_t requestID = command.get< uint32_t >();
1544 20001 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
1545 :
1546 20001 : serveRequest( requestID );
1547 20001 : return true;
1548 : }
1549 :
1550 45 : bool LocalNode::_cmdStopRcv( ICommand& command )
1551 : {
1552 45 : LB_TS_THREAD( _rcvThread );
1553 45 : LBASSERT( isListening( ));
1554 :
1555 45 : _exitService();
1556 45 : _setClosing(); // causes rcv thread exit
1557 :
1558 45 : command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
1559 45 : _dispatchCommand( command );
1560 45 : return true;
1561 : }
1562 :
1563 45 : bool LocalNode::_cmdStopCmd( ICommand& )
1564 : {
1565 45 : LB_TS_THREAD( _cmdThread );
1566 45 : LBASSERTINFO( isClosing(), *this );
1567 :
1568 45 : _setClosed();
1569 45 : return true;
1570 : }
1571 :
1572 0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
1573 : {
1574 0 : const int32_t affinity = command.get< int32_t >();
1575 :
1576 0 : lunchbox::Thread::setAffinity( affinity );
1577 0 : return true;
1578 : }
1579 :
1580 33 : bool LocalNode::_cmdConnect( ICommand& command )
1581 : {
1582 33 : LBASSERTINFO( !command.getRemoteNode(), command );
1583 33 : LBASSERT( _impl->inReceiverThread( ));
1584 :
1585 33 : const NodeID& nodeID = command.get< NodeID >();
1586 33 : const uint32_t requestID = command.get< uint32_t >();
1587 33 : const uint32_t nodeType = command.get< uint32_t >();
1588 33 : std::string data = command.get< std::string >();
1589 :
1590 33 : LBVERB << "handle connect " << command << " req " << requestID << " type "
1591 33 : << nodeType << " data " << data << std::endl;
1592 :
1593 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1594 :
1595 33 : LBASSERT( connection );
1596 33 : LBASSERT( nodeID != getNodeID() );
1597 33 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1598 : _impl->connectionNodes.end( ));
1599 :
1600 66 : NodePtr peer;
1601 : #ifdef COMMON_BIGENDIAN
1602 : uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
1603 : lunchbox::byteswap( cmd );
1604 : #else
1605 33 : const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
1606 : #endif
1607 :
1608 : // No locking needed, only recv thread modifies
1609 33 : NodeHashCIter i = _impl->nodes->find( nodeID );
1610 33 : if( i != _impl->nodes->end( ))
1611 : {
1612 0 : peer = i->second;
1613 0 : if( peer->isReachable( ))
1614 : {
1615 : // Node exists, probably simultaneous connect from peer
1616 0 : LBINFO << "Already got node " << nodeID << ", refusing connect"
1617 0 : << std::endl;
1618 :
1619 : // refuse connection
1620 : OCommand( Connections( 1, connection ), cmd )
1621 0 : << NodeID() << requestID;
1622 :
1623 : // NOTE: There is no close() here. The reply command above has to be
1624 : // received by the peer first, before closing the connection.
1625 0 : _removeConnection( connection );
1626 0 : return true;
1627 : }
1628 : }
1629 :
1630 : // create and add connected node
1631 33 : if( !peer )
1632 33 : peer = createNode( nodeType );
1633 33 : if( !peer )
1634 : {
1635 0 : LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
1636 0 : << std::endl;
1637 :
1638 : // refuse connection
1639 0 : OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
1640 :
1641 : // NOTE: There is no close() here. The reply command above has to be
1642 : // received by the peer first, before closing the connection.
1643 0 : _removeConnection( connection );
1644 0 : return true;
1645 : }
1646 :
1647 33 : if( !peer->deserialize( data ))
1648 0 : LBWARN << "Error during node initialization" << std::endl;
1649 33 : LBASSERTINFO( data.empty(), data );
1650 33 : LBASSERTINFO( peer->getNodeID() == nodeID,
1651 : peer->getNodeID() << "!=" << nodeID );
1652 33 : LBASSERT( peer->getType() == nodeType );
1653 :
1654 33 : _impl->connectionNodes[ connection ] = peer;
1655 : {
1656 33 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1657 33 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1658 : }
1659 33 : LBVERB << "Added node " << nodeID << std::endl;
1660 :
1661 : // send our information as reply
1662 : OCommand( Connections( 1, connection ), cmd )
1663 33 : << getNodeID() << requestID << getType() << serialize();
1664 :
1665 66 : return true;
1666 : }
1667 :
1668 33 : bool LocalNode::_cmdConnectReply( ICommand& command )
1669 : {
1670 33 : LBASSERT( !command.getRemoteNode( ));
1671 33 : LBASSERT( _impl->inReceiverThread( ));
1672 :
1673 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1674 33 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1675 : _impl->connectionNodes.end( ));
1676 :
1677 33 : const NodeID& nodeID = command.get< NodeID >();
1678 33 : const uint32_t requestID = command.get< uint32_t >();
1679 :
1680 : // connection refused
1681 33 : if( nodeID == 0 )
1682 : {
1683 0 : LBINFO << "Connection refused, node already connected by peer"
1684 0 : << std::endl;
1685 :
1686 0 : _removeConnection( connection );
1687 0 : serveRequest( requestID, false );
1688 0 : return true;
1689 : }
1690 :
1691 33 : const uint32_t nodeType = command.get< uint32_t >();
1692 66 : std::string data = command.get< std::string >();
1693 :
1694 33 : LBVERB << "handle connect reply " << command << " req " << requestID
1695 33 : << " type " << nodeType << " data " << data << std::endl;
1696 :
1697 : // No locking needed, only recv thread modifies
1698 33 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1699 66 : NodePtr peer;
1700 33 : if( i != _impl->nodes->end( ))
1701 0 : peer = i->second;
1702 :
1703 33 : if( peer && peer->isReachable( )) // simultaneous connect
1704 : {
1705 0 : LBINFO << "Closing simultaneous connection from " << peer << " on "
1706 0 : << connection << std::endl;
1707 :
1708 0 : _removeConnection( connection );
1709 0 : _closeNode( peer );
1710 0 : serveRequest( requestID, false );
1711 0 : return true;
1712 : }
1713 :
1714 : // create and add node
1715 33 : if( !peer )
1716 : {
1717 33 : if( requestID != LB_UNDEFINED_UINT32 )
1718 33 : peer = reinterpret_cast< Node* >( getRequestData( requestID ));
1719 : else
1720 0 : peer = createNode( nodeType );
1721 : }
1722 33 : if( !peer )
1723 : {
1724 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1725 0 : << std::endl;
1726 0 : _removeConnection( connection );
1727 0 : return true;
1728 : }
1729 :
1730 33 : LBASSERTINFO( peer->getType() == nodeType,
1731 : peer->getType() << " != " << nodeType );
1732 33 : LBASSERT( peer->isClosed( ));
1733 :
1734 33 : if( !peer->deserialize( data ))
1735 0 : LBWARN << "Error during node initialization" << std::endl;
1736 33 : LBASSERT( data.empty( ));
1737 33 : LBASSERT( peer->getNodeID() == nodeID );
1738 :
1739 : // send ACK to peer
1740 : // cppcheck-suppress unusedScopedObject
1741 33 : OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
1742 :
1743 33 : peer->_connect( connection );
1744 33 : _impl->connectionNodes[ connection ] = peer;
1745 : {
1746 33 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1747 33 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1748 : }
1749 33 : _connectMulticast( peer );
1750 33 : LBVERB << "Added node " << nodeID << std::endl;
1751 :
1752 33 : serveRequest( requestID, true );
1753 :
1754 33 : notifyConnect( peer );
1755 66 : return true;
1756 : }
1757 :
1758 33 : bool LocalNode::_cmdConnectAck( ICommand& command )
1759 : {
1760 33 : NodePtr node = command.getRemoteNode();
1761 33 : LBASSERT( node );
1762 33 : LBASSERT( _impl->inReceiverThread( ));
1763 33 : LBVERB << "handle connect ack" << std::endl;
1764 :
1765 33 : node->_connect( _impl->incoming.getConnection( ));
1766 33 : _connectMulticast( node );
1767 33 : notifyConnect( node );
1768 33 : return true;
1769 : }
1770 :
1771 0 : bool LocalNode::_cmdID( ICommand& command )
1772 : {
1773 0 : LBASSERT( _impl->inReceiverThread( ));
1774 :
1775 0 : const NodeID& nodeID = command.get< NodeID >();
1776 0 : uint32_t nodeType = command.get< uint32_t >();
1777 0 : std::string data = command.get< std::string >();
1778 :
1779 0 : if( command.getRemoteNode( ))
1780 : {
1781 0 : LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
1782 0 : LBASSERT( command.getRemoteNode()->_getMulticast( ));
1783 0 : return true;
1784 : }
1785 :
1786 0 : LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
1787 :
1788 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1789 0 : LBASSERT( connection->isMulticast( ));
1790 0 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1791 : _impl->connectionNodes.end( ));
1792 :
1793 0 : NodePtr node;
1794 0 : if( nodeID == getNodeID() ) // 'self' multicast connection
1795 0 : node = this;
1796 : else
1797 : {
1798 : // No locking needed, only recv thread writes
1799 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1800 :
1801 0 : if( i == _impl->nodes->end( ))
1802 : {
1803 : // unknown node: create and add unconnected node
1804 0 : node = createNode( nodeType );
1805 :
1806 0 : if( !node->deserialize( data ))
1807 0 : LBWARN << "Error during node initialization" << std::endl;
1808 0 : LBASSERTINFO( data.empty(), data );
1809 :
1810 : {
1811 0 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1812 0 : _impl->nodes.data[ nodeID ] = node;
1813 : }
1814 0 : LBVERB << "Added node " << nodeID << " with multicast "
1815 0 : << connection << std::endl;
1816 : }
1817 : else
1818 0 : node = i->second;
1819 : }
1820 0 : LBASSERT( node );
1821 0 : LBASSERTINFO( node->getNodeID() == nodeID,
1822 : node->getNodeID() << "!=" << nodeID );
1823 :
1824 0 : _connectMulticast( node, connection );
1825 0 : _impl->connectionNodes[ connection ] = node;
1826 0 : LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
1827 0 : << " to " << getNodeID() << std::endl;
1828 0 : return true;
1829 : }
1830 :
1831 7 : bool LocalNode::_cmdDisconnect( ICommand& command )
1832 : {
1833 7 : LBASSERT( _impl->inReceiverThread( ));
1834 :
1835 7 : const uint32_t requestID = command.get< uint32_t >();
1836 :
1837 7 : NodePtr node = static_cast<Node*>( getRequestData( requestID ));
1838 7 : LBASSERT( node );
1839 :
1840 7 : _closeNode( node );
1841 7 : LBASSERT( node->isClosed( ));
1842 7 : serveRequest( requestID );
1843 7 : return true;
1844 : }
1845 :
1846 0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
1847 : {
1848 0 : const NodeID& nodeID = command.get< NodeID >();
1849 0 : const uint32_t requestID = command.get< uint32_t >();
1850 :
1851 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1852 0 : << " nodeID " << nodeID << std::endl;
1853 :
1854 0 : NodePtr node = getNode( nodeID );
1855 0 : NodePtr toNode = command.getRemoteNode();
1856 :
1857 0 : uint32_t nodeType = NODETYPE_INVALID;
1858 0 : std::string nodeData;
1859 0 : if( node )
1860 : {
1861 0 : nodeType = node->getType();
1862 0 : nodeData = node->serialize();
1863 0 : LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
1864 0 : << toNode << std::endl;
1865 : }
1866 :
1867 : toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
1868 0 : << nodeID << requestID << nodeType << nodeData;
1869 0 : return true;
1870 : }
1871 :
1872 0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
1873 : {
1874 0 : LBASSERT( _impl->inReceiverThread( ));
1875 :
1876 0 : const NodeID& nodeID = command.get< NodeID >();
1877 0 : const uint32_t requestID = command.get< uint32_t >();
1878 0 : const uint32_t nodeType = command.get< uint32_t >();
1879 0 : std::string nodeData = command.get< std::string >();
1880 :
1881 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
1882 0 : << " type " << nodeType << " data " << nodeData << std::endl;
1883 :
1884 : // No locking needed, only recv thread writes
1885 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1886 0 : if( i != _impl->nodes->end( ))
1887 : {
1888 : // Requested node connected to us in the meantime
1889 0 : NodePtr node = i->second;
1890 :
1891 0 : node->ref( this );
1892 0 : serveRequest( requestID, node.get( ));
1893 0 : return true;
1894 : }
1895 :
1896 0 : if( nodeType == NODETYPE_INVALID )
1897 : {
1898 0 : serveRequest( requestID, (void*)0 );
1899 0 : return true;
1900 : }
1901 :
1902 : // new node: create and add unconnected node
1903 0 : NodePtr node = createNode( nodeType );
1904 0 : if( node )
1905 : {
1906 0 : LBASSERT( node );
1907 :
1908 0 : if( !node->deserialize( nodeData ))
1909 0 : LBWARN << "Failed to initialize node data" << std::endl;
1910 0 : LBASSERT( nodeData.empty( ));
1911 0 : node->ref( this );
1912 : }
1913 : else
1914 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
1915 :
1916 0 : serveRequest( requestID, node.get( ));
1917 0 : return true;
1918 : }
1919 :
1920 0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
1921 : {
1922 0 : LBASSERT( inCommandThread( ));
1923 0 : if( !_impl->sendToken ) // enqueue command if no token available
1924 : {
1925 0 : const uint32_t timeout = Global::getTimeout();
1926 0 : if( timeout == LB_TIMEOUT_INDEFINITE ||
1927 0 : ( getTime64() - _impl->lastSendToken <= timeout ))
1928 : {
1929 0 : _impl->sendTokenQueue.push_back( command );
1930 0 : return true;
1931 : }
1932 :
1933 : // timeout! - clear old requests
1934 0 : _impl->sendTokenQueue.clear();
1935 : // 'generate' new token - release is robust
1936 : }
1937 :
1938 0 : _impl->sendToken = false;
1939 :
1940 0 : const uint32_t requestID = command.get< uint32_t >();
1941 0 : command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1942 0 : << requestID;
1943 0 : return true;
1944 : }
1945 :
1946 0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
1947 : {
1948 0 : const uint32_t requestID = command.get< uint32_t >();
1949 0 : serveRequest( requestID );
1950 0 : return true;
1951 : }
1952 :
1953 0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
1954 : {
1955 0 : LBASSERT( inCommandThread( ));
1956 0 : _impl->lastSendToken = getTime64();
1957 :
1958 0 : if( _impl->sendToken )
1959 0 : return true; // double release due to timeout
1960 0 : if( _impl->sendTokenQueue.empty( ))
1961 : {
1962 0 : _impl->sendToken = true;
1963 0 : return true;
1964 : }
1965 :
1966 0 : ICommand& request = _impl->sendTokenQueue.front();
1967 :
1968 0 : const uint32_t requestID = request.get< uint32_t >();
1969 0 : request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1970 0 : << requestID;
1971 0 : _impl->sendTokenQueue.pop_front();
1972 0 : return true;
1973 : }
1974 :
1975 0 : bool LocalNode::_cmdAddListener( ICommand& command )
1976 : {
1977 : Connection* rawConnection =
1978 0 : reinterpret_cast< Connection* >( command.get< uint64_t >( ));
1979 0 : std::string data = command.get< std::string >();
1980 :
1981 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
1982 0 : command.getRemoteNode()->_addConnectionDescription( description );
1983 :
1984 0 : if( command.getRemoteNode() != this )
1985 0 : return true;
1986 :
1987 0 : ConnectionPtr connection = rawConnection;
1988 0 : connection->unref();
1989 0 : LBASSERT( connection );
1990 :
1991 0 : _impl->connectionNodes[ connection ] = this;
1992 0 : if( connection->isMulticast( ))
1993 0 : _addMulticast( this, connection );
1994 :
1995 0 : connection->acceptNB();
1996 0 : _impl->incoming.addConnection( connection );
1997 :
1998 0 : _initService(); // update zeroconf
1999 0 : return true;
2000 : }
2001 :
2002 0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
2003 : {
2004 0 : const uint32_t requestID = command.get< uint32_t >();
2005 0 : Connection* rawConnection = command.get< Connection* >();
2006 0 : std::string data = command.get< std::string >();
2007 :
2008 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2009 0 : LBCHECK(
2010 : command.getRemoteNode()->_removeConnectionDescription( description ));
2011 :
2012 0 : if( command.getRemoteNode() != this )
2013 0 : return true;
2014 :
2015 0 : _initService(); // update zeroconf
2016 :
2017 0 : ConnectionPtr connection = rawConnection;
2018 0 : connection->unref( this );
2019 0 : LBASSERT( connection );
2020 :
2021 0 : if( connection->isMulticast( ))
2022 0 : _removeMulticast( connection );
2023 :
2024 0 : _impl->incoming.removeConnection( connection );
2025 0 : LBASSERT( _impl->connectionNodes.find( connection ) !=
2026 : _impl->connectionNodes.end( ));
2027 0 : _impl->connectionNodes.erase( connection );
2028 0 : serveRequest( requestID );
2029 0 : return true;
2030 : }
2031 :
2032 0 : bool LocalNode::_cmdPing( ICommand& command )
2033 : {
2034 0 : LBASSERT( inCommandThread( ));
2035 0 : command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
2036 0 : return true;
2037 : }
2038 :
2039 2 : bool LocalNode::_cmdCommand( ICommand& command )
2040 : {
2041 2 : const uint128_t& commandID = command.get< uint128_t >();
2042 2 : CommandHandler func;
2043 : {
2044 2 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2045 2 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2046 2 : if( i == _impl->commandHandlers->end( ))
2047 0 : return false;
2048 :
2049 2 : CommandQueue* queue = i->second.second;
2050 2 : if( queue )
2051 : {
2052 : command.setDispatchFunction( CmdFunc( this,
2053 1 : &LocalNode::_cmdCommandAsync ));
2054 1 : queue->push( command );
2055 1 : return true;
2056 : }
2057 : // else
2058 :
2059 1 : func = i->second.first;
2060 : }
2061 :
2062 2 : CustomICommand customCmd( command );
2063 3 : return func( customCmd );
2064 : }
2065 :
2066 1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
2067 : {
2068 1 : const uint128_t& commandID = command.get< uint128_t >();
2069 1 : CommandHandler func;
2070 : {
2071 1 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2072 1 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2073 1 : LBASSERT( i != _impl->commandHandlers->end( ));
2074 1 : if( i == _impl->commandHandlers->end( ))
2075 0 : return true; // deregistered between dispatch and now
2076 1 : func = i->second.first;
2077 : }
2078 2 : CustomICommand customCmd( command );
2079 2 : return func( customCmd );
2080 : }
2081 :
2082 33 : bool LocalNode::_cmdAddConnection( ICommand& command )
2083 : {
2084 33 : LBASSERT( _impl->inReceiverThread( ));
2085 :
2086 33 : ConnectionPtr connection = command.get< ConnectionPtr >();
2087 33 : _addConnection( connection );
2088 33 : connection->unref(); // ref'd by _addConnection
2089 33 : return true;
2090 : }
2091 :
2092 63 : }
|