// Equalizer 1.2.1
00001 00002 /* Copyright (c) 2005-2012, Stefan Eilemann <eile@equalizergraphics.com> 00003 * 2010, Cedric Stalder <cedric.stalder@gmail.com> 00004 * 00005 * This library is free software; you can redistribute it and/or modify it under 00006 * the terms of the GNU Lesser General Public License version 2.1 as published 00007 * by the Free Software Foundation. 00008 * 00009 * This library is distributed in the hope that it will be useful, but WITHOUT 00010 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 00011 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00012 * details. 00013 * 00014 * You should have received a copy of the GNU Lesser General Public License 00015 * along with this library; if not, write to the Free Software Foundation, Inc., 00016 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 00017 */ 00018 00019 #ifndef CO_LOCALNODE_H 00020 #define CO_LOCALNODE_H 00021 00022 #include <co/node.h> // base class 00023 #include <co/base/requestHandler.h> // base class 00024 00025 #include <co/commandCache.h> // member 00026 #include <co/commandQueue.h> // member 00027 #include <co/connectionSet.h> // member 00028 #include <co/objectVersion.h> // used in inline method 00029 #include <co/worker.h> // member 00030 00031 #include <co/base/clock.h> // member 00032 #include <co/base/hash.h> // member 00033 #include <co/base/lockable.h> // member 00034 #include <co/base/spinLock.h> // member 00035 #include <co/base/types.h> // member 00036 00037 namespace co 00038 { 00039 class ObjectStore; 00040 00047 class LocalNode : public base::RequestHandler, public Node 00048 { 00049 public: 00050 CO_API LocalNode( ); 00051 CO_API virtual ~LocalNode( ); 00052 00053 typedef NodePtr SendToken; 00054 00085 CO_API virtual bool initLocal( const int argc, char** argv ); 00086 00099 CO_API virtual bool listen(); 00100 CO_API virtual bool listen( ConnectionPtr connection ); 00101 00111 CO_API virtual bool close(); 
00112 00114 virtual bool exitLocal() { return close(); } 00115 00131 CO_API bool connect( NodePtr node ); 00132 00144 CO_API NodePtr connect( const NodeID& nodeID ); 00145 00153 CO_API virtual bool disconnect( NodePtr node ); 00155 00159 CO_API void disableInstanceCache(); 00160 00162 CO_API void expireInstanceData( const int64_t age ); 00163 00179 CO_API void enableSendOnRegister(); 00180 00182 CO_API void disableSendOnRegister(); 00183 00195 CO_API bool registerObject( Object* object ); 00196 00202 CO_API virtual void deregisterObject( Object* object ); 00203 00237 CO_API bool mapObject( Object* object, const base::UUID& id, 00238 const uint128_t& version = VERSION_OLDEST ); 00239 00241 bool mapObject( Object* object, const ObjectVersion& v ) 00242 { return mapObject( object, v.identifier, v.version ); } 00243 00245 CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 00246 const uint128_t& version = VERSION_OLDEST ); 00247 00252 CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 00253 const uint128_t& version, NodePtr master ); 00254 00256 CO_API bool mapObjectSync( const uint32_t requestID ); 00257 00263 CO_API void unmapObject( Object* object ); 00264 00266 CO_API void releaseObject( Object* object ); 00267 00286 CO_API virtual void objectPush( const uint128_t& groupID, 00287 const uint128_t& typeID, 00288 const uint128_t& objectID, 00289 DataIStream& istream ); 00290 00293 CO_API void swapObject( Object* oldObject, Object* newObject ); 00295 00306 CO_API NodePtr getNode( const NodeID& id ) const; 00307 00309 void getNodes( Nodes& nodes, const bool addSelf = true ) const; 00310 00315 CO_API SendToken acquireSendToken( NodePtr toNode ); 00316 CO_API void releaseSendToken( SendToken& token ); 00317 00319 CommandQueue* getCommandThreadQueue() 00320 { return _commandThread->getWorkerQueue(); } 00321 00326 bool inCommandThread() const { return _commandThread->isCurrent(); } 00327 00329 int64_t getTime64(){ return _clock.getTime64(); } 
00331 00335 CO_API void addListener( ConnectionPtr connection ); 00336 00338 CO_API uint32_t removeListenerNB( ConnectionPtr connection ); 00339 00346 void flushCommands() { _incoming.interrupt(); } 00347 00349 Command& cloneCommand( Command& command ) 00350 { return _commandCache.clone( command ); } 00351 00353 CO_API Command& allocCommand( const uint64_t size ); 00354 00362 CO_API bool dispatchCommand( Command& command ); 00364 00366 CO_API void ackRequest( NodePtr node, const uint32_t requestID ); 00367 00369 CO_API void ping( NodePtr remoteNode ); 00370 00376 CO_API bool pingIdleNodes(); 00377 00378 protected: 00392 CO_API bool connect( NodePtr node, ConnectionPtr connection ); 00393 00395 virtual void notifyDisconnect( NodePtr node ) { } 00396 00397 private: 00398 typedef std::list< Command* > CommandList; 00399 00401 CommandList _pendingCommands; 00402 00404 CommandCache _commandCache; 00405 00407 bool _hasSendToken; 00408 uint64_t _lastTokenTime; 00409 std::deque< Command* > _sendTokenQueue; 00410 00412 ObjectStore* _objectStore; 00413 00415 base::Lock _connectMutex; 00416 00417 typedef base::RefPtrHash< Connection, NodePtr > ConnectionNodeHash; 00418 typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter; 00420 ConnectionNodeHash _connectionNodes; // read and write: recv only 00421 00422 typedef stde::hash_map< uint128_t, NodePtr > NodeHash; 00423 typedef NodeHash::const_iterator NodeHashCIter; 00425 // r: all, w: recv 00426 base::Lockable< NodeHash, base::SpinLock > _nodes; 00427 00429 ConnectionSet _incoming; 00430 00432 base::Clock _clock; 00433 00437 class ReceiverThread : public base::Thread 00438 { 00439 public: 00440 ReceiverThread( LocalNode* localNode ) : _localNode( localNode ){} 00441 virtual bool init() 00442 { 00443 setName( std::string("Rcv ") + base::className(_localNode)); 00444 return _localNode->_commandThread->start(); 00445 } 00446 virtual void run(){ _localNode->_runReceiverThread(); } 00447 00448 private: 00449 LocalNode* 
const _localNode; 00450 }; 00451 ReceiverThread* _receiverThread; 00452 00453 bool _connectSelf(); 00454 void _connectMulticast( NodePtr node ); 00455 00456 void _cleanup(); 00457 CO_API void _addConnection( ConnectionPtr connection ); 00458 void _removeConnection( ConnectionPtr connection ); 00459 00460 NodePtr _connect( const NodeID& nodeID, NodePtr peer ); 00461 uint32_t _connect( NodePtr node ); 00462 uint32_t _connect( NodePtr node, ConnectionPtr connection ); 00463 00468 bool _inReceiverThread() const { return _receiverThread->isCurrent(); } 00469 00470 void _receiverThreadStart() { _receiverThread->start(); } 00471 00472 void _runReceiverThread(); 00473 void _handleConnect(); 00474 void _handleDisconnect(); 00475 bool _handleData(); 00477 00478 friend class ObjectStore; 00479 template< typename T > void 00480 _registerCommand( const uint32_t command, const CommandFunc< T >& func, 00481 CommandQueue* destinationQueue ) 00482 { 00483 registerCommand( command, func, destinationQueue ); 00484 } 00485 00491 class CommandThread : public Worker 00492 { 00493 public: 00494 CommandThread( LocalNode* localNode ) : _localNode( localNode ){} 00495 00496 protected: 00497 virtual bool init(); 00498 virtual bool stopRunning() { return _localNode->isClosed(); } 00499 virtual bool notifyIdle(); 00500 00501 private: 00502 LocalNode* const _localNode; 00503 }; 00504 CommandThread* _commandThread; 00505 00506 void _dispatchCommand( Command& command ); 00507 void _redispatchCommands(); 00508 00510 bool _cmdAckRequest( Command& packet ); 00511 bool _cmdStopRcv( Command& command ); 00512 bool _cmdStopCmd( Command& command ); 00513 bool _cmdConnect( Command& command ); 00514 bool _cmdConnectReply( Command& command ); 00515 bool _cmdConnectAck( Command& command ); 00516 bool _cmdID( Command& command ); 00517 bool _cmdDisconnect( Command& command ); 00518 bool _cmdGetNodeData( Command& command ); 00519 bool _cmdGetNodeDataReply( Command& command ); 00520 bool _cmdAcquireSendToken( 
Command& command ); 00521 bool _cmdAcquireSendTokenReply( Command& command ); 00522 bool _cmdReleaseSendToken( Command& command ); 00523 bool _cmdAddListener( Command& command ); 00524 bool _cmdRemoveListener( Command& command ); 00525 bool _cmdPing( Command& command ); 00526 bool _cmdDiscard( Command& ) { return true; } 00528 00529 EQ_TS_VAR( _cmdThread ); 00530 EQ_TS_VAR( _rcvThread ); 00531 }; 00532 inline std::ostream& operator << ( std::ostream& os, const LocalNode& node ) 00533 { 00534 os << static_cast< const Node& >( node ); 00535 return os; 00536 } 00537 } 00538 #endif // CO_LOCALNODE_H