Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/futureFunction.h>
46 : #include <lunchbox/hash.h>
47 : #include <lunchbox/lockable.h>
48 : #include <lunchbox/request.h>
49 : #include <lunchbox/rng.h>
50 : #include <lunchbox/servus.h>
51 : #include <lunchbox/sleep.h>
52 :
53 : #include <boost/bind.hpp>
54 : #include <boost/date_time/posix_time/posix_time.hpp>
55 : #include <boost/lexical_cast.hpp>
56 :
57 : #include <list>
58 :
59 : namespace bp = boost::posix_time;
60 :
61 : namespace co
62 : {
63 : namespace
64 : {
65 20 : lunchbox::a_int32_t _threadIDs;
66 :
67 : typedef CommandFunc< LocalNode > CmdFunc;
68 : typedef std::list< ICommand > CommandList;
69 : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
70 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
71 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
72 : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
73 : typedef NodeHash::const_iterator NodeHashCIter;
74 : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
75 : typedef HandlerHash::const_iterator HandlerHashCIter;
76 : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
77 : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
78 : typedef CommandHash::const_iterator CommandHashCIter;
79 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
80 : }
81 :
82 : namespace detail
83 : {
84 98 : class ReceiverThread : public lunchbox::Thread
85 : {
86 : public:
87 51 : ReceiverThread( co::LocalNode* localNode ) : _localNode( localNode ) {}
88 46 : bool init() override
89 : {
90 46 : const int32_t threadID = ++_threadIDs - 1;
91 92 : setName( std::string( "Rcv" ) +
92 46 : boost::lexical_cast< std::string >( threadID ));
93 46 : return _localNode->_startCommandThread( threadID );
94 : }
95 :
96 46 : void run() override { _localNode->_runReceiverThread(); }
97 :
98 : private:
99 : co::LocalNode* const _localNode;
100 : };
101 :
102 98 : class CommandThread : public Worker
103 : {
104 : public:
105 51 : CommandThread( co::LocalNode* localNode )
106 : : Worker( Global::getCommandQueueLimit( ))
107 : , threadID( 0 )
108 51 : , _localNode( localNode )
109 51 : {}
110 :
111 : int32_t threadID;
112 :
113 : protected:
114 46 : bool init() override
115 : {
116 92 : setName( std::string( "Cmd" ) +
117 46 : boost::lexical_cast< std::string >( threadID ));
118 46 : return true;
119 : }
120 :
121 103286 : bool stopRunning() override { return _localNode->isClosed(); }
122 50597 : bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
123 :
124 : private:
125 : co::LocalNode* const _localNode;
126 : };
127 :
128 : class LocalNode
129 : {
130 : public:
131 51 : LocalNode()
132 : : smallBuffers( 200 )
133 : , bigBuffers( 20 )
134 : , sendToken( true )
135 : , lastSendToken( 0 )
136 : , objectStore( 0 )
137 : , receiverThread( 0 )
138 : , commandThread( 0 )
139 51 : , service( "_collage._tcp" )
140 51 : {}
141 :
142 49 : ~LocalNode()
143 49 : {
144 49 : LBASSERT( incoming.isEmpty( ));
145 49 : LBASSERT( connectionNodes.empty( ));
146 49 : LBASSERT( pendingCommands.empty( ));
147 49 : LBASSERT( nodes->empty( ));
148 :
149 49 : delete objectStore;
150 49 : objectStore = 0;
151 49 : LBASSERT( !commandThread->isRunning( ));
152 49 : delete commandThread;
153 49 : commandThread = 0;
154 :
155 49 : LBASSERT( !receiverThread->isRunning( ));
156 49 : delete receiverThread;
157 49 : receiverThread = 0;
158 49 : }
159 :
160 271 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
161 :
162 : /** Commands re-scheduled for dispatch. */
163 : CommandList pendingCommands;
164 :
165 : /** The command buffer 'allocator' for small packets */
166 : co::BufferCache smallBuffers;
167 :
168 : /** The command buffer 'allocator' for big packets */
169 : co::BufferCache bigBuffers;
170 :
171 : bool sendToken; //!< send token availability.
172 : uint64_t lastSendToken; //!< last used time for timeout detection
173 : std::deque< co::ICommand > sendTokenQueue; //!< pending requests
174 :
175 : /** Manager of distributed object */
176 : ObjectStore* objectStore;
177 :
178 : /** Needed for thread-safety during nodeID-based connect() */
179 : lunchbox::Lock connectLock;
180 :
181 : /** The node for each connection. */
182 : ConnectionNodeHash connectionNodes; // read and write: recv only
183 :
184 : /** The connected nodes. */
185 : lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
186 :
187 : /** The connection set of all connections from/to this node. */
188 : co::ConnectionSet incoming;
189 :
190 : /** The process-global clock. */
191 : lunchbox::Clock clock;
192 :
193 : /** The registered push handlers. */
194 : lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
195 :
196 : /** The registered custom command handlers. */
197 : lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
198 :
199 : ReceiverThread* receiverThread;
200 : CommandThread* commandThread;
201 :
202 : lunchbox::Lockable< lunchbox::Servus > service;
203 :
204 : // Performance counters:
205 : a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
206 : };
207 : }
208 :
209 51 : LocalNode::LocalNode( const uint32_t type )
210 : : Node( type )
211 51 : , _impl( new detail::LocalNode )
212 : {
213 51 : _impl->receiverThread = new detail::ReceiverThread( this );
214 51 : _impl->commandThread = new detail::CommandThread( this );
215 51 : _impl->objectStore = new ObjectStore( this, _impl->counters );
216 :
217 51 : CommandQueue* queue = getCommandThreadQueue();
218 : registerCommand( CMD_NODE_CONNECT,
219 51 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
220 : registerCommand( CMD_NODE_CONNECT_BE,
221 51 : CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
222 : registerCommand( CMD_NODE_CONNECT_REPLY,
223 51 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
224 : registerCommand( CMD_NODE_CONNECT_REPLY_BE,
225 51 : CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
226 : registerCommand( CMD_NODE_ID,
227 51 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
228 : registerCommand( CMD_NODE_ID_BE,
229 51 : CmdFunc( this, &LocalNode::_cmdID ), 0 );
230 : registerCommand( CMD_NODE_ACK_REQUEST,
231 51 : CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
232 : registerCommand( CMD_NODE_STOP_RCV,
233 51 : CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
234 : registerCommand( CMD_NODE_STOP_CMD,
235 51 : CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
236 : registerCommand( CMD_NODE_SET_AFFINITY_RCV,
237 51 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
238 : registerCommand( CMD_NODE_SET_AFFINITY_CMD,
239 51 : CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
240 : registerCommand( CMD_NODE_CONNECT_ACK,
241 51 : CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
242 : registerCommand( CMD_NODE_DISCONNECT,
243 51 : CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
244 : registerCommand( CMD_NODE_GET_NODE_DATA,
245 51 : CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
246 : registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
247 51 : CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
248 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
249 51 : CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
250 : registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
251 51 : CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
252 : registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
253 51 : CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
254 : registerCommand( CMD_NODE_ADD_LISTENER,
255 51 : CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
256 : registerCommand( CMD_NODE_REMOVE_LISTENER,
257 51 : CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
258 : registerCommand( CMD_NODE_PING,
259 51 : CmdFunc( this, &LocalNode::_cmdPing ), queue );
260 : registerCommand( CMD_NODE_PING_REPLY,
261 51 : CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
262 : registerCommand( CMD_NODE_COMMAND,
263 51 : CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
264 : registerCommand( CMD_NODE_ADD_CONNECTION,
265 51 : CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
266 51 : }
267 :
268 137 : LocalNode::~LocalNode( )
269 : {
270 49 : LBASSERT( !hasPendingRequests( ));
271 49 : delete _impl;
272 88 : }
273 :
274 1 : bool LocalNode::initLocal( const int argc, char** argv )
275 : {
276 : #ifndef NDEBUG
277 1 : LBVERB << lunchbox::disableFlush << "args: ";
278 2 : for( int i=0; i<argc; i++ )
279 1 : LBVERB << argv[i] << ", ";
280 1 : LBVERB << std::endl << lunchbox::enableFlush;
281 : #endif
282 :
283 : // We do not use getopt_long because it really does not work due to the
284 : // following aspects:
285 : // - reordering of arguments
286 : // - different behavior of GNU and BSD implementations
287 : // - incomplete man pages
288 1 : for( int i=1; i<argc; ++i )
289 : {
290 0 : if( std::string( "--eq-listen" ) == argv[i] )
291 0 : LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
292 0 : if( std::string( "--eq-listen" ) == argv[i] ||
293 0 : std::string( "--co-listen" ) == argv[i] )
294 : {
295 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
296 : {
297 0 : std::string data = argv[++i];
298 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
299 0 : desc->port = Global::getDefaultPort();
300 :
301 0 : if( desc->fromString( data ))
302 : {
303 0 : addConnectionDescription( desc );
304 0 : LBASSERTINFO( data.empty(), data );
305 : }
306 : else
307 0 : LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
308 : }
309 : else
310 : {
311 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
312 : }
313 : }
314 0 : else if ( std::string( "--co-globals" ) == argv[i] )
315 : {
316 0 : if( (i+1)<argc && argv[i+1][0] != '-' )
317 : {
318 0 : const std::string data = argv[++i];
319 0 : if( !Global::fromString( data ))
320 : {
321 0 : LBWARN << "Invalid global variables string: " << data
322 0 : << ", using default global variables." << std::endl;
323 0 : }
324 : }
325 : else
326 : {
327 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
328 : }
329 : }
330 : }
331 :
332 1 : if( !listen( ))
333 : {
334 0 : LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
335 0 : << std::endl;
336 0 : return false;
337 : }
338 1 : return true;
339 : }
340 :
341 46 : bool LocalNode::listen()
342 : {
343 46 : LBVERB << "Listener data: " << serialize() << std::endl;
344 46 : if( !isClosed() || !_connectSelf( ))
345 0 : return false;
346 :
347 46 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
348 270 : for( ConnectionDescriptionsCIter i = descriptions.begin();
349 180 : i != descriptions.end(); ++i )
350 : {
351 44 : ConnectionDescriptionPtr description = *i;
352 88 : ConnectionPtr connection = Connection::create( description );
353 :
354 44 : if( !connection || !connection->listen( ))
355 : {
356 0 : LBWARN << "Can't create listener connection: " << description
357 0 : << std::endl;
358 0 : return false;
359 : }
360 :
361 44 : _impl->connectionNodes[ connection ] = this;
362 44 : if( connection->isMulticast( ))
363 0 : _addMulticast( this, connection );
364 :
365 44 : connection->acceptNB();
366 44 : _impl->incoming.addConnection( connection );
367 :
368 88 : LBVERB << "Added node " << getNodeID() << " using " << connection
369 132 : << std::endl;
370 44 : }
371 :
372 92 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
373 138 : << std::endl;
374 :
375 46 : _setListening();
376 46 : _impl->receiverThread->start();
377 :
378 46 : LBINFO << *this << std::endl;
379 46 : return true;
380 : }
381 :
382 0 : bool LocalNode::listen( ConnectionPtr connection )
383 : {
384 0 : if( !listen( ))
385 0 : return false;
386 0 : _addConnection( connection );
387 0 : return true;
388 : }
389 :
390 45 : bool LocalNode::close()
391 : {
392 45 : if( !isListening() )
393 0 : return false;
394 :
395 45 : send( CMD_NODE_STOP_RCV );
396 :
397 45 : LBCHECK( _impl->receiverThread->join( ));
398 45 : _cleanup();
399 :
400 135 : LBINFO << _impl->incoming.getSize() << " connections open after close"
401 135 : << std::endl;
402 : #ifndef NDEBUG
403 45 : const Connections& connections = _impl->incoming.getConnections();
404 135 : for( Connections::const_iterator i = connections.begin();
405 90 : i != connections.end(); ++i )
406 : {
407 0 : LBINFO << " " << *i << std::endl;
408 : }
409 : #endif
410 :
411 45 : LBASSERTINFO( !hasPendingRequests(),
412 : *static_cast< lunchbox::RequestHandler* >( this ));
413 45 : return true;
414 : }
415 :
416 0 : void LocalNode::setAffinity( const int32_t affinity )
417 : {
418 0 : send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
419 0 : send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
420 :
421 0 : lunchbox::Thread::setAffinity( affinity );
422 0 : }
423 :
424 0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
425 : {
426 0 : LBASSERT( isListening( ));
427 :
428 0 : ConnectionPtr connection = Connection::create( desc );
429 0 : if( connection && connection->listen( ))
430 : {
431 0 : addListener( connection );
432 0 : return connection;
433 : }
434 :
435 0 : return 0;
436 : }
437 :
438 0 : void LocalNode::addListener( ConnectionPtr connection )
439 : {
440 0 : LBASSERT( isListening( ));
441 0 : LBASSERT( connection->isListening( ));
442 0 : if( !isListening() || !connection->isListening( ))
443 0 : return;
444 :
445 0 : connection->ref( this ); // unref in self handler
446 :
447 : // Update everybody's description list of me, add the listener to myself in
448 : // my handler
449 0 : Nodes nodes;
450 0 : getNodes( nodes );
451 :
452 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
453 0 : (*i)->send( CMD_NODE_ADD_LISTENER )
454 0 : << (uint64_t)(connection.get( ))
455 0 : << connection->getDescription()->toString();
456 : }
457 :
458 0 : void LocalNode::removeListeners( const Connections& connections )
459 : {
460 0 : std::vector< lunchbox::Request< void > > requests;
461 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
462 : {
463 0 : ConnectionPtr connection = *i;
464 0 : requests.push_back( _removeListener( connection ));
465 0 : }
466 0 : }
467 :
468 0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
469 : {
470 0 : LBASSERT( isListening( ));
471 0 : LBASSERTINFO( !conn->isConnected(), conn );
472 :
473 0 : conn->ref( this );
474 0 : const lunchbox::Request< void > request = registerRequest< void >();
475 0 : Nodes nodes;
476 0 : getNodes( nodes );
477 :
478 0 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
479 0 : (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
480 0 : << conn->getDescription()->toString();
481 0 : return request;
482 : }
483 :
484 145 : void LocalNode::_addConnection( ConnectionPtr connection )
485 : {
486 145 : if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
487 : {
488 33 : connection->ref(); // unref in _cmdAddConnection
489 33 : send( CMD_NODE_ADD_CONNECTION ) << connection;
490 178 : return;
491 : }
492 :
493 112 : BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
494 112 : connection->recvNB( buffer, COMMAND_MINSIZE );
495 112 : _impl->incoming.addConnection( connection );
496 : }
497 :
498 214 : void LocalNode::_removeConnection( ConnectionPtr connection )
499 : {
500 214 : LBASSERT( connection );
501 :
502 214 : _impl->incoming.removeConnection( connection );
503 214 : connection->resetRecvData();
504 214 : if( !connection->isClosed( ))
505 122 : connection->close(); // cancel pending IO's
506 214 : }
507 :
508 45 : void LocalNode::_cleanup()
509 : {
510 45 : LBVERB << "Clean up stopped node" << std::endl;
511 45 : LBASSERTINFO( isClosed(), *this );
512 :
513 45 : if( !_impl->connectionNodes.empty( ))
514 132 : LBINFO << _impl->connectionNodes.size()
515 132 : << " open connections during cleanup" << std::endl;
516 : #ifndef NDEBUG
517 267 : for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
518 178 : i != _impl->connectionNodes.end(); ++i )
519 : {
520 44 : NodePtr node = i->second;
521 44 : LBINFO << " " << i->first << " : " << node << std::endl;
522 44 : }
523 : #endif
524 :
525 45 : _impl->connectionNodes.clear();
526 :
527 45 : if( !_impl->nodes->empty( ))
528 3 : LBINFO << _impl->nodes->size() << " nodes connected during cleanup"
529 3 : << std::endl;
530 :
531 : #ifndef NDEBUG
532 46 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
533 : {
534 1 : NodePtr node = i->second;
535 1 : LBINFO << " " << node << std::endl;
536 1 : }
537 : #endif
538 :
539 45 : _impl->nodes->clear();
540 45 : }
541 :
542 110 : void LocalNode::_closeNode( NodePtr node )
543 : {
544 110 : ConnectionPtr connection = node->getConnection();
545 220 : ConnectionPtr mcConnection = node->_getMulticast();
546 :
547 110 : node->_disconnect();
548 :
549 110 : if( connection )
550 : {
551 66 : LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
552 : _impl->connectionNodes.end(), connection );
553 :
554 66 : _removeConnection( connection );
555 66 : _impl->connectionNodes.erase( connection );
556 : }
557 :
558 110 : if( mcConnection )
559 : {
560 0 : _removeConnection( mcConnection );
561 0 : _impl->connectionNodes.erase( mcConnection );
562 : }
563 :
564 110 : _impl->objectStore->removeInstanceData( node->getNodeID( ));
565 :
566 220 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
567 110 : _impl->nodes->erase( node->getNodeID( ));
568 110 : notifyDisconnect( node );
569 220 : LBINFO << node << " disconnected from " << *this << std::endl;
570 110 : }
571 :
572 46 : bool LocalNode::_connectSelf()
573 : {
574 : // setup local connection to myself
575 46 : PipeConnectionPtr connection = new PipeConnection;
576 46 : if( !connection->connect( ))
577 : {
578 0 : LBERROR << "Could not create local connection to receiver thread."
579 0 : << std::endl;
580 0 : return false;
581 : }
582 :
583 46 : Node::_connect( connection->getSibling( ));
584 46 : _setClosed(); // reset state after _connect set it to connected
585 :
586 : // add to connection set
587 46 : LBASSERT( connection->getDescription( ));
588 46 : LBASSERT( _impl->connectionNodes.find( connection ) ==
589 : _impl->connectionNodes.end( ));
590 :
591 46 : _impl->connectionNodes[ connection ] = this;
592 46 : _impl->nodes.data[ getNodeID() ] = this;
593 46 : _addConnection( connection );
594 :
595 92 : LBVERB << "Added node " << getNodeID() << " using " << connection
596 138 : << std::endl;
597 46 : return true;
598 : }
599 :
600 7 : bool LocalNode::disconnect( NodePtr node )
601 : {
602 7 : if( !node || !isListening() )
603 0 : return false;
604 :
605 7 : if( !node->isConnected( ))
606 0 : return true;
607 :
608 7 : LBASSERT( !inCommandThread( ));
609 7 : lunchbox::Request< void > request = registerRequest< void >( node.get( ));
610 7 : send( CMD_NODE_DISCONNECT ) << request;
611 :
612 7 : request.wait();
613 7 : _impl->objectStore->removeNode( node );
614 7 : return true;
615 : }
616 :
617 20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
618 : {
619 20001 : if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
620 20001 : return;
621 :
622 20001 : if( node == this ) // OPT
623 0 : serveRequest( requestID );
624 : else
625 20001 : node->send( CMD_NODE_ACK_REQUEST ) << requestID;
626 : }
627 :
628 0 : void LocalNode::ping( NodePtr peer )
629 : {
630 0 : LBASSERT( !_impl->inReceiverThread( ));
631 0 : peer->send( CMD_NODE_PING );
632 0 : }
633 :
634 0 : bool LocalNode::pingIdleNodes()
635 : {
636 0 : LBASSERT( !_impl->inReceiverThread( ) );
637 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
638 0 : Nodes nodes;
639 0 : getNodes( nodes, false );
640 :
641 0 : bool pinged = false;
642 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
643 : {
644 0 : NodePtr node = *i;
645 0 : if( getTime64() - node->getLastReceiveTime() > timeout )
646 : {
647 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
648 0 : << node->getLastReceiveTime() << std::endl;
649 0 : node->send( CMD_NODE_PING );
650 0 : pinged = true;
651 : }
652 0 : }
653 0 : return pinged;
654 : }
655 :
656 : //----------------------------------------------------------------------
657 : // Object functionality
658 : //----------------------------------------------------------------------
659 0 : void LocalNode::disableInstanceCache()
660 : {
661 0 : _impl->objectStore->disableInstanceCache();
662 0 : }
663 :
664 0 : void LocalNode::expireInstanceData( const int64_t age )
665 : {
666 0 : _impl->objectStore->expireInstanceData( age );
667 0 : }
668 :
669 0 : void LocalNode::enableSendOnRegister()
670 : {
671 0 : _impl->objectStore->enableSendOnRegister();
672 0 : }
673 :
674 0 : void LocalNode::disableSendOnRegister()
675 : {
676 0 : _impl->objectStore->disableSendOnRegister();
677 0 : }
678 :
679 20 : bool LocalNode::registerObject( Object* object )
680 : {
681 20 : return _impl->objectStore->register_( object );
682 : }
683 :
684 20 : void LocalNode::deregisterObject( Object* object )
685 : {
686 20 : _impl->objectStore->deregister( object );
687 20 : }
688 :
689 39 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
690 : NodePtr master, const uint128_t& version )
691 : {
692 : const uint32_t request = _impl->objectStore->mapNB( object, id, version,
693 39 : master );
694 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
695 39 : _impl->objectStore, request );
696 39 : return f_bool_t( new FuturebImpl( func ));
697 : }
698 :
699 0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
700 : const uint128_t& version )
701 : {
702 0 : return _impl->objectStore->mapNB( object, id, version, 0 );
703 : }
704 :
705 2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
706 : const uint128_t& version, NodePtr master )
707 : {
708 2 : return _impl->objectStore->mapNB( object, id, version, master );
709 : }
710 :
711 :
712 2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
713 : {
714 2 : return _impl->objectStore->mapSync( requestID );
715 : }
716 :
717 4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
718 : const uint32_t instanceID )
719 : {
720 4 : return _impl->objectStore->sync( object, master, id, instanceID );
721 : }
722 :
723 41 : void LocalNode::unmapObject( Object* object )
724 : {
725 41 : _impl->objectStore->unmap( object );
726 41 : }
727 :
728 0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
729 : {
730 0 : _impl->objectStore->swap( oldObject, newObject );
731 0 : }
732 :
733 0 : void LocalNode::objectPush( const uint128_t& groupID,
734 : const uint128_t& objectType,
735 : const uint128_t& objectID, DataIStream& istream )
736 : {
737 0 : lunchbox::ScopedRead mutex( _impl->pushHandlers );
738 0 : HandlerHashCIter i = _impl->pushHandlers->find( groupID );
739 0 : if( i != _impl->pushHandlers->end( ))
740 0 : i->second( groupID, objectType, objectID, istream );
741 : else
742 0 : LBWARN << "No custom handler for push group " << groupID
743 0 : << " registered" << std::endl;
744 :
745 0 : if( istream.wasUsed() && istream.hasData( ))
746 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
747 0 : << objectType << " object " << objectID << std::endl;
748 0 : }
749 :
750 0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
751 : const PushHandler& handler )
752 : {
753 0 : lunchbox::ScopedWrite mutex( _impl->pushHandlers );
754 0 : (*_impl->pushHandlers)[ groupID ] = handler;
755 0 : }
756 :
757 2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
758 : const CommandHandler& func,
759 : CommandQueue* queue )
760 : {
761 2 : lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
762 2 : if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
763 : {
764 0 : LBWARN << "Already got a registered handler for custom command "
765 0 : << command << std::endl;
766 0 : return false;
767 : }
768 :
769 : _impl->commandHandlers->insert( std::make_pair( command,
770 2 : std::make_pair( func, queue )));
771 2 : return true;
772 : }
773 :
774 0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
775 : {
776 0 : LBASSERT( !inCommandThread( ));
777 0 : LBASSERT( !_impl->inReceiverThread( ));
778 :
779 0 : lunchbox::Request< void > request = registerRequest< void >();
780 0 : node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
781 :
782 : try
783 : {
784 0 : request.wait( Global::getTimeout() );
785 : }
786 : catch ( lunchbox::FutureTimeout& )
787 : {
788 : LBERROR << "Timeout while acquiring send token " << request.getID()
789 : << std::endl;
790 : request.relinquish();
791 : return 0;
792 : }
793 0 : return new co::SendToken( node );
794 : }
795 :
796 0 : void LocalNode::releaseSendToken( SendToken token )
797 : {
798 0 : LBASSERT( !_impl->inReceiverThread( ));
799 0 : if( token )
800 0 : token->release();
801 0 : }
802 :
803 : //----------------------------------------------------------------------
804 : // Connecting a node
805 : //----------------------------------------------------------------------
namespace
{
/**
 * Result codes of the internal _connect() functions, which return them as
 * uint32_t. Success is the value 0, so results have to be compared against
 * CONNECT_OK instead of being used as a boolean.
 */
enum ConnectResult
{
    CONNECT_OK = 0,          //!< node is connected
    CONNECT_TRY_AGAIN = 1,   //!< caller sleeps a random time, then retries
    CONNECT_BAD_STATE = 2,   //!< reported as "Internal connect error"
    CONNECT_TIMEOUT = 3,     //!< caller gives up
    CONNECT_UNREACHABLE = 4  //!< all connections failed to connect
};
}
817 :
818 72 : NodePtr LocalNode::connect( const NodeID& nodeID )
819 : {
820 72 : LBASSERT( nodeID != 0 );
821 72 : LBASSERT( isListening( ));
822 :
823 : // Make sure that only one connection request based on the node identifier
824 : // is pending at a given time. Otherwise a node with the same id might be
825 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
826 : // mutex is to register connecting nodes with this local node, and handle
827 : // all cases correctly, which is far more complex. Node connections only
828 : // happen a lot during initialization, and are therefore not time-critical.
829 72 : lunchbox::ScopedWrite mutex( _impl->connectLock );
830 :
831 144 : Nodes nodes;
832 72 : getNodes( nodes );
833 :
834 75 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
835 : {
836 75 : NodePtr peer = *i;
837 75 : if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
838 72 : return peer;
839 3 : }
840 :
841 0 : LBINFO << "Connecting node " << nodeID << std::endl;
842 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
843 : {
844 0 : NodePtr peer = *i;
845 0 : NodePtr node = _connect( nodeID, peer );
846 0 : if( node )
847 0 : return node;
848 0 : }
849 :
850 0 : NodePtr node = _connectFromZeroconf( nodeID );
851 0 : if( node )
852 0 : return node;
853 :
854 : // check again if node connected by itself by now
855 0 : nodes.clear();
856 0 : getNodes( nodes );
857 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
858 : {
859 0 : node = *i;
860 0 : if( node->getNodeID() == nodeID && node->isReachable( ))
861 0 : return node;
862 : }
863 :
864 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
865 72 : return 0;
866 : }
867 :
868 0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
869 : {
870 0 : LBASSERT( nodeID != 0 );
871 :
872 0 : NodePtr node;
873 : {
874 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
875 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
876 0 : if( i != _impl->nodes->end( ))
877 0 : node = i->second;
878 : }
879 :
880 0 : LBASSERT( getNodeID() != nodeID );
881 0 : if( !node )
882 : {
883 0 : lunchbox::Request< void* > request = registerRequest< void* >();
884 0 : peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
885 0 : node = reinterpret_cast< Node* >( request.wait( ));
886 0 : if( !node )
887 : {
888 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
889 0 : << std::endl;
890 0 : return 0;
891 : }
892 0 : node->unref( this ); // ref'd before serveRequest()
893 : }
894 :
895 0 : if( node->isReachable( ))
896 0 : return node;
897 :
898 0 : size_t tries = 10;
899 0 : while( --tries )
900 : {
901 0 : switch( _connect( node ))
902 : {
903 : case CONNECT_OK:
904 0 : return node;
905 : case CONNECT_TRY_AGAIN:
906 : {
907 0 : lunchbox::RNG rng;
908 : // collision avoidance
909 0 : lunchbox::sleep( rng.get< uint8_t >( ));
910 0 : break;
911 : }
912 : case CONNECT_BAD_STATE:
913 0 : LBWARN << "Internal connect error" << std::endl;
914 : // no break;
915 : case CONNECT_TIMEOUT:
916 0 : return 0;
917 :
918 : case CONNECT_UNREACHABLE:
919 0 : break; // maybe peer talks to us
920 : }
921 :
922 0 : lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
923 : // connect failed - check for simultaneous connect from peer
924 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
925 0 : if( i != _impl->nodes->end( ))
926 0 : node = i->second;
927 0 : }
928 :
929 0 : return node->isReachable() ? node : 0;
930 : }
931 :
932 0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
933 : {
934 0 : lunchbox::ScopedWrite mutex( _impl->service );
935 :
936 : const Strings& instances =
937 0 : _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
938 0 : for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
939 : {
940 0 : const std::string& instance = *i;
941 0 : const NodeID candidate( instance );
942 0 : if( candidate != nodeID )
943 0 : continue;
944 :
945 0 : const std::string& typeStr = _impl->service->get( instance, "co_type" );
946 0 : if( typeStr.empty( ))
947 0 : return 0;
948 :
949 0 : std::istringstream in( typeStr );
950 0 : uint32_t type = 0;
951 0 : in >> type;
952 :
953 0 : NodePtr node = createNode( type );
954 0 : if( !node )
955 : {
956 0 : LBINFO << "Can't create node of type " << type << std::endl;
957 0 : continue;
958 : }
959 :
960 : const std::string& numStr = _impl->service->get( instance,
961 0 : "co_numPorts" );
962 0 : uint32_t num = 0;
963 :
964 0 : in.clear();
965 0 : in.str( numStr );
966 0 : in >> num;
967 0 : LBASSERT( num > 0 );
968 0 : for( size_t j = 0; j < num; ++j )
969 : {
970 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
971 0 : std::ostringstream out;
972 0 : out << "co_port" << j;
973 :
974 0 : std::string descStr = _impl->service->get( instance, out.str( ));
975 0 : LBASSERT( !descStr.empty( ));
976 0 : LBCHECK( desc->fromString( descStr ));
977 0 : LBASSERT( descStr.empty( ));
978 0 : node->addConnectionDescription( desc );
979 0 : }
980 0 : mutex.leave();
981 0 : if( _connect( node ))
982 0 : return node;
983 0 : }
984 0 : return 0;
985 : }
986 :
987 33 : bool LocalNode::connect( NodePtr node )
988 : {
989 33 : lunchbox::ScopedWrite mutex( _impl->connectLock );
990 33 : return ( _connect( node ) == CONNECT_OK );
991 : }
992 :
993 33 : uint32_t LocalNode::_connect( NodePtr node )
994 : {
995 33 : LBASSERTINFO( isListening(), *this );
996 33 : if( node->isReachable( ))
997 0 : return CONNECT_OK;
998 :
999 33 : LBASSERT( node->isClosed( ));
1000 33 : LBINFO << "Connecting " << node << std::endl;
1001 :
1002 : // try connecting using the given descriptions
1003 33 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
1004 99 : for( ConnectionDescriptionsCIter i = cds.begin();
1005 66 : i != cds.end(); ++i )
1006 : {
1007 33 : ConnectionDescriptionPtr description = *i;
1008 33 : if( description->type >= CONNECTIONTYPE_MULTICAST )
1009 0 : continue; // Don't use multicast for primary connections
1010 :
1011 33 : ConnectionPtr connection = Connection::create( description );
1012 33 : if( !connection || !connection->connect( ))
1013 0 : continue;
1014 :
1015 33 : return _connect( node, connection );
1016 : }
1017 :
1018 0 : LBWARN << "Node unreachable, all connections failed to connect" <<std::endl;
1019 0 : return CONNECT_UNREACHABLE;
1020 : }
1021 :
1022 0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
1023 : {
1024 0 : return ( _connect( node, connection ) == CONNECT_OK );
1025 : }
1026 :
1027 33 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
1028 : {
1029 33 : LBASSERT( connection );
1030 33 : LBASSERT( node->getNodeID() != getNodeID( ));
1031 :
1032 66 : if( !node || !isListening() || !connection->isConnected() ||
1033 33 : !node->isClosed( ))
1034 : {
1035 0 : return CONNECT_BAD_STATE;
1036 : }
1037 :
1038 33 : _addConnection( connection );
1039 :
1040 : // send connect command to peer
1041 33 : lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
1042 : #ifdef COLLAGE_BIGENDIAN
1043 : uint32_t cmd = CMD_NODE_CONNECT_BE;
1044 : lunchbox::byteswap( cmd );
1045 : #else
1046 33 : const uint32_t cmd = CMD_NODE_CONNECT;
1047 : #endif
1048 : OCommand( Connections( 1, connection ), cmd )
1049 33 : << getNodeID() << request << getType() << serialize();
1050 :
1051 33 : bool connected = false;
1052 : try
1053 : {
1054 33 : connected = request.wait( 10000 /*ms*/ );
1055 : }
1056 : catch( lunchbox::FutureTimeout& )
1057 : {
1058 : LBWARN << "Node connection handshake timeout - " << node
1059 : << " not a Collage node?" << std::endl;
1060 : request.relinquish();
1061 : return CONNECT_TIMEOUT;
1062 : }
1063 :
1064 : // In simultaneous connect case, depending on the connection type
1065 : // (e.g. RDMA), a check on the connection state of the node is required
1066 33 : if( !connected || !node->isConnected( ))
1067 0 : return CONNECT_TRY_AGAIN;
1068 :
1069 33 : LBASSERT( node->getNodeID() != 0 );
1070 33 : LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
1071 33 : LBINFO << node << " connected to " << *(Node*)this << std::endl;
1072 33 : return CONNECT_OK;
1073 : }
1074 :
1075 39 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
1076 : {
1077 39 : LBASSERTINFO( id.isUUID(), id );
1078 39 : if( !id.isUUID( ))
1079 : {
1080 0 : LBWARN << "Invalid object id " << id << std::endl;
1081 0 : return 0;
1082 : }
1083 :
1084 39 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
1085 39 : if( masterNodeID == 0 )
1086 : {
1087 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1088 0 : return 0;
1089 : }
1090 :
1091 39 : NodePtr master = connect( masterNodeID );
1092 39 : if( master && !master->isClosed( ))
1093 39 : return master;
1094 :
1095 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1096 0 : << " for object " << id << std::endl;
1097 0 : return 0;
1098 : }
1099 :
1100 33 : NodePtr LocalNode::createNode( const uint32_t type )
1101 : {
1102 33 : LBASSERTINFO( type == NODETYPE_NODE, type );
1103 33 : return new Node( type );
1104 : }
1105 :
1106 0 : NodePtr LocalNode::getNode( const NodeID& id ) const
1107 : {
1108 0 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1109 0 : NodeHash::const_iterator i = _impl->nodes->find( id );
1110 0 : if( i == _impl->nodes->end() || !i->second->isReachable( ))
1111 0 : return 0;
1112 0 : return i->second;
1113 : }
1114 :
1115 111 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
1116 : {
1117 111 : lunchbox::ScopedFastRead mutex( _impl->nodes );
1118 326 : for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1119 : {
1120 215 : NodePtr node = i->second;
1121 215 : if( node->isReachable() && ( addSelf || node != this ))
1122 215 : nodes.push_back( i->second );
1123 326 : }
1124 111 : }
1125 :
1126 138 : CommandQueue* LocalNode::getCommandThreadQueue()
1127 : {
1128 138 : return _impl->commandThread->getWorkerQueue();
1129 : }
1130 :
1131 7 : bool LocalNode::inCommandThread() const
1132 : {
1133 7 : return _impl->commandThread->isCurrent();
1134 : }
1135 :
1136 72410 : int64_t LocalNode::getTime64() const
1137 : {
1138 72410 : return _impl->clock.getTime64();
1139 : }
1140 :
1141 0 : ssize_t LocalNode::getCounter( const Counter counter ) const
1142 : {
1143 0 : return _impl->counters[ counter ];
1144 : }
1145 :
1146 193 : void LocalNode::flushCommands()
1147 : {
1148 193 : _impl->incoming.interrupt();
1149 193 : }
1150 :
1151 : //----------------------------------------------------------------------
1152 : // receiver thread functions
1153 : //----------------------------------------------------------------------
// Main function of the receiver thread.
//
// Sets up the zeroconf service data, then selects on the incoming connection
// set for as long as this node is listening and handles each event. Once the
// loop ends, the command thread is joined and all connections, nodes, objects
// and buffers held by this node are released.
1154 46 : void LocalNode::_runReceiverThread()
1155 : {
1156 46 : LB_TS_THREAD( _rcvThread );
1157 46 : _initService();
1158 :
// Number of error events seen in a row; reset by any non-error event below.
1159 46 : int nErrors = 0;
1160 72611 : while( isListening( ))
1161 : {
1162 72520 : const ConnectionSet::Event result = _impl->incoming.select();
1163 72519 : switch( result )
1164 : {
1165 : case ConnectionSet::EVENT_CONNECT:
1166 33 : _handleConnect();
1167 33 : break;
1168 :
1169 : case ConnectionSet::EVENT_DATA:
1170 72157 : _handleData();
1171 72157 : break;
1172 :
// An invalid handle is treated like a disconnect of the connection.
1173 : case ConnectionSet::EVENT_DISCONNECT:
1174 : case ConnectionSet::EVENT_INVALID_HANDLE:
1175 33 : _handleDisconnect();
1176 33 : break;
1177 :
1178 : case ConnectionSet::EVENT_TIMEOUT:
1179 0 : LBINFO << "select timeout" << std::endl;
1180 0 : break;
1181 :
// More than 100 consecutive errors: give up on the current connection.
1182 : case ConnectionSet::EVENT_ERROR:
1183 0 : ++nErrors;
1184 0 : LBWARN << "Connection error during select" << std::endl;
1185 0 : if( nErrors > 100 )
1186 : {
1187 0 : LBWARN << "Too many errors in a row, capping connection"
1188 0 : << std::endl;
1189 0 : _handleDisconnect();
1190 : }
1191 0 : break;
1192 :
// More than 10 consecutive select errors is not handled (LBUNIMPLEMENTED).
1193 : case ConnectionSet::EVENT_SELECT_ERROR:
1194 0 : LBWARN << "Error during select" << std::endl;
1195 0 : ++nErrors;
1196 0 : if( nErrors > 10 )
1197 : {
1198 0 : LBWARN << "Too many errors in a row" << std::endl;
1199 0 : LBUNIMPLEMENTED;
1200 : }
1201 0 : break;
1202 :
// Interrupts are raised by flushCommands(); retry the pending commands.
1203 : case ConnectionSet::EVENT_INTERRUPT:
1204 296 : _redispatchCommands();
1205 296 : break;
1206 :
1207 : default:
1208 0 : LBUNIMPLEMENTED;
1209 : }
// Any event other than the two error events ends the error streak.
1210 72519 : if( result != ConnectionSet::EVENT_ERROR &&
1211 : result != ConnectionSet::EVENT_SELECT_ERROR )
1212 : {
1213 72519 : nErrors = 0;
1214 : }
1215 : }
1216 :
// The node stopped listening: drop undispatched commands, then wait for the
// command thread to finish.
1217 45 : if( !_impl->pendingCommands.empty( ))
1218 0 : LBWARN << _impl->pendingCommands.size()
1219 0 : << " commands pending while leaving command thread" << std::endl;
1220 :
1221 45 : _impl->pendingCommands.clear();
1222 45 : LBCHECK( _impl->commandThread->join( ));
1223 :
// This node's own connection is expected to be a pipe connection (checked by
// LBSAFECAST); its sibling end is removed from the incoming set and from the
// connection-to-node map before this node disconnects itself.
1224 45 : ConnectionPtr connection = getConnection();
1225 90 : PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
1226 45 : connection = pipe->getSibling();
1227 45 : _removeConnection( connection );
1228 45 : _impl->connectionNodes.erase( connection );
1229 45 : _disconnect();
1230 :
// Close the node of, and remove, every remaining incoming connection. The
// loop relies on _removeConnection() shrinking the connection list.
1231 45 : const Connections& connections = _impl->incoming.getConnections();
1232 160 : while( !connections.empty( ))
1233 : {
1234 70 : connection = connections.back();
1235 70 : NodePtr node = _impl->connectionNodes[ connection ];
1236 :
1237 70 : if( node )
1238 70 : _closeNode( node );
1239 70 : _removeConnection( connection );
1240 70 : }
1241 :
// Release the remaining per-node state.
1242 45 : _impl->objectStore->clear();
1243 45 : _impl->pendingCommands.clear();
1244 45 : _impl->smallBuffers.flush();
1245 45 : _impl->bigBuffers.flush();
1246 :
1247 225 : LBINFO << "Leaving receiver thread of " << lunchbox::className( this )
1248 225 : << std::endl;
1249 45 : }
1250 :
1251 33 : void LocalNode::_handleConnect()
1252 : {
1253 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1254 66 : ConnectionPtr newConn = connection->acceptSync();
1255 33 : connection->acceptNB();
1256 :
1257 33 : if( newConn )
1258 33 : _addConnection( newConn );
1259 : else
1260 33 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1261 33 : }
1262 :
1263 33 : void LocalNode::_handleDisconnect()
1264 : {
1265 33 : while( _handleData( )) ; // read remaining data off connection
1266 :
1267 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1268 33 : ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
1269 :
1270 33 : if( i != _impl->connectionNodes.end( ))
1271 : {
1272 33 : NodePtr node = i->second;
1273 :
1274 33 : node->ref(); // extend lifetime to give cmd handler a chance
1275 :
1276 : // local command dispatching
1277 : OCommand( this, this, CMD_NODE_REMOVE_NODE )
1278 33 : << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
1279 :
1280 33 : if( node->getConnection() == connection )
1281 33 : _closeNode( node );
1282 0 : else if( connection->isMulticast( ))
1283 0 : node->_removeMulticast( connection );
1284 : }
1285 :
1286 33 : _removeConnection( connection );
1287 33 : }
1288 :
1289 72190 : bool LocalNode::_handleData()
1290 : {
1291 72190 : _impl->smallBuffers.compact();
1292 72190 : _impl->bigBuffers.compact();
1293 :
1294 72190 : ConnectionPtr connection = _impl->incoming.getConnection();
1295 72190 : LBASSERT( connection );
1296 :
1297 144380 : BufferPtr buffer = _readHead( connection );
1298 72190 : if( !buffer ) // fluke signal
1299 66 : return false;
1300 :
1301 144248 : ICommand command = _setupCommand( connection, buffer );
1302 72124 : const bool gotCommand = _readTail( command, buffer, connection );
1303 72124 : LBASSERT( gotCommand );
1304 :
1305 : // start next receive
1306 144248 : BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1307 72124 : connection->recvNB( nextBuffer, COMMAND_MINSIZE );
1308 :
1309 72124 : if( gotCommand )
1310 : {
1311 72124 : _dispatchCommand( command );
1312 72124 : return true;
1313 : }
1314 :
1315 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1316 72190 : return false;
1317 : }
1318 :
1319 72190 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
1320 : {
1321 72190 : BufferPtr buffer;
1322 72190 : const bool gotSize = connection->recvSync( buffer, false );
1323 :
1324 72190 : if( !buffer ) // fluke signal
1325 : {
1326 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1327 0 : << std::endl;
1328 0 : _impl->incoming.setDirty();
1329 0 : return 0;
1330 : }
1331 :
1332 72190 : if( gotSize )
1333 72124 : return buffer;
1334 :
1335 : // Some systems signal data on dead connections.
1336 66 : buffer->setSize( 0 );
1337 66 : connection->recvNB( buffer, COMMAND_MINSIZE );
1338 66 : return 0;
1339 : }
1340 :
// Create the input command for data received on a connection.
//
// connection: the connection the buffer was read from.
// buffer: the buffer holding at least the command head.
// Returns the command set up with the node registered for the connection (if
// any) and the byte swapping flag; an invalid default-constructed command is
// returned for an unexpected command on a connection without a node.
1341 72124 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
1342 : ConstBufferPtr buffer )
1343 : {
// Find the node registered for this connection; remains 0 if there is none.
1344 72124 : NodePtr node;
1345 72124 : ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
1346 72124 : if( i != _impl->connectionNodes.end( ))
1347 72058 : node = i->second;
1348 72124 : LBVERB << "Handle data from " << node << std::endl;
1349 :
// Swap if the sending node's endianness differs from the one this code was
// compiled for; without a node no swapping is assumed at this point.
1350 : #ifdef COLLAGE_BIGENDIAN
1351 : const bool swapping = node ? !node->isBigEndian() : false;
1352 : #else
1353 72124 : const bool swapping = node ? node->isBigEndian() : false;
1354 : #endif
1355 144248 : ICommand command( this, node, buffer, swapping );
1356 :
// Known node: record the receive time, nothing else to fix up.
1357 72124 : if( node )
1358 : {
1359 72058 : node->_setLastReceive( getTime64( ));
1360 72058 : return command;
1361 : }
1362 :
// No node is registered for the connection yet: only the connect,
// connect-reply and ID commands (and their _BE variants) are accepted. The
// command value selects whether the command has to be re-created with
// swapping enabled.
1363 66 : uint32_t cmd = command.getCommand();
1364 : #ifdef COLLAGE_BIGENDIAN
1365 : lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
1366 : #endif
1367 66 : switch( cmd )
1368 : {
// Non-_BE variants: swapping is only enabled in a big endian build.
1369 : case CMD_NODE_CONNECT:
1370 : case CMD_NODE_CONNECT_REPLY:
1371 : case CMD_NODE_ID:
1372 : #ifdef COLLAGE_BIGENDIAN
1373 : command = ICommand( this, node, buffer, true );
1374 : #endif
1375 66 : break;
1376 :
// _BE variants: swapping is only enabled in a little endian build.
1377 : case CMD_NODE_CONNECT_BE:
1378 : case CMD_NODE_CONNECT_REPLY_BE:
1379 : case CMD_NODE_ID_BE:
1380 : #ifndef COLLAGE_BIGENDIAN
1381 0 : command = ICommand( this, node, buffer, true );
1382 : #endif
1383 0 : break;
1384 :
// Any other command is unexpected before a node is registered.
1385 : default:
1386 0 : LBUNIMPLEMENTED;
1387 0 : return ICommand();
1388 : }
1389 :
1390 66 : command.setCommand( cmd ); // reset correctly swapped version
1391 72190 : return command;
1392 : }
1393 :
1394 72124 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
1395 : ConnectionPtr connection )
1396 : {
1397 72124 : const uint64_t needed = command.getSize();
1398 72124 : if( needed <= buffer->getSize( ))
1399 62112 : return true;
1400 :
1401 10011 : if( needed > buffer->getMaxSize( ))
1402 : {
1403 0 : LBASSERT( needed > COMMAND_ALLOCSIZE );
1404 0 : LBASSERTINFO( needed < LB_BIT48,
1405 : "Out-of-sync network stream: " << command << "?" );
1406 : // not enough space for remaining data, alloc and copy to new buffer
1407 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
1408 0 : newBuffer->replace( *buffer );
1409 0 : buffer = newBuffer;
1410 :
1411 0 : command = ICommand( this, command.getRemoteNode(), buffer,
1412 0 : command.isSwapping( ));
1413 : }
1414 :
1415 : // read remaining data
1416 10011 : connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
1417 10011 : return connection->recvSync( buffer );
1418 : }
1419 :
1420 34 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
1421 : {
1422 34 : LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
1423 : BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
1424 : _impl->bigBuffers.alloc( size ) :
1425 34 : _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
1426 34 : return buffer;
1427 : }
1428 :
1429 72169 : void LocalNode::_dispatchCommand( ICommand& command )
1430 : {
1431 72169 : LBASSERTINFO( command.isValid(), command );
1432 :
1433 72169 : if( dispatchCommand( command ))
1434 72169 : _redispatchCommands();
1435 : else
1436 : {
1437 0 : _redispatchCommands();
1438 0 : _impl->pendingCommands.push_back( command );
1439 : }
1440 72169 : }
1441 :
1442 72205 : bool LocalNode::dispatchCommand( ICommand& command )
1443 : {
1444 72205 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1445 72205 : LBASSERTINFO( command.isValid(), command );
1446 :
1447 72205 : const uint32_t type = command.getType();
1448 72205 : switch( type )
1449 : {
1450 : case COMMANDTYPE_NODE:
1451 71697 : LBCHECK( Dispatcher::dispatchCommand( command ));
1452 71697 : return true;
1453 :
1454 : case COMMANDTYPE_OBJECT:
1455 508 : return _impl->objectStore->dispatchObjectCommand( command );
1456 :
1457 : default:
1458 0 : LBABORT( "Unknown command type " << type << " for " << command );
1459 0 : return true;
1460 : }
1461 : }
1462 :
1463 72465 : void LocalNode::_redispatchCommands()
1464 : {
1465 72465 : bool changes = true;
1466 144930 : while( changes && !_impl->pendingCommands.empty( ))
1467 : {
1468 0 : changes = false;
1469 :
1470 0 : for( CommandList::iterator i = _impl->pendingCommands.begin();
1471 0 : i != _impl->pendingCommands.end(); ++i )
1472 : {
1473 0 : ICommand& command = *i;
1474 0 : LBASSERT( command.isValid( ));
1475 :
1476 0 : if( dispatchCommand( command ))
1477 : {
1478 0 : _impl->pendingCommands.erase( i );
1479 0 : changes = true;
1480 0 : break;
1481 : }
1482 : }
1483 : }
1484 :
1485 : #ifndef NDEBUG
1486 72465 : if( !_impl->pendingCommands.empty( ))
1487 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1488 0 : << std::endl;
1489 72465 : LBASSERT( _impl->pendingCommands.size() < 200 );
1490 : #endif
1491 72465 : }
1492 :
1493 46 : void LocalNode::_initService()
1494 : {
1495 46 : LB_TS_SCOPED( _rcvThread );
1496 46 : _impl->service->withdraw(); // go silent during k/v update
1497 :
1498 90 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1499 46 : if( descs.empty( ))
1500 48 : return;
1501 :
1502 88 : std::ostringstream out;
1503 44 : out << getType();
1504 44 : _impl->service->set( "co_type", out.str( ));
1505 :
1506 44 : out.str("");
1507 44 : out << descs.size();
1508 44 : _impl->service->set( "co_numPorts", out.str( ));
1509 :
1510 88 : for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
1511 : {
1512 44 : ConnectionDescriptionPtr desc = *i;
1513 44 : out.str("");
1514 44 : out << "co_port" << i - descs.begin();
1515 44 : _impl->service->set( out.str(), desc->toString( ));
1516 44 : }
1517 :
1518 88 : _impl->service->announce( descs.front()->port, getNodeID().getString( ));
1519 : }
1520 :
1521 45 : void LocalNode::_exitService()
1522 : {
1523 45 : _impl->service->withdraw();
1524 45 : }
1525 :
1526 0 : Zeroconf LocalNode::getZeroconf()
1527 : {
1528 0 : lunchbox::ScopedWrite mutex( _impl->service );
1529 0 : _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
1530 0 : return Zeroconf( _impl->service.data );
1531 : }
1532 :
1533 :
1534 : //----------------------------------------------------------------------
1535 : // command thread functions
1536 : //----------------------------------------------------------------------
1537 46 : bool LocalNode::_startCommandThread( const int32_t threadID )
1538 : {
1539 46 : _impl->commandThread->threadID = threadID;
1540 46 : return _impl->commandThread->start();
1541 : }
1542 :
1543 50596 : bool LocalNode::_notifyCommandThreadIdle()
1544 : {
1545 50596 : return _impl->objectStore->notifyCommandThreadIdle();
1546 : }
1547 :
1548 20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
1549 : {
1550 20001 : const uint32_t requestID = command.get< uint32_t >();
1551 20001 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
1552 :
1553 20001 : serveRequest( requestID );
1554 20001 : return true;
1555 : }
1556 :
1557 45 : bool LocalNode::_cmdStopRcv( ICommand& command )
1558 : {
1559 45 : LB_TS_THREAD( _rcvThread );
1560 45 : LBASSERT( isListening( ));
1561 :
1562 45 : _exitService();
1563 45 : _setClosing(); // causes rcv thread exit
1564 :
1565 45 : command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
1566 45 : _dispatchCommand( command );
1567 45 : return true;
1568 : }
1569 :
1570 45 : bool LocalNode::_cmdStopCmd( ICommand& )
1571 : {
1572 45 : LB_TS_THREAD( _cmdThread );
1573 45 : LBASSERTINFO( isClosing(), *this );
1574 :
1575 45 : _setClosed();
1576 45 : return true;
1577 : }
1578 :
1579 0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
1580 : {
1581 0 : const int32_t affinity = command.get< int32_t >();
1582 :
1583 0 : lunchbox::Thread::setAffinity( affinity );
1584 0 : return true;
1585 : }
1586 :
1587 33 : bool LocalNode::_cmdConnect( ICommand& command )
1588 : {
1589 33 : LBASSERTINFO( !command.getRemoteNode(), command );
1590 33 : LBASSERT( _impl->inReceiverThread( ));
1591 :
1592 33 : const NodeID& nodeID = command.get< NodeID >();
1593 33 : const uint32_t requestID = command.get< uint32_t >();
1594 33 : const uint32_t nodeType = command.get< uint32_t >();
1595 33 : std::string data = command.get< std::string >();
1596 :
1597 33 : LBVERB << "handle connect " << command << " req " << requestID << " type "
1598 33 : << nodeType << " data " << data << std::endl;
1599 :
1600 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1601 :
1602 33 : LBASSERT( connection );
1603 33 : LBASSERT( nodeID != getNodeID() );
1604 33 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1605 : _impl->connectionNodes.end( ));
1606 :
1607 66 : NodePtr peer;
1608 : #ifdef COLLAGE_BIGENDIAN
1609 : uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
1610 : lunchbox::byteswap( cmd );
1611 : #else
1612 33 : const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
1613 : #endif
1614 :
1615 : // No locking needed, only recv thread modifies
1616 33 : NodeHashCIter i = _impl->nodes->find( nodeID );
1617 33 : if( i != _impl->nodes->end( ))
1618 : {
1619 0 : peer = i->second;
1620 0 : if( peer->isReachable( ))
1621 : {
1622 : // Node exists, probably simultaneous connect from peer
1623 0 : LBINFO << "Already got node " << nodeID << ", refusing connect"
1624 0 : << std::endl;
1625 :
1626 : // refuse connection
1627 : OCommand( Connections( 1, connection ), cmd )
1628 0 : << NodeID() << requestID;
1629 :
1630 : // NOTE: There is no close() here. The reply command above has to be
1631 : // received by the peer first, before closing the connection.
1632 0 : _removeConnection( connection );
1633 0 : return true;
1634 : }
1635 : }
1636 :
1637 : // create and add connected node
1638 33 : if( !peer )
1639 33 : peer = createNode( nodeType );
1640 33 : if( !peer )
1641 : {
1642 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1643 0 : << std::endl;
1644 :
1645 : // refuse connection
1646 0 : OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
1647 :
1648 : // NOTE: There is no close() here. The reply command above has to be
1649 : // received by the peer first, before closing the connection.
1650 0 : _removeConnection( connection );
1651 0 : return true;
1652 : }
1653 :
1654 33 : if( !peer->deserialize( data ))
1655 0 : LBWARN << "Error during node initialization" << std::endl;
1656 33 : LBASSERTINFO( data.empty(), data );
1657 33 : LBASSERTINFO( peer->getNodeID() == nodeID,
1658 : peer->getNodeID() << "!=" << nodeID );
1659 33 : LBASSERT( peer->getType() == nodeType );
1660 :
1661 33 : _impl->connectionNodes[ connection ] = peer;
1662 : {
1663 33 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1664 33 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1665 : }
1666 33 : LBVERB << "Added node " << nodeID << std::endl;
1667 :
1668 : // send our information as reply
1669 : OCommand( Connections( 1, connection ), cmd )
1670 33 : << getNodeID() << requestID << getType() << serialize();
1671 :
1672 66 : return true;
1673 : }
1674 :
1675 33 : bool LocalNode::_cmdConnectReply( ICommand& command )
1676 : {
1677 33 : LBASSERT( !command.getRemoteNode( ));
1678 33 : LBASSERT( _impl->inReceiverThread( ));
1679 :
1680 33 : ConnectionPtr connection = _impl->incoming.getConnection();
1681 33 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1682 : _impl->connectionNodes.end( ));
1683 :
1684 33 : const NodeID& nodeID = command.get< NodeID >();
1685 33 : const uint32_t requestID = command.get< uint32_t >();
1686 :
1687 : // connection refused
1688 33 : if( nodeID == 0 )
1689 : {
1690 0 : LBINFO << "Connection refused, node already connected by peer"
1691 0 : << std::endl;
1692 :
1693 0 : _removeConnection( connection );
1694 0 : serveRequest( requestID, false );
1695 0 : return true;
1696 : }
1697 :
1698 33 : const uint32_t nodeType = command.get< uint32_t >();
1699 66 : std::string data = command.get< std::string >();
1700 :
1701 33 : LBVERB << "handle connect reply " << command << " req " << requestID
1702 33 : << " type " << nodeType << " data " << data << std::endl;
1703 :
1704 : // No locking needed, only recv thread modifies
1705 33 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1706 66 : NodePtr peer;
1707 33 : if( i != _impl->nodes->end( ))
1708 0 : peer = i->second;
1709 :
1710 33 : if( peer && peer->isReachable( )) // simultaneous connect
1711 : {
1712 0 : LBINFO << "Closing simultaneous connection from " << peer << " on "
1713 0 : << connection << std::endl;
1714 :
1715 0 : _removeConnection( connection );
1716 0 : _closeNode( peer );
1717 0 : serveRequest( requestID, false );
1718 0 : return true;
1719 : }
1720 :
1721 : // create and add node
1722 33 : if( !peer )
1723 : {
1724 33 : if( requestID != LB_UNDEFINED_UINT32 )
1725 33 : peer = reinterpret_cast< Node* >( getRequestData( requestID ));
1726 : else
1727 0 : peer = createNode( nodeType );
1728 : }
1729 33 : if( !peer )
1730 : {
1731 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1732 0 : << std::endl;
1733 0 : _removeConnection( connection );
1734 0 : return true;
1735 : }
1736 :
1737 33 : LBASSERTINFO( peer->getType() == nodeType,
1738 : peer->getType() << " != " << nodeType );
1739 33 : LBASSERT( peer->isClosed( ));
1740 :
1741 33 : if( !peer->deserialize( data ))
1742 0 : LBWARN << "Error during node initialization" << std::endl;
1743 33 : LBASSERT( data.empty( ));
1744 33 : LBASSERT( peer->getNodeID() == nodeID );
1745 :
1746 : // send ACK to peer
1747 : // cppcheck-suppress unusedScopedObject
1748 33 : OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
1749 :
1750 33 : peer->_connect( connection );
1751 33 : _impl->connectionNodes[ connection ] = peer;
1752 : {
1753 33 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1754 33 : _impl->nodes.data[ peer->getNodeID() ] = peer;
1755 : }
1756 33 : _connectMulticast( peer );
1757 33 : LBVERB << "Added node " << nodeID << std::endl;
1758 :
1759 33 : serveRequest( requestID, true );
1760 :
1761 33 : notifyConnect( peer );
1762 66 : return true;
1763 : }
1764 :
1765 33 : bool LocalNode::_cmdConnectAck( ICommand& command )
1766 : {
1767 33 : NodePtr node = command.getRemoteNode();
1768 33 : LBASSERT( node );
1769 33 : LBASSERT( _impl->inReceiverThread( ));
1770 33 : LBVERB << "handle connect ack" << std::endl;
1771 :
1772 33 : node->_connect( _impl->incoming.getConnection( ));
1773 33 : _connectMulticast( node );
1774 33 : notifyConnect( node );
1775 33 : return true;
1776 : }
1777 :
1778 0 : bool LocalNode::_cmdID( ICommand& command )
1779 : {
1780 0 : LBASSERT( _impl->inReceiverThread( ));
1781 :
1782 0 : const NodeID& nodeID = command.get< NodeID >();
1783 0 : uint32_t nodeType = command.get< uint32_t >();
1784 0 : std::string data = command.get< std::string >();
1785 :
1786 0 : if( command.getRemoteNode( ))
1787 : {
1788 0 : LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
1789 0 : LBASSERT( command.getRemoteNode()->_getMulticast( ));
1790 0 : return true;
1791 : }
1792 :
1793 0 : LBINFO << "handle ID " << command << " node " << nodeID << std::endl;
1794 :
1795 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1796 0 : LBASSERT( connection->isMulticast( ));
1797 0 : LBASSERT( _impl->connectionNodes.find( connection ) ==
1798 : _impl->connectionNodes.end( ));
1799 :
1800 0 : NodePtr node;
1801 0 : if( nodeID == getNodeID() ) // 'self' multicast connection
1802 0 : node = this;
1803 : else
1804 : {
1805 : // No locking needed, only recv thread writes
1806 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1807 :
1808 0 : if( i == _impl->nodes->end( ))
1809 : {
1810 : // unknown node: create and add unconnected node
1811 0 : node = createNode( nodeType );
1812 :
1813 0 : if( !node->deserialize( data ))
1814 0 : LBWARN << "Error during node initialization" << std::endl;
1815 0 : LBASSERTINFO( data.empty(), data );
1816 :
1817 : {
1818 0 : lunchbox::ScopedFastWrite mutex( _impl->nodes );
1819 0 : _impl->nodes.data[ nodeID ] = node;
1820 : }
1821 0 : LBVERB << "Added node " << nodeID << " with multicast "
1822 0 : << connection << std::endl;
1823 : }
1824 : else
1825 0 : node = i->second;
1826 : }
1827 0 : LBASSERT( node );
1828 0 : LBASSERTINFO( node->getNodeID() == nodeID,
1829 : node->getNodeID() << "!=" << nodeID );
1830 :
1831 0 : _connectMulticast( node, connection );
1832 0 : _impl->connectionNodes[ connection ] = node;
1833 0 : LBINFO << "Added multicast connection " << connection << " from " << nodeID
1834 0 : << " to " << getNodeID() << std::endl;
1835 0 : return true;
1836 : }
1837 :
1838 7 : bool LocalNode::_cmdDisconnect( ICommand& command )
1839 : {
1840 7 : LBASSERT( _impl->inReceiverThread( ));
1841 :
1842 7 : const uint32_t requestID = command.get< uint32_t >();
1843 :
1844 7 : NodePtr node = static_cast<Node*>( getRequestData( requestID ));
1845 7 : LBASSERT( node );
1846 :
1847 7 : _closeNode( node );
1848 7 : LBASSERT( node->isClosed( ));
1849 7 : serveRequest( requestID );
1850 7 : return true;
1851 : }
1852 :
1853 0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
1854 : {
1855 0 : const NodeID& nodeID = command.get< NodeID >();
1856 0 : const uint32_t requestID = command.get< uint32_t >();
1857 :
1858 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1859 0 : << " nodeID " << nodeID << std::endl;
1860 :
1861 0 : NodePtr node = getNode( nodeID );
1862 0 : NodePtr toNode = command.getRemoteNode();
1863 :
1864 0 : uint32_t nodeType = NODETYPE_INVALID;
1865 0 : std::string nodeData;
1866 0 : if( node )
1867 : {
1868 0 : nodeType = node->getType();
1869 0 : nodeData = node->serialize();
1870 0 : LBINFO << "Sent node data '" << nodeData << "' for " << nodeID << " to "
1871 0 : << toNode << std::endl;
1872 : }
1873 :
1874 : toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
1875 0 : << nodeID << requestID << nodeType << nodeData;
1876 0 : return true;
1877 : }
1878 :
1879 0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
1880 : {
1881 0 : LBASSERT( _impl->inReceiverThread( ));
1882 :
1883 0 : const NodeID& nodeID = command.get< NodeID >();
1884 0 : const uint32_t requestID = command.get< uint32_t >();
1885 0 : const uint32_t nodeType = command.get< uint32_t >();
1886 0 : std::string nodeData = command.get< std::string >();
1887 :
1888 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
1889 0 : << " type " << nodeType << " data " << nodeData << std::endl;
1890 :
1891 : // No locking needed, only recv thread writes
1892 0 : NodeHash::const_iterator i = _impl->nodes->find( nodeID );
1893 0 : if( i != _impl->nodes->end( ))
1894 : {
1895 : // Requested node connected to us in the meantime
1896 0 : NodePtr node = i->second;
1897 :
1898 0 : node->ref( this );
1899 0 : serveRequest( requestID, node.get( ));
1900 0 : return true;
1901 : }
1902 :
1903 0 : if( nodeType == NODETYPE_INVALID )
1904 : {
1905 0 : serveRequest( requestID, (void*)0 );
1906 0 : return true;
1907 : }
1908 :
1909 : // new node: create and add unconnected node
1910 0 : NodePtr node = createNode( nodeType );
1911 0 : if( node )
1912 : {
1913 0 : LBASSERT( node );
1914 :
1915 0 : if( !node->deserialize( nodeData ))
1916 0 : LBWARN << "Failed to initialize node data" << std::endl;
1917 0 : LBASSERT( nodeData.empty( ));
1918 0 : node->ref( this );
1919 : }
1920 : else
1921 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
1922 :
1923 0 : serveRequest( requestID, node.get( ));
1924 0 : return true;
1925 : }
1926 :
1927 0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
1928 : {
1929 0 : LBASSERT( inCommandThread( ));
1930 0 : if( !_impl->sendToken ) // enqueue command if no token available
1931 : {
1932 0 : const uint32_t timeout = Global::getTimeout();
1933 0 : if( timeout == LB_TIMEOUT_INDEFINITE ||
1934 0 : ( getTime64() - _impl->lastSendToken <= timeout ))
1935 : {
1936 0 : _impl->sendTokenQueue.push_back( command );
1937 0 : return true;
1938 : }
1939 :
1940 : // timeout! - clear old requests
1941 0 : _impl->sendTokenQueue.clear();
1942 : // 'generate' new token - release is robust
1943 : }
1944 :
1945 0 : _impl->sendToken = false;
1946 :
1947 0 : const uint32_t requestID = command.get< uint32_t >();
1948 0 : command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1949 0 : << requestID;
1950 0 : return true;
1951 : }
1952 :
1953 0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
1954 : {
1955 0 : const uint32_t requestID = command.get< uint32_t >();
1956 0 : serveRequest( requestID );
1957 0 : return true;
1958 : }
1959 :
1960 0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
1961 : {
1962 0 : LBASSERT( inCommandThread( ));
1963 0 : _impl->lastSendToken = getTime64();
1964 :
1965 0 : if( _impl->sendToken )
1966 0 : return true; // double release due to timeout
1967 0 : if( _impl->sendTokenQueue.empty( ))
1968 : {
1969 0 : _impl->sendToken = true;
1970 0 : return true;
1971 : }
1972 :
1973 0 : ICommand& request = _impl->sendTokenQueue.front();
1974 :
1975 0 : const uint32_t requestID = request.get< uint32_t >();
1976 0 : request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
1977 0 : << requestID;
1978 0 : _impl->sendTokenQueue.pop_front();
1979 0 : return true;
1980 : }
1981 :
1982 0 : bool LocalNode::_cmdAddListener( ICommand& command )
1983 : {
1984 : Connection* rawConnection =
1985 0 : reinterpret_cast< Connection* >( command.get< uint64_t >( ));
1986 0 : std::string data = command.get< std::string >();
1987 :
1988 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
1989 0 : command.getRemoteNode()->_addConnectionDescription( description );
1990 :
1991 0 : if( command.getRemoteNode() != this )
1992 0 : return true;
1993 :
1994 0 : ConnectionPtr connection = rawConnection;
1995 0 : connection->unref();
1996 0 : LBASSERT( connection );
1997 :
1998 0 : _impl->connectionNodes[ connection ] = this;
1999 0 : if( connection->isMulticast( ))
2000 0 : _addMulticast( this, connection );
2001 :
2002 0 : connection->acceptNB();
2003 0 : _impl->incoming.addConnection( connection );
2004 :
2005 0 : _initService(); // update zeroconf
2006 0 : return true;
2007 : }
2008 :
2009 0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
2010 : {
2011 0 : const uint32_t requestID = command.get< uint32_t >();
2012 0 : Connection* rawConnection = command.get< Connection* >();
2013 0 : std::string data = command.get< std::string >();
2014 :
2015 0 : ConnectionDescriptionPtr description = new ConnectionDescription( data );
2016 0 : LBCHECK(
2017 : command.getRemoteNode()->_removeConnectionDescription( description ));
2018 :
2019 0 : if( command.getRemoteNode() != this )
2020 0 : return true;
2021 :
2022 0 : _initService(); // update zeroconf
2023 :
2024 0 : ConnectionPtr connection = rawConnection;
2025 0 : connection->unref( this );
2026 0 : LBASSERT( connection );
2027 :
2028 0 : if( connection->isMulticast( ))
2029 0 : _removeMulticast( connection );
2030 :
2031 0 : _impl->incoming.removeConnection( connection );
2032 0 : LBASSERT( _impl->connectionNodes.find( connection ) !=
2033 : _impl->connectionNodes.end( ));
2034 0 : _impl->connectionNodes.erase( connection );
2035 0 : serveRequest( requestID );
2036 0 : return true;
2037 : }
2038 :
2039 0 : bool LocalNode::_cmdPing( ICommand& command )
2040 : {
2041 0 : LBASSERT( inCommandThread( ));
2042 0 : command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
2043 0 : return true;
2044 : }
2045 :
2046 2 : bool LocalNode::_cmdCommand( ICommand& command )
2047 : {
2048 2 : const uint128_t& commandID = command.get< uint128_t >();
2049 2 : CommandHandler func;
2050 : {
2051 2 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2052 2 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2053 2 : if( i == _impl->commandHandlers->end( ))
2054 0 : return false;
2055 :
2056 2 : CommandQueue* queue = i->second.second;
2057 2 : if( queue )
2058 : {
2059 : command.setDispatchFunction( CmdFunc( this,
2060 1 : &LocalNode::_cmdCommandAsync ));
2061 1 : queue->push( command );
2062 1 : return true;
2063 : }
2064 : // else
2065 :
2066 1 : func = i->second.first;
2067 : }
2068 :
2069 2 : CustomICommand customCmd( command );
2070 3 : return func( customCmd );
2071 : }
2072 :
2073 1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
2074 : {
2075 1 : const uint128_t& commandID = command.get< uint128_t >();
2076 1 : CommandHandler func;
2077 : {
2078 1 : lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
2079 1 : CommandHashCIter i = _impl->commandHandlers->find( commandID );
2080 1 : LBASSERT( i != _impl->commandHandlers->end( ));
2081 1 : if( i == _impl->commandHandlers->end( ))
2082 0 : return true; // deregistered between dispatch and now
2083 1 : func = i->second.first;
2084 : }
2085 2 : CustomICommand customCmd( command );
2086 2 : return func( customCmd );
2087 : }
2088 :
2089 33 : bool LocalNode::_cmdAddConnection( ICommand& command )
2090 : {
2091 33 : LBASSERT( _impl->inReceiverThread( ));
2092 :
2093 33 : ConnectionPtr connection = command.get< ConnectionPtr >();
2094 33 : _addConnection( connection );
2095 33 : connection->unref(); // ref'd by _addConnection
2096 33 : return true;
2097 : }
2098 :
2099 60 : }
|