Equalizer 1.0

localNode.h

00001 
00002 /* Copyright (c) 2005-2011, Stefan Eilemann <eile@equalizergraphics.com>
00003  *                    2010, Cedric Stalder <cedric.stalder@gmail.com> 
00004  *
00005  * This library is free software; you can redistribute it and/or modify it under
00006  * the terms of the GNU Lesser General Public License version 2.1 as published
00007  * by the Free Software Foundation.
00008  *  
00009  * This library is distributed in the hope that it will be useful, but WITHOUT
00010  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00012  * details.
00013  * 
00014  * You should have received a copy of the GNU Lesser General Public License
00015  * along with this library; if not, write to the Free Software Foundation, Inc.,
00016  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00017  */
00018 
00019 #ifndef CO_LOCALNODE_H
00020 #define CO_LOCALNODE_H
00021 
00022 #include <co/node.h>            // base class
00023 #include <co/base/requestHandler.h> // base class
00024 
00025 #include <co/commandCache.h>    // member
00026 #include <co/commandQueue.h>    // member
00027 #include <co/connectionSet.h>   // member
00028 #include <co/objectVersion.h>   // used in inline method
00029 
00030 
00031 #include <co/base/hash.h>           // member
00032 #include <co/base/lockable.h>       // member
00033 #include <co/base/spinLock.h>       // member
00034 #include <co/base/types.h>          // member
00035 
00036 #pragma warning(push)
      // MSVC warning C4190 is reported for functions declared with C linkage
      // that return a C++ type; eqsStartLocalServer returns co::ConnectionPtr,
      // so the warning is disabled for just these declarations and restored by
      // the matching pop below.
00037 #pragma warning(disable: 4190)
      // Declared at global scope so that co::LocalNode can name
      // ::eqsStartLocalServer in its friend declaration.
00038 extern "C" EQSERVER_EXPORT co::ConnectionPtr eqsStartLocalServer( const
00039                                                                  std::string& );
00040 extern "C" EQSERVER_EXPORT void eqsJoinLocalServer();
00041 #pragma warning(pop)
00042 
00042 
00043 namespace co
00044 {
00045     class ObjectStore; // forward declaration: LocalNode only holds an ObjectStore* and befriends the class
00046 
00053     class LocalNode : public base::RequestHandler, public Node
00054     {
          /*
           * A Node that additionally derives from base::RequestHandler.
           *
           * Besides the public connection and object-mapping interface, the
           * class holds a ConnectionSet (_incoming), a CommandCache, the
           * CommandQueue returned by getCommandThreadQueue(), and pointers to
           * two helper threads (ReceiverThread and CommandThread) whose run()
           * methods call back into _runReceiverThread() and
           * _runCommandThread().
           */
00055     public:
              /** Constructor and virtual destructor, both exported (CO_API). */
00056         CO_API LocalNode( );
00057         CO_API virtual ~LocalNode( );
00058 
              /**
               * Virtual initialization entry point taking argc/argv style
               * arguments.
               *
               * @param argc the argument count.
               * @param argv the argument values.
               * @return a bool status. NOTE(review): presumably true on
               *         success; the implementation is not part of this header.
               */
00083         CO_API virtual bool initLocal( const int argc, char** argv );
00084         
              /**
               * NOTE(review): presumably opens the listening connections and
               * starts the receiver thread; confirm in the implementation.
               *
               * @return a bool status.
               */
00097         CO_API virtual bool listen();
00098 
              /**
               * Invoked by the default exitLocal() below; the implementation
               * is not visible in this header.
               *
               * @return a bool status.
               */
00108         CO_API virtual bool close();
00109 
              /** Default implementation: calls close() and returns its result. */
00113         virtual bool exitLocal() { return close(); }
00114 
              /**
               * Connect overload taking an existing node handle.
               *
               * @param node the node to connect.
               * @return a bool status.
               */
00129         CO_API bool connect( NodePtr node );
00130 
              /**
               * Connect overload taking a node identifier.
               *
               * @param nodeID the identifier of the node.
               * @return a NodePtr. NOTE(review): presumably an invalid pointer
               *         when no connection could be made; confirm.
               */
00142         CO_API NodePtr connect( const NodeID& nodeID );
00143 
              /**
               * Virtual counterpart to connect( NodePtr ).
               *
               * @param node the node to disconnect.
               * @return a bool status.
               */
00151         CO_API virtual bool disconnect( NodePtr node );
00153 
              /* Object registry interface. NOTE(review): these presumably
               * forward to the ObjectStore held in _objectStore; confirm. */

              /** Takes no arguments and returns nothing; body not visible here. */
00157         CO_API void disableInstanceCache();
00158 
              /** @param age NOTE(review): unit of the age value is not visible
               *             in this header; confirm before relying on it. */
00160         CO_API void expireInstanceData( const int64_t age );
00161 
              /** Paired with disableSendOnRegister() below. */
00177         CO_API void enableSendOnRegister();
00178         
              /** Paired with enableSendOnRegister() above. */
00180         CO_API void disableSendOnRegister(); 
00181         
              /**
               * @param object the object to register.
               * @return a bool status.
               */
00193         CO_API bool registerObject( Object* object );
00194 
              /** Virtual, so subclasses may override it. @param object the
               *  object to deregister. */
00200         CO_API virtual void deregisterObject( Object* object );
00201 
              /**
               * Map an object by identifier and version.
               *
               * @param object the object to map.
               * @param id the identifier to map to.
               * @param version the requested version; defaults to
               *                VERSION_OLDEST when omitted.
               * @return a bool status.
               */
00235         CO_API bool mapObject( Object* object, const base::UUID& id, 
00236                                   const uint128_t& version = VERSION_OLDEST );
00237 
              /** Convenience overload: unpacks v.identifier and v.version and
               *  forwards to the three-argument mapObject() above. */
00239         bool mapObject( Object* object, const ObjectVersion& v )
00240             { return mapObject( object, v.identifier, v.version ); }
00241 
              /**
               * Same parameters as mapObject(), but returns a uint32_t.
               * NOTE(review): presumably a request identifier to be passed to
               * mapObjectSync(); confirm.
               */
00243         CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 
00244                                     const uint128_t& version = VERSION_OLDEST );
00245 
              /**
               * Overload with an explicit master node; here the version has
               * no default value.
               *
               * @param master the node passed as the object's master.
               */
00250         CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id, 
00251                                      const uint128_t& version, NodePtr master );
00252 
              /**
               * @param requestID NOTE(review): presumably the value returned
               *                  by mapObjectNB(); confirm.
               * @return a bool status.
               */
00254         CO_API bool mapObjectSync( const uint32_t requestID );
00255 
              /** @param object the object to unmap. */
00261         CO_API void unmapObject( Object* object );
00262 
              /** NOTE(review): presumably unmaps or deregisters the object as
               *  appropriate; the body is not visible here. */
00264         CO_API void releaseObject( Object* object );
00265 
              /** Takes the object to be replaced and its replacement. */
00268         CO_API void swapObject( Object* oldObject, Object* newObject );
00270 
              /**
               * Const lookup of a node by identifier.
               *
               * @param id the node identifier.
               * @return a NodePtr. NOTE(review): presumably invalid when the
               *         identifier is unknown; confirm.
               */
00281         CO_API NodePtr getNode( const NodeID& id ) const;
00282 
              /**
               * Fills the caller-provided container. Unlike its neighbours this
               * declaration is not marked CO_API.
               *
               * @param nodes the output container.
               * @param addSelf defaults to true. NOTE(review): presumably
               *                controls whether this node is included.
               */
00284         void getNodes( Nodes& nodes, const bool addSelf = true ) const;
00285 
              /* Paired calls taking the destination node. The matching private
               * state is _hasSendToken and _sendTokenQueue. */
00286         CO_API void acquireSendToken( NodePtr toNode );
00287         CO_API void releaseSendToken( NodePtr toNode );
00288 
              /** @return the address of the member _commandThreadQueue, so the
               *          result is never null. */
00290         virtual CommandQueue* getCommandThreadQueue() 
00291             { return &_commandThreadQueue; }
00292 
              /** @return _commandThread->isCurrent(). The pointer is
               *          dereferenced without a null check, so the thread
               *          object has to exist when this is called. */
00297         bool inCommandThread() const  { return _commandThread->isCurrent(); }
00299 
              /** @param connection the connection to add as a listener. */
00303         CO_API void addListener( ConnectionPtr connection );
00304 
              /** @return a uint32_t. NOTE(review): presumably a request
               *          identifier the caller waits on; confirm. */
00306         CO_API uint32_t removeListenerNB( ConnectionPtr connection );
00307 
              /** Calls interrupt() on the _incoming connection set. */
00314         void flushCommands() { _incoming.interrupt(); }
00315 
              /** @return the result of _commandCache.clone( command ). */
00317         Command& cloneCommand( Command& command )
00318             { return _commandCache.clone( command ); }
00319 
              /** @param size the requested size. @return a Command reference. */
00321         CO_API Command& allocCommand( const uint64_t size );
00322 
              /** @param command the command to dispatch. @return a bool
               *          status. */
00330         CO_API bool dispatchCommand( Command& command );
00332 
              /** @param node the node to acknowledge to. @param requestID the
               *         identifier of the acknowledged request. */
00334         CO_API void ackRequest( NodePtr node, const uint32_t requestID );
00335 
00336     protected:
              /** Exported, protected overload taking the node together with an
               *  already existing connection. @return a bool status. */
00350         CO_API bool _connect( NodePtr node, ConnectionPtr connection );
00351 
00352     private:
00353         typedef std::list< Command* > CommandList;
00354 
              /** List of Command pointers. NOTE(review): presumably the
               *  commands handled by _redispatchCommands(); confirm. */
00356         CommandList  _pendingCommands;
00357     
              /** Cache used by cloneCommand(). */
00359         CommandCache _commandCache;
00360 
              /** Queue whose address getCommandThreadQueue() returns. */
00362         CommandQueue _commandThreadQueue;        
00363     
              /* State for acquireSendToken()/releaseSendToken(); exact
               * semantics live in the implementation. */
00365         bool _hasSendToken;
00366         std::deque< Command* > _sendTokenQueue;
00367 
              /** Raw pointer; ownership is not expressed in this header. */
00369         ObjectStore* _objectStore;
00370 
              /** NOTE(review): presumably serializes connect operations. */
00372         base::Lock _connectMutex;
00373     
              /** Hash from Connection to the NodePtr associated with it. */
00375         typedef base::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
00376         ConnectionNodeHash _connectionNodes; // read and write: recv only
00377 
              /** Hash from a uint128_t key to NodePtr, wrapped in a Lockable
               *  using a base::SpinLock. */
00379         typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
00380         // r: all, w: recv
00381         base::Lockable< NodeHash, base::SpinLock > _nodes; 
00382 
              /** Connection set interrupted by flushCommands(). */
00384         ConnectionSet _incoming;
00385     
              /* Gives the global C-linkage function ::eqsStartLocalServer
               * access to the private members of this class. */
00386         friend EQSERVER_EXPORT 
00387         co::ConnectionPtr (::eqsStartLocalServer( const std::string& ));
00388 
              /**
               * Thread whose run() executes LocalNode::_runReceiverThread().
               *
               * init() names the thread "Rcv " plus the class name of the
               * local node, then starts the node's command thread and returns
               * the result of that start() call.
               */
00392         class ReceiverThread : public base::Thread
00393         {
00394         public:
00395             ReceiverThread( LocalNode* localNode ) : _localNode( localNode ){}
00396             virtual bool init()
00397                 {
00398                     setName( std::string("Rcv ") + base::className(_localNode));
00399                     return _localNode->_commandThread->start();
00400                 }
00401             virtual void run(){ _localNode->_runReceiverThread(); }
00402 
00403         private:
                  // Back pointer to the node; the pointer itself is const.
00404             LocalNode* const _localNode;
00405         };
00406         ReceiverThread* _receiverThread;
00407 
00408         bool _connectSelf();
00409         void _connectMulticast( NodePtr node );
00410 
00411         void _cleanup();
00412         CO_API void _addConnection( ConnectionPtr connection );
00413         void _removeConnection( ConnectionPtr connection );
00414         NodePtr _connect( const NodeID& nodeID, NodePtr peer );
00415 
              /** @return _receiverThread->isCurrent(); dereferences the
               *          pointer without a null check. */
00420         bool _inReceiverThread() const { return _receiverThread->isCurrent(); }
00421 
              /** Calls start() on the receiver thread; the result is dropped. */
00422         void _receiverThreadStart() { _receiverThread->start(); }
00423 
              /* Entry point used by ReceiverThread::run(). NOTE(review): the
               * indented _handle* declarations below are presumably its
               * helpers; confirm in the implementation. */
00424         void _runReceiverThread();
00425         void   _handleConnect();
00426         void   _handleDisconnect();
00427         bool   _handleData();
00429 
00430         friend class ObjectStore;
              /**
               * Thin forwarder: passes all three arguments unchanged to
               * registerCommand(). NOTE(review): presumably exists so that
               * the friend class ObjectStore can reach registerCommand();
               * confirm.
               */
00431         template< typename T >
00432         void _registerCommand( const uint32_t command,
00433                                const CommandFunc< T >& func,
00434                                CommandQueue* destinationQueue )
00435         {
00436             registerCommand( command, func, destinationQueue );
00437         }
00438 
              /**
               * Thread whose run() executes LocalNode::_runCommandThread().
               *
               * init() names the thread "Cmd " plus the class name of the
               * local node and always returns true. The thread is started
               * from ReceiverThread::init().
               */
00444         class CommandThread : public base::Thread
00445         {
00446         public:
00447             CommandThread( LocalNode* localNode ) : _localNode( localNode ){}
00448             virtual bool init()
00449                 {
00450                     setName( std::string("Cmd ") + base::className(_localNode));
00451                     return true;
00452                 }
00453             virtual void run(){ _localNode->_runCommandThread(); }
00454         private:
                  // Back pointer to the node; the pointer itself is const.
00455             LocalNode* const _localNode;
00456         };
00457         CommandThread* _commandThread;
00458 
00459         void _dispatchCommand( Command& command );
              /* Entry point used by CommandThread::run(). */
00460         void _runCommandThread();
00461         void   _redispatchCommands();
00462 
              /* Command handlers: each takes the Command and returns bool.
               * NOTE(review): presumably registered through registerCommand()
               * in the implementation; confirm. */
00464         bool _cmdAckRequest( Command& packet );
00465         bool _cmdStop( Command& command );
00466         bool _cmdConnect( Command& command );
00467         bool _cmdConnectReply( Command& command );
00468         bool _cmdConnectAck( Command& command );
00469         bool _cmdID( Command& command );
00470         bool _cmdDisconnect( Command& command );
00471         bool _cmdGetNodeData( Command& command );
00472         bool _cmdGetNodeDataReply( Command& command );
00473         bool _cmdAcquireSendToken( Command& command );
00474         bool _cmdAcquireSendTokenReply( Command& command );
00475         bool _cmdReleaseSendToken( Command& command );
00476         bool _cmdAddListener( Command& command );
00477         bool _cmdRemoveListener( Command& command );
00479 
              /* NOTE(review): EQ_TS_VAR is a macro defined outside this
               * header; presumably it declares per-thread check variables for
               * the command and receiver threads. Confirm against its
               * definition. */
00480         EQ_TS_VAR( _cmdThread );
00481         EQ_TS_VAR( _recvThread );
00482     };
00483     inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
00484     {
              // Stream the object through its Node base class; nothing
              // LocalNode-specific is written. The stream is returned so
              // that calls can be chained.
00485         os << static_cast< const Node& >( node );
00486         return os;
00487     }
00488 }
00489 #endif // CO_LOCALNODE_H
Generated on Sun May 8 2011 19:11:07 for Equalizer 1.0 by  doxygen 1.7.3