Equalizer 1.0
|
00001 00002 /* Copyright (c) 2005-2011, Stefan Eilemann <eile@equalizergraphics.com> 00003 * 2010, Cedric Stalder <cedric.stalder@gmail.com> 00004 * 00005 * This library is free software; you can redistribute it and/or modify it under 00006 * the terms of the GNU Lesser General Public License version 2.1 as published 00007 * by the Free Software Foundation. 00008 * 00009 * This library is distributed in the hope that it will be useful, but WITHOUT 00010 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 00011 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00012 * details. 00013 * 00014 * You should have received a copy of the GNU Lesser General Public License 00015 * along with this library; if not, write to the Free Software Foundation, Inc., 00016 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 00017 */ 00018 00019 #ifndef CO_LOCALNODE_H 00020 #define CO_LOCALNODE_H 00021 00022 #include <co/node.h> // base class 00023 #include <co/base/requestHandler.h> // base class 00024 00025 #include <co/commandCache.h> // member 00026 #include <co/commandQueue.h> // member 00027 #include <co/connectionSet.h> // member 00028 #include <co/objectVersion.h> // used in inline method 00029 00030 00031 #include <co/base/hash.h> // member 00032 #include <co/base/lockable.h> // member 00033 #include <co/base/spinLock.h> // member 00034 #include <co/base/types.h> // member 00035 00036 #pragma warning(push) 00037 #pragma warning(disable: 4190) 00038 extern "C" EQSERVER_EXPORT co::ConnectionPtr eqsStartLocalServer( const 00039 std::string& ); 00040 extern "C" EQSERVER_EXPORT void eqsJoinLocalServer(); 00041 #pragma warning(pop) 00042 00043 namespace co 00044 { 00045 class ObjectStore; 00046 00053 class LocalNode : public base::RequestHandler, public Node 00054 { 00055 public: 00056 CO_API LocalNode( ); 00057 CO_API virtual ~LocalNode( ); 00058 00083 CO_API virtual bool initLocal( const int argc, char** argv ); 00084 00097 CO_API virtual bool listen(); 00098 00108 CO_API virtual bool close(); 00109 00113 virtual bool exitLocal() { return close(); } 00114 00129 CO_API bool connect( NodePtr node ); 00130 00142 CO_API NodePtr connect( const NodeID& nodeID ); 00143 00151 CO_API virtual bool disconnect( NodePtr node ); 00153 00157 CO_API void disableInstanceCache(); 00158 00160 CO_API void expireInstanceData( const int64_t age ); 00161 00177 CO_API void enableSendOnRegister(); 00178 00180 CO_API void disableSendOnRegister(); 00181 00193 CO_API bool registerObject( Object* object ); 00194 00200 CO_API virtual void deregisterObject( Object* object ); 00201 00235 CO_API bool mapObject( Object* object, const base::UUID& id, 00236 const uint128_t& version = VERSION_OLDEST ); 00237 00239 bool mapObject( Object* object, const ObjectVersion& v ) 00240 { return mapObject( object, v.identifier, v.version ); } 00241 00243 CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 00244 const uint128_t& version = VERSION_OLDEST ); 00245 00250 CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 00251 const uint128_t& version, NodePtr master ); 00252 00254 CO_API bool mapObjectSync( const uint32_t requestID ); 00255 00261 CO_API void unmapObject( Object* object ); 00262 00264 CO_API void releaseObject( Object* object ); 00265 00268 CO_API void swapObject( Object* oldObject, Object* newObject ); 00270 00281 CO_API NodePtr getNode( const NodeID& id ) const; 00282 00284 void getNodes( Nodes& nodes, const bool addSelf = true ) const; 00285 00286 CO_API void acquireSendToken( NodePtr toNode ); 00287 CO_API void releaseSendToken( NodePtr toNode ); 00288 00290 virtual CommandQueue* getCommandThreadQueue() 00291 { return &_commandThreadQueue; } 00292 00297 bool inCommandThread() const { return _commandThread->isCurrent(); } 00299 00303 CO_API void addListener( ConnectionPtr connection ); 00304 00306 CO_API uint32_t removeListenerNB( ConnectionPtr connection ); 00307 00314 void flushCommands() { _incoming.interrupt(); } 00315 00317 Command& cloneCommand( Command& command ) 00318 { return _commandCache.clone( command ); } 00319 00321 CO_API Command& allocCommand( const uint64_t size ); 00322 00330 CO_API bool dispatchCommand( Command& command ); 00332 00334 CO_API void ackRequest( NodePtr node, const uint32_t requestID ); 00335 00336 protected: 00350 CO_API bool _connect( NodePtr node, ConnectionPtr connection ); 00351 00352 private: 00353 typedef std::list< Command* > CommandList; 00354 00356 CommandList _pendingCommands; 00357 00359 CommandCache _commandCache; 00360 00362 CommandQueue _commandThreadQueue; 00363 00365 bool _hasSendToken; 00366 std::deque< Command* > _sendTokenQueue; 00367 00369 ObjectStore* _objectStore; 00370 00372 base::Lock _connectMutex; 00373 00375 typedef base::RefPtrHash< Connection, NodePtr > ConnectionNodeHash; 00376 ConnectionNodeHash _connectionNodes; // read and write: recv only 00377 00379 typedef stde::hash_map< uint128_t, NodePtr > NodeHash; 00380 // r: all, w: recv 00381 base::Lockable< NodeHash, base::SpinLock > _nodes; 00382 00384 ConnectionSet _incoming; 00385 00386 friend EQSERVER_EXPORT 00387 co::ConnectionPtr (::eqsStartLocalServer( const std::string& )); 00388 00392 class ReceiverThread : public base::Thread 00393 { 00394 public: 00395 ReceiverThread( LocalNode* localNode ) : _localNode( localNode ){} 00396 virtual bool init() 00397 { 00398 setName( std::string("Rcv ") + base::className(_localNode)); 00399 return _localNode->_commandThread->start(); 00400 } 00401 virtual void run(){ _localNode->_runReceiverThread(); } 00402 00403 private: 00404 LocalNode* const _localNode; 00405 }; 00406 ReceiverThread* _receiverThread; 00407 00408 bool _connectSelf(); 00409 void _connectMulticast( NodePtr node ); 00410 00411 void _cleanup(); 00412 CO_API void _addConnection( ConnectionPtr connection ); 00413 void _removeConnection( ConnectionPtr connection ); 00414 NodePtr _connect( const NodeID& nodeID, NodePtr peer ); 00415 00420 bool _inReceiverThread() const { return _receiverThread->isCurrent(); } 00421 00422 void _receiverThreadStart() { _receiverThread->start(); } 00423 00424 void _runReceiverThread(); 00425 void _handleConnect(); 00426 void _handleDisconnect(); 00427 bool _handleData(); 00429 00430 friend class ObjectStore; 00431 template< typename T > 00432 void _registerCommand( const uint32_t command, 00433 const CommandFunc< T >& func, 00434 CommandQueue* destinationQueue ) 00435 { 00436 registerCommand( command, func, destinationQueue ); 00437 } 00438 00444 class CommandThread : public base::Thread 00445 { 00446 public: 00447 CommandThread( LocalNode* localNode ) : _localNode( localNode ){} 00448 virtual bool init() 00449 { 00450 setName( std::string("Cmd ") + base::className(_localNode)); 00451 return true; 00452 } 00453 virtual void run(){ _localNode->_runCommandThread(); } 00454 private: 00455 LocalNode* const _localNode; 00456 }; 00457 CommandThread* _commandThread; 00458 00459 void _dispatchCommand( Command& command ); 00460 void _runCommandThread(); 00461 void _redispatchCommands(); 00462 00464 bool _cmdAckRequest( Command& packet ); 00465 bool _cmdStop( Command& command ); 00466 bool _cmdConnect( Command& command ); 00467 bool _cmdConnectReply( Command& command ); 00468 bool _cmdConnectAck( Command& command ); 00469 bool _cmdID( Command& command ); 00470 bool _cmdDisconnect( Command& command ); 00471 bool _cmdGetNodeData( Command& command ); 00472 bool _cmdGetNodeDataReply( Command& command ); 00473 bool _cmdAcquireSendToken( Command& command ); 00474 bool _cmdAcquireSendTokenReply( Command& command ); 00475 bool _cmdReleaseSendToken( Command& command ); 00476 bool _cmdAddListener( Command& command ); 00477 bool _cmdRemoveListener( Command& command ); 00479 00480 EQ_TS_VAR( _cmdThread ); 00481 EQ_TS_VAR( _recvThread ); 00482 }; 00483 inline std::ostream& operator << ( std::ostream& os, const LocalNode& node ) 00484 { 00485 os << static_cast< const Node& >( node ); 00486 return os; 00487 } 00488 } 00489 #endif // CO_LOCALNODE_H