Equalizer  1.2.1
localNode.h
00001 
00002 /* Copyright (c) 2005-2012, Stefan Eilemann <eile@equalizergraphics.com>
00003  *                    2010, Cedric Stalder <cedric.stalder@gmail.com> 
00004  *
00005  * This library is free software; you can redistribute it and/or modify it under
00006  * the terms of the GNU Lesser General Public License version 2.1 as published
00007  * by the Free Software Foundation.
00008  *  
00009  * This library is distributed in the hope that it will be useful, but WITHOUT
00010  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00012  * details.
00013  * 
00014  * You should have received a copy of the GNU Lesser General Public License
00015  * along with this library; if not, write to the Free Software Foundation, Inc.,
00016  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00017  */
00018 
00019 #ifndef CO_LOCALNODE_H
00020 #define CO_LOCALNODE_H
00021 
00022 #include <co/node.h>            // base class
00023 #include <co/base/requestHandler.h> // base class
00024 
00025 #include <co/commandCache.h>    // member
00026 #include <co/commandQueue.h>    // member
00027 #include <co/connectionSet.h>   // member
00028 #include <co/objectVersion.h>   // used in inline method
00029 #include <co/worker.h>          // member
00030 
00031 #include <co/base/clock.h>          // member
00032 #include <co/base/hash.h>           // member
00033 #include <co/base/lockable.h>       // member
00034 #include <co/base/spinLock.h>       // member
00035 #include <co/base/types.h>          // member
00036 
00037 namespace co
00038 {
00039     class ObjectStore;
00040 
00047     class LocalNode : public base::RequestHandler, public Node
00048     {
          /*
           * A Node which additionally derives from base::RequestHandler.
           *
           * Judging from the members declared below, a LocalNode owns a set of
           * incoming connections, hashes of the nodes known to it, a command
           * cache, a pointer to an ObjectStore and two helper threads (a
           * receiver thread and a command thread).
           *
           * NOTE(review): the API documentation was stripped from this listing.
           * Contracts of methods defined out-of-line are marked as assumptions
           * below and have to be confirmed against the implementation file.
           */
00049     public:
              /** Construct a new local node. */
00050         CO_API LocalNode( );
              /** Destruct this local node. */
00051         CO_API virtual ~LocalNode( );
00052
              /**
               * Handle returned by acquireSendToken() and handed back to
               * releaseSendToken(); an alias for NodePtr.
               */
00053         typedef NodePtr SendToken;
00054
              /**
               * Initialize the node from the command line.
               *
               * NOTE(review): presumably parses listener-related options and
               * then starts listening - confirm in the implementation.
               *
               * @param argc the command line argument count.
               * @param argv the command line argument values.
               * @return presumably true on success, false otherwise.
               */
00085         CO_API virtual bool initLocal( const int argc, char** argv );
00086
              /**
               * Start listening, without an explicit connection.
               *
               * NOTE(review): presumably uses connection descriptions set up
               * earlier - confirm.
               * @return presumably true on success.
               */
00099         CO_API virtual bool listen();
              /**
               * Start listening using the given connection.
               *
               * @param connection the connection to listen on; semantics of a
               *                   null pointer are not visible here.
               * @return presumably true on success.
               */
00100         CO_API virtual bool listen( ConnectionPtr connection );
00101
              /**
               * Close this node.
               *
               * NOTE(review): presumably the counterpart of listen() - confirm.
               * @return presumably true on success.
               */
00111         CO_API virtual bool close();
00112
              /** Forward to close() and return its result. */
00114         virtual bool exitLocal() { return close(); }
00115
              /**
               * Connect the given node.
               *
               * @param node the node to connect.
               * @return presumably true if the node is connected afterwards.
               */
00131         CO_API bool connect( NodePtr node );
00132
              /**
               * Connect a node identified by its node identifier.
               *
               * @param nodeID the identifier of the node to connect.
               * @return the node; presumably a null pointer on failure - confirm.
               */
00144         CO_API NodePtr connect( const NodeID& nodeID );
00145
              /**
               * Disconnect the given node.
               *
               * @param node the node to disconnect.
               * @return presumably true on success.
               */
00153         CO_API virtual bool disconnect( NodePtr node );
00155
              /**
               * Disable the instance cache.
               * NOTE(review): presumably delegates to the object store - confirm.
               */
00159         CO_API void disableInstanceCache();
00160
              /**
               * Expire instance data older than the given age.
               *
               * @param age the age threshold; the time unit is not visible in
               *            this header - confirm.
               */
00162         CO_API void expireInstanceData( const int64_t age );
00163
              /**
               * Enable sending data when an object is registered.
               * NOTE(review): exact semantics not visible here - confirm.
               */
00179         CO_API void enableSendOnRegister();
00180
              /** Counterpart of enableSendOnRegister(). */
00182         CO_API void disableSendOnRegister();
00183
              /**
               * Register an object with this node.
               *
               * @param object the object to register.
               * @return presumably true on success.
               */
00195         CO_API bool registerObject( Object* object );
00196
              /**
               * Deregister an object, the counterpart of registerObject().
               *
               * @param object the object to deregister.
               */
00202         CO_API virtual void deregisterObject( Object* object );
00203
              /**
               * Map an object to the given identifier and version.
               *
               * NOTE(review): presumably equivalent to mapObjectNB() followed by
               * mapObjectSync() - confirm.
               *
               * @param object the object instance to map.
               * @param id the identifier to map to.
               * @param version the version to map, VERSION_OLDEST by default.
               * @return presumably true if the object was mapped.
               */
00237         CO_API bool mapObject( Object* object, const base::UUID& id,
00238                                const uint128_t& version = VERSION_OLDEST );
00239
              /**
               * Convenience overload which maps using the identifier and
               * version stored in the given ObjectVersion.
               */
00241         bool mapObject( Object* object, const ObjectVersion& v )
00242             { return mapObject( object, v.identifier, v.version ); }
00243
              /**
               * Start mapping an object.
               *
               * @param object the object instance to map.
               * @param id the identifier to map to.
               * @param version the version to map, VERSION_OLDEST by default.
               * @return presumably the request identifier to pass to
               *         mapObjectSync() - confirm.
               */
00245         CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id,
00246                                     const uint128_t& version = VERSION_OLDEST );
00247
              /**
               * Start mapping an object, additionally naming a master node.
               *
               * @param object the object instance to map.
               * @param id the identifier to map to.
               * @param version the version to map.
               * @param master presumably the node holding the master instance.
               * @return presumably the request identifier to pass to
               *         mapObjectSync() - confirm.
               */
00252         CO_API uint32_t mapObjectNB( Object* object, const base::UUID& id,
00253                                      const uint128_t& version, NodePtr master );
00254
              /**
               * Finalize a mapping operation.
               *
               * @param requestID presumably the value returned by mapObjectNB().
               * @return presumably true if the object was mapped.
               */
00256         CO_API bool mapObjectSync( const uint32_t requestID );
00257
              /** Unmap an object, the counterpart of the mapObject() family. */
00263         CO_API void unmapObject( Object* object );
00264
              /**
               * Release an object.
               * NOTE(review): presumably unmaps or deregisters the object
               * depending on its state - confirm.
               */
00266         CO_API void releaseObject( Object* object );
00267
              /**
               * Handler for pushed object data; virtual so that subclasses can
               * override it.
               *
               * @param groupID an identifier for the group of the push.
               * @param typeID an identifier for the type of the pushed object.
               * @param objectID the identifier of the pushed object.
               * @param istream the stream providing the pushed data.
               */
00286         CO_API virtual void objectPush( const uint128_t& groupID,
00287                                         const uint128_t& typeID,
00288                                         const uint128_t& objectID,
00289                                         DataIStream& istream );
00290
              /**
               * Swap one object instance for another.
               * NOTE(review): the preconditions on both objects are not visible
               * here - confirm.
               */
00293         CO_API void swapObject( Object* oldObject, Object* newObject );
00295
              /**
               * Look up a node by identifier.
               *
               * @param id the node identifier.
               * @return the node; presumably a null pointer if it is unknown.
               */
00306         CO_API NodePtr getNode( const NodeID& id ) const;
00307
              /**
               * Retrieve nodes into the given container.
               *
               * @param nodes the output container.
               * @param addSelf true by default; presumably controls whether this
               *                node itself is part of the result.
               */
00309         void getNodes( Nodes& nodes, const bool addSelf = true ) const;
00310
              /**
               * Acquire a send token for the given node.
               *
               * @param toNode the node to acquire the token for.
               * @return the token, to be handed to releaseSendToken().
               */
00315         CO_API SendToken acquireSendToken( NodePtr toNode );
              /**
               * Release a token obtained from acquireSendToken().
               * The token is passed by non-const reference; presumably it is
               * reset by the call - confirm.
               */
00316         CO_API void releaseSendToken( SendToken& token );
00317
              /** @return the worker queue of the command thread. */
00319         CommandQueue* getCommandThreadQueue()
00320             { return _commandThread->getWorkerQueue(); }
00321
              /** @return the result of isCurrent() on the command thread. */
00326         bool inCommandThread() const  { return _commandThread->isCurrent(); }
00327
              /**
               * @return the 64 bit time of this node's clock.
               *
               * NOTE(review): this accessor is not const-qualified although it
               * only reads _clock; adding const depends on the constness of
               * base::Clock::getTime64(), which is not visible here.
               */
00329         int64_t getTime64(){ return _clock.getTime64(); }
00331
              /**
               * Add a listening connection.
               * NOTE(review): threading requirements are not visible - confirm.
               */
00335         CO_API void addListener( ConnectionPtr connection );
00336
              /**
               * Start removing a listening connection.
               *
               * @return presumably a request identifier to wait on - confirm.
               */
00338         CO_API uint32_t removeListenerNB( ConnectionPtr connection );
00339
              /** Interrupt the set of incoming connections. */
00346         void flushCommands() { _incoming.interrupt(); }
00347
              /** @return a clone of the command, created by the command cache. */
00349         Command& cloneCommand( Command& command )
00350             { return _commandCache.clone( command ); }
00351
              /**
               * Allocate a command of the given size.
               *
               * @param size the size to allocate; presumably in bytes - confirm.
               * @return the allocated command.
               */
00353         CO_API Command& allocCommand( const uint64_t size );
00354
              /**
               * Dispatch a command.
               *
               * @param command the command to dispatch.
               * @return presumably true if the command was dispatched.
               */
00362         CO_API bool dispatchCommand( Command& command );
00364
              /**
               * Acknowledge a request towards a node.
               *
               * @param node the node to notify.
               * @param requestID the identifier of the acknowledged request.
               */
00366         CO_API void ackRequest( NodePtr node, const uint32_t requestID );
00367
              /** Ping the given remote node. */
00369         CO_API void ping( NodePtr remoteNode );
00370
              /**
               * Ping nodes which are idle.
               *
               * NOTE(review): the idle criterion and the meaning of the return
               * value are not visible in this header - confirm.
               */
00376         CO_API bool pingIdleNodes();
00377
00378     protected:
              /**
               * Connect a node using an explicitly given connection.
               *
               * @param node the node to connect.
               * @param connection the connection to use.
               * @return presumably true on success.
               */
00392         CO_API bool connect( NodePtr node, ConnectionPtr connection );
00393
              /**
               * Hook for subclasses; the default implementation does nothing.
               * NOTE(review): presumably invoked when a node disconnects.
               */
00395         virtual void notifyDisconnect( NodePtr node ) { }
00396
00397     private:
              // NOTE(review): std::list and std::deque are used below without a
              // direct #include in this header; presumably they are provided
              // transitively by one of the co headers - confirm.
00398         typedef std::list< Command* > CommandList;
00399
              /** Commands which are pending; presumably awaiting redispatch. */
00401         CommandList  _pendingCommands;
00402
              /** The cache used by cloneCommand() to clone commands. */
00404         CommandCache _commandCache;
00405
              /**
               * State for send token handling.
               * NOTE(review): _lastTokenTime is unsigned while getTime64()
               * returns int64_t; how the two relate is not visible here.
               */
00407         bool _hasSendToken;
00408         uint64_t _lastTokenTime;
00409         std::deque< Command* > _sendTokenQueue;
00410
              /** The object store; a friend of this class (see below). */
00412         ObjectStore* _objectStore;
00413
              /** Lock; presumably serializes connect operations - confirm. */
00415         base::Lock _connectMutex;
00416
00417         typedef base::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
00418         typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
              /** Hash from connection to node. */
00420         ConnectionNodeHash _connectionNodes; // read and write: recv only
00421
00422         typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
00423         typedef NodeHash::const_iterator NodeHashCIter;
              /** Spin-lock protected hash from a 128 bit key to node. */
00425         // r: all, w: recv
00426         base::Lockable< NodeHash, base::SpinLock > _nodes;
00427
              /** The incoming connections; interrupted by flushCommands(). */
00429         ConnectionSet _incoming;
00430
              /** The clock read by getTime64(). */
00432         base::Clock _clock;
00433
              /** Thread which runs LocalNode::_runReceiverThread(). */
00437         class ReceiverThread : public base::Thread
00438         {
00439         public:
                  /**
                   * explicit: a LocalNode* must never silently convert into a
                   * thread object.
                   */
00440             explicit ReceiverThread( LocalNode* localNode ) : _localNode( localNode ){}
                  /**
                   * Name the thread "Rcv " plus the class name of the local
                   * node, then start the command thread.
                   * @return the result of starting the command thread.
                   */
00441             virtual bool init()
00442                 {
00443                     setName( std::string("Rcv ") + base::className(_localNode));
00444                     return _localNode->_commandThread->start();
00445                 }
                  /** Run the receiver loop of the owning local node. */
00446             virtual void run(){ _localNode->_runReceiverThread(); }
00447
00448         private:
00449             LocalNode* const _localNode;
00450         };
00451         ReceiverThread* _receiverThread;
00452
              // NOTE(review): the helpers below are defined out-of-line; their
              // behavior cannot be derived from this header.
00453         bool _connectSelf();
00454         void _connectMulticast( NodePtr node );
00455
00456         void _cleanup();
00457         CO_API void _addConnection( ConnectionPtr connection );
00458         void _removeConnection( ConnectionPtr connection );
00459
00460         NodePtr _connect( const NodeID& nodeID, NodePtr peer );
00461         uint32_t _connect( NodePtr node );
00462         uint32_t _connect( NodePtr node, ConnectionPtr connection );
00463
              /** @return the result of isCurrent() on the receiver thread. */
00468         bool _inReceiverThread() const { return _receiverThread->isCurrent(); }
00469
              /** Start the receiver thread; the result of start() is ignored. */
00470         void _receiverThreadStart() { _receiverThread->start(); }
00471
              /** Entry point executed by ReceiverThread::run(). */
00472         void _runReceiverThread();
00473         void   _handleConnect();
00474         void   _handleDisconnect();
00475         bool   _handleData();
00477
00478         friend class ObjectStore;
              /**
               * Forward to registerCommand().
               * NOTE(review): presumably exists so that the befriended
               * ObjectStore can register its command handlers - confirm.
               */
00479         template< typename T > void
00480         _registerCommand( const uint32_t command, const CommandFunc< T >& func,
00481                           CommandQueue* destinationQueue )
00482         {
00483             registerCommand( command, func, destinationQueue );
00484         }
00485
              /** Worker whose queue is exposed by getCommandThreadQueue(). */
00491         class CommandThread : public Worker
00492         {
00493         public:
                  /**
                   * explicit: a LocalNode* must never silently convert into a
                   * worker object.
                   */
00494             explicit CommandThread( LocalNode* localNode ) : _localNode( localNode ){}
00495
00496         protected:
00497             virtual bool init();
                  /** @return true once the owning local node reports isClosed(). */
00498             virtual bool stopRunning() { return _localNode->isClosed(); }
00499             virtual bool notifyIdle();
00500
00501         private:
00502             LocalNode* const _localNode;
00503         };
00504         CommandThread* _commandThread;
00505
00506         void _dispatchCommand( Command& command );
00507         void   _redispatchCommands();
00508
              // Command handler functions.
              // NOTE(review): presumably registered through registerCommand() in
              // the implementation file; the meaning of the bool result is not
              // visible here.
00510         bool _cmdAckRequest( Command& packet );
00511         bool _cmdStopRcv( Command& command );
00512         bool _cmdStopCmd( Command& command );
00513         bool _cmdConnect( Command& command );
00514         bool _cmdConnectReply( Command& command );
00515         bool _cmdConnectAck( Command& command );
00516         bool _cmdID( Command& command );
00517         bool _cmdDisconnect( Command& command );
00518         bool _cmdGetNodeData( Command& command );
00519         bool _cmdGetNodeDataReply( Command& command );
00520         bool _cmdAcquireSendToken( Command& command );
00521         bool _cmdAcquireSendTokenReply( Command& command );
00522         bool _cmdReleaseSendToken( Command& command );
00523         bool _cmdAddListener( Command& command );
00524         bool _cmdRemoveListener( Command& command );
00525         bool _cmdPing( Command& command );
              /** Ignore the command and return true. */
00526         bool _cmdDiscard( Command& ) { return true; }
00528
              // NOTE(review): EQ_TS_VAR is a macro defined elsewhere; presumably
              // it declares thread-affinity debugging variables - confirm.
00529         EQ_TS_VAR( _cmdThread );
00530         EQ_TS_VAR( _rcvThread );
00531     };
00532     inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
00533     {
              // Stream the local node through its Node base class: binding to a
              // const Node reference performs the upcast.
              const Node& nodeBase = node;
00534         os << nodeBase;
00535         return os;
00536     }
00537 }
00538 #endif // CO_LOCALNODE_H
Generated on Fri Jun 8 2012 15:44:31 for Equalizer 1.2.1 by  doxygen 1.8.0