Collage  1.0.1
Object-Oriented C++ Network Library
localNode.h
1 
2 /* Copyright (c) 2005-2013, Stefan Eilemann <eile@equalizergraphics.com>
3  * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4  * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5  *
6  * This file is part of Collage <https://github.com/Eyescale/Collage>
7  *
8  * This library is free software; you can redistribute it and/or modify it under
9  * the terms of the GNU Lesser General Public License version 2.1 as published
10  * by the Free Software Foundation.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15  * details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21 
22 #ifndef CO_LOCALNODE_H
23 #define CO_LOCALNODE_H
24 
25 #include <co/node.h> // base class
26 #include <co/objectHandler.h> // base class
27 #include <co/objectVersion.h> // VERSION_FOO used inline
28 #include <lunchbox/requestHandler.h> // base class
29 
30 #include <boost/function/function1.hpp>
31 #include <boost/function/function4.hpp>
32 
33 namespace co
34 {
// Forward declarations of the implementation classes that LocalNode refers to
// by name only: detail::LocalNode is the type behind LocalNode::_impl, and
// ReceiverThread and CommandThread are declared as friends of LocalNode.
namespace detail { class LocalNode; class ReceiverThread; class CommandThread; }
36 
45  class LocalNode : public lunchbox::RequestHandler, public Node,
46  public ObjectHandler
47  {
48  public:
50  CO_API LocalNode( const uint32_t type = co::NODETYPE_NODE );
51 
53  CO_API virtual ~LocalNode();
54 
86  CO_API virtual bool initLocal( const int argc, char** argv );
87 
100  CO_API virtual bool listen();
101  CO_API virtual bool listen( ConnectionPtr connection );
102 
112  CO_API virtual bool close();
113 
115  virtual bool exitLocal() { return close(); }
116 
133  CO_API bool connect( NodePtr node );
134 
147  CO_API NodePtr connect( const NodeID& nodeID );
148 
156  CO_API virtual bool disconnect( NodePtr node );
158 
173  CO_API virtual bool registerObject( Object* object );
174 
184  CO_API virtual void deregisterObject( Object* object );
185 
223  CO_API bool mapObject( Object* object, const UUID& id,
224  const uint128_t& version = VERSION_OLDEST );
225 
227  bool mapObject( Object* object, const ObjectVersion& v )
228  { return mapObject( object, v.identifier, v.version ); }
229 
231  CO_API uint32_t mapObjectNB( Object* object, const UUID& id,
232  const uint128_t& version = VERSION_OLDEST);
233 
239  CO_API virtual uint32_t mapObjectNB( Object* object, const UUID& id,
240  const uint128_t& version,
241  NodePtr master );
242 
244  CO_API virtual bool mapObjectSync( const uint32_t requestID );
245 
252  CO_API virtual void unmapObject( Object* object );
253 
255  CO_API void disableInstanceCache();
256 
258  CO_API void expireInstanceData( const int64_t age );
259 
277  CO_API void enableSendOnRegister();
278 
280  CO_API void disableSendOnRegister();
281 
302  CO_API virtual void objectPush( const uint128_t& groupID,
303  const uint128_t& objectType,
304  const uint128_t& objectID,
305  DataIStream& istream );
306 
308  typedef boost::function< void( const uint128_t&,
309  const uint128_t&,
310  const uint128_t&,
322  CO_API void registerPushHandler( const uint128_t& groupID,
323  const PushHandler& handler );
324 
325 
327  typedef boost::function< bool( CustomICommand& ) > CommandHandler;
328 
341  CO_API bool registerCommandHandler( const uint128_t& command,
342  const CommandHandler& func,
343  CommandQueue* queue );
344 
347  CO_API void swapObject( Object* oldObject, Object* newObject );
349 
361  CO_API NodePtr getNode( const NodeID& id ) const;
362 
364  CO_API void getNodes( Nodes& nodes, const bool addSelf = true ) const;
365 
368 
374  CO_API bool inCommandThread() const;
375 
377  CO_API int64_t getTime64() const;
379 
387 
389  CO_API void addListener( ConnectionPtr connection );
390 
392  CO_API void removeListeners( const Connections& connections );
393 
400  CO_API void flushCommands();
401 
403  CO_API BufferPtr allocBuffer( const uint64_t size );
404 
416  CO_API virtual bool dispatchCommand( ICommand& command );
417 
418 
420  typedef lunchbox::RefPtr< co::SendToken > SendToken;
421 
430  CO_API SendToken acquireSendToken( NodePtr toNode );
431 
433  CO_API void releaseSendToken( SendToken token );
434 
436  CO_API Zeroconf getZeroconf();
438 
440  CO_API void ackRequest( NodePtr node, const uint32_t requestID );
441 
443  CO_API void ping( NodePtr remoteNode );
444 
450  CO_API bool pingIdleNodes();
451 
456  CO_API void setAffinity( const int32_t affinity );
457 
458  protected:
471  CO_API bool connect( NodePtr node, ConnectionPtr connection );
472 
474  virtual void notifyConnect( NodePtr node ) {}
475 
477  virtual void notifyDisconnect( NodePtr node ) {}
478 
487  CO_API virtual NodePtr createNode( const uint32_t type );
488 
489  private:
490  detail::LocalNode* const _impl;
491 
492  bool _connectSelf();
493 
494  bool _startCommandThread();
495  bool _notifyCommandThreadIdle();
496  friend class detail::ReceiverThread;
497  friend class detail::CommandThread;
498 
499  void _cleanup();
500  void _closeNode( NodePtr node );
501  CO_API void _addConnection( ConnectionPtr connection );
502  void _removeConnection( ConnectionPtr connection );
503 
504  NodePtr _connect( const NodeID& nodeID, NodePtr peer );
505  NodePtr _connectFromZeroconf( const NodeID& nodeID );
506  uint32_t _removeListenerNB( ConnectionPtr connection );
507  uint32_t _connect( NodePtr node );
508  uint32_t _connect( NodePtr node, ConnectionPtr connection );
509 
510  void _runReceiverThread();
511  void _handleConnect();
512  void _handleDisconnect();
513  bool _handleData();
514  BufferPtr _readHead( ConnectionPtr connection );
515  ICommand _setupCommand( ConnectionPtr, ConstBufferPtr );
516  bool _readTail( ICommand&, BufferPtr, ConnectionPtr );
517  void _initService();
518  void _exitService();
519 
520  friend class ObjectStore;
521  template< typename T > void
522  _registerCommand( const uint32_t command, const CommandFunc< T >& func,
523  CommandQueue* destinationQueue )
524  {
525  registerCommand( command, func, destinationQueue );
526  }
527 
528  void _dispatchCommand( ICommand& command );
529  void _redispatchCommands();
530 
532  bool _cmdAckRequest( ICommand& command );
533  bool _cmdStopRcv( ICommand& command );
534  bool _cmdStopCmd( ICommand& command );
535  bool _cmdSetAffinity( ICommand& command );
536  bool _cmdConnect( ICommand& command );
537  bool _cmdConnectReply( ICommand& command );
538  bool _cmdConnectAck( ICommand& command );
539  bool _cmdID( ICommand& command );
540  bool _cmdDisconnect( ICommand& command );
541  bool _cmdGetNodeData( ICommand& command );
542  bool _cmdGetNodeDataReply( ICommand& command );
543  bool _cmdAcquireSendToken( ICommand& command );
544  bool _cmdAcquireSendTokenReply( ICommand& command );
545  bool _cmdReleaseSendToken( ICommand& command );
546  bool _cmdAddListener( ICommand& command );
547  bool _cmdRemoveListener( ICommand& command );
548  bool _cmdPing( ICommand& command );
549  bool _cmdCommand( ICommand& command );
550  bool _cmdCommandAsync( ICommand& command );
551  bool _cmdAddConnection( ICommand& command );
552  bool _cmdDiscard( ICommand& ) { return true; }
554 
555  LB_TS_VAR( _cmdThread );
556  LB_TS_VAR( _rcvThread );
557  };
558 
559  inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
560  {
561  os << static_cast< const Node& >( node );
562  return os;
563  }
564 }
565 #endif // CO_LOCALNODE_H
Interface for entities which map and register objects.
Definition: objectHandler.h:29
virtual CO_API bool registerObject(Object *object)
Register a distributed object.
CO_API bool registerCommandHandler(const uint128_t &command, const CommandHandler &func, CommandQueue *queue)
Register a custom command handler handled by this node.
CO_API Zeroconf getZeroconf()
std::vector< ConnectionPtr > Connections
A vector of ConnectionPtr's.
Definition: types.h:116
CO_API SendToken acquireSendToken(NodePtr toNode)
Acquire a send token from the given node.
CO_API bool connect(NodePtr node)
Connect a remote node (proxy) to this listening node.
A zeroconf communicator.
Definition: zeroconf.h:46
CO_API ConnectionPtr addListener(ConnectionDescriptionPtr desc)
Add a listening connection to this listening node.
virtual CO_API bool disconnect(NodePtr node)
Disconnect a connected node.
UUID NodeID
A unique identifier for nodes.
Definition: types.h:77
lunchbox::RefPtr< ConnectionDescription > ConnectionDescriptionPtr
A reference pointer for ConnectionDescription pointers.
Definition: types.h:90
Node specialization for a local node.
Definition: localNode.h:45
A class managing received commands.
Definition: iCommand.h:43
boost::function< bool(CustomICommand &) > CommandHandler
Function signature for custom command handlers.
Definition: localNode.h:327
virtual CO_API bool dispatchCommand(ICommand &command)
Dispatches a command to the registered command queue.
CO_API void removeListeners(const Connections &connections)
Remove listening connections from this listening node.
uint128_t version
the object version
Definition: objectVersion.h:90
virtual CO_API bool close()
Close a listening node.
lunchbox::RefPtr< co::SendToken > SendToken
A handle for a send token acquired by acquireSendToken().
Definition: localNode.h:420
CO_API CommandQueue * getCommandThreadQueue()
Return the command queue to the command thread.
CO_API void registerPushHandler(const uint128_t &groupID, const PushHandler &handler)
Register a custom handler for Object::push operations.
virtual CO_API ~LocalNode()
Destruct this local node.
CO_API void setAffinity(const int32_t affinity)
Bind this, the receiver and the command thread to the given lunchbox::Thread affinity.
Proxy node representing a remote LocalNode.
Definition: node.h:42
virtual CO_API void deregisterObject(Object *object)
Deregister a distributed object.
virtual CO_API bool mapObjectSync(const uint32_t requestID)
Finalize the mapping of a distributed object.
A helper struct bundling an object identifier and version.
Definition: objectVersion.h:45
CO_API NodePtr getNode(const NodeID &id) const
Get a node by identifier.
uint128_t identifier
the object identifier
Definition: objectVersion.h:89
CO_API void enableSendOnRegister()
Enable sending instance data after registration.
virtual CO_API bool listen()
Open all connections and put this node into the listening state.
virtual CO_API void unmapObject(Object *object)
Unmap a mapped object.
CO_API void disableSendOnRegister()
Disable sending data of newly registered objects.
lunchbox::RefPtr< Node > NodePtr
A reference pointer for Node pointers.
Definition: types.h:80
A std::istream-like input data stream for binary data.
Definition: dataIStream.h:40
CO_API void releaseSendToken(SendToken token)
Release the given send token.
CO_API void getNodes(Nodes &nodes, const bool addSelf=true) const
Assemble a vector of the currently connected nodes.
virtual CO_API void objectPush(const uint128_t &groupID, const uint128_t &objectType, const uint128_t &objectID, DataIStream &istream)
Handler for an Object::push() operation.
CO_API void ping(NodePtr remoteNode)
Request keep-alive update from the remote node.
boost::function< void(const uint128_t &, const uint128_t &, const uint128_t &, DataIStream &) > PushHandler
Function signature for push handlers.
Definition: localNode.h:311
virtual CO_API bool initLocal(const int argc, char **argv)
Initialize the node.
CO_API uint32_t mapObjectNB(Object *object, const UUID &id, const uint128_t &version=VERSION_OLDEST)
Start mapping a distributed object.
virtual bool exitLocal()
Close a listening node.
Definition: localNode.h:115
A distributed object.
Definition: object.h:45
CO_API void disableInstanceCache()
Disable the instance cache of a stopped local node.
void registerCommand(const uint32_t command, const CommandFunc< T > &func, CommandQueue *queue)
Register a command member function for a command.
Definition: dispatcher.h:94
CO_API bool pingIdleNodes()
Request updates from all nodes above keep-alive timeout.
bool mapObject(Object *object, const ObjectVersion &v)
Convenience wrapper for mapObject().
Definition: localNode.h:227
CO_API bool mapObject(Object *object, const UUID &id, const uint128_t &version=VERSION_OLDEST)
Map a distributed object.
CO_API bool inCommandThread() const
std::vector< NodePtr > Nodes
A vector of NodePtr's.
Definition: types.h:98
A plain co::Node.
Definition: nodeType.h:31
virtual CO_API NodePtr createNode(const uint32_t type)
Factory method to create a new node.
CO_API LocalNode(const uint32_t type=co::NODETYPE_NODE)
Construct a new local node of the given type.
A thread-safe queue for ICommand buffers.
Definition: commandQueue.h:32
lunchbox::RefPtr< Connection > ConnectionPtr
A reference pointer for Connection pointers.
Definition: types.h:88