Collage  1.6.0
High-performance C++ library for developing object-oriented distributed applications.
localNode.h
1 
2 /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3  * Cedric Stalder <cedric.stalder@gmail.com>
4  * Daniel Nachbaur <danielnachbaur@gmail.com>
5  *
6  * This file is part of Collage <https://github.com/Eyescale/Collage>
7  *
8  * This library is free software; you can redistribute it and/or modify it under
9  * the terms of the GNU Lesser General Public License version 2.1 as published
10  * by the Free Software Foundation.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15  * details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21 
22 #ifndef CO_LOCALNODE_H
23 #define CO_LOCALNODE_H
24 
25 #include <co/node.h> // base class
26 #include <co/objectHandler.h> // base class
27 #include <co/objectVersion.h> // VERSION_FOO used inline
28 #include <lunchbox/requestHandler.h> // base class
29 
30 #include <boost/function/function1.hpp>
31 #include <boost/function/function4.hpp>
32 
33 namespace co
34 {
35 namespace detail { class LocalNode; class ReceiverThread; class CommandThread; }
36 
44 class LocalNode : public lunchbox::RequestHandler, public Node,
45  public ObjectHandler
46 {
47 public:
52  enum Counter
53  {
55  COUNTER_ALL // must be last
56  };
57 
59  CO_API explicit LocalNode( const uint32_t type = co::NODETYPE_NODE );
60 
91  CO_API virtual bool initLocal( const int argc, char** argv );
92 
105  CO_API virtual bool listen();
106 
116  CO_API virtual bool close();
117 
119  virtual bool exitLocal() { return close(); }
120 
137  CO_API bool connect( NodePtr node );
138 
151  CO_API NodePtr connect( const NodeID& nodeID );
152 
165  CO_API NodePtr connectObjectMaster( const uint128_t& id );
166 
174  CO_API virtual bool disconnect( NodePtr node );
176 
195  CO_API bool launch( NodePtr node, const std::string& command );
196 
202  CO_API NodePtr syncLaunch( const uint128_t& nodeID, int64_t timeout );
204 
219  CO_API bool registerObject( Object* object ) override;
220 
230  CO_API void deregisterObject( Object* object ) override;
231 
276  CO_API f_bool_t mapObject( Object* object, const uint128_t& id,
277  NodePtr master,
278  const uint128_t& version = VERSION_OLDEST );
279 
281  f_bool_t mapObject( Object* object, const ObjectVersion& v )
282  { return mapObject( object, v.identifier, 0, v.version ); }
283 
285  f_bool_t mapObject( Object* object, const uint128_t& id,
286  const uint128_t& version = VERSION_OLDEST )
287  { return mapObject( object, id, 0, version ); }
288 
290  CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
291  const uint128_t& version = VERSION_OLDEST );
292 
294  CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
295  const uint128_t& version,
296  NodePtr master ) override;
297 
299  CO_API bool mapObjectSync( const uint32_t requestID ) override;
300 
321  CO_API f_bool_t syncObject( Object* object, const uint128_t& id,
322  NodePtr master,
323  const uint32_t instanceID = CO_INSTANCE_ALL ) override;
330  CO_API void unmapObject( Object* object ) override;
331 
333  CO_API void disableInstanceCache();
334 
336  CO_API void expireInstanceData( const int64_t age );
337 
355  CO_API void enableSendOnRegister();
356 
358  CO_API void disableSendOnRegister();
359 
380  CO_API virtual void objectPush( const uint128_t& groupID,
381  const uint128_t& objectType,
382  const uint128_t& objectID,
383  DataIStream& istream );
384 
386  typedef boost::function< void( const uint128_t&,
387  const uint128_t&,
388  const uint128_t&,
400  CO_API void registerPushHandler( const uint128_t& groupID,
401  const PushHandler& handler );
402 
403 
405  typedef boost::function< bool( CustomICommand& ) > CommandHandler;
406 
421  CO_API bool registerCommandHandler( const uint128_t& command,
422  const CommandHandler& func,
423  CommandQueue* queue );
424 
427  CO_API void swapObject( Object* oldObject, Object* newObject );
429 
441  CO_API NodePtr getNode( const NodeID& id ) const;
442 
444  CO_API Nodes getNodes( const bool addSelf = true ) const;
445 
447  CO_API CommandQueue* getCommandThreadQueue();
448 
454  CO_API bool inCommandThread() const;
455 
457  CO_API const Strings& getCommandLine() const;
458 
459  CO_API int64_t getTime64() const;
460  CO_API ssize_t getCounter( const Counter counter ) const;
461 
462 
469  CO_API ConnectionPtr addListener( ConnectionDescriptionPtr desc );
470 
472  CO_API void addListener( ConnectionPtr connection );
473 
475  CO_API void removeListeners( const Connections& connections );
476 
483  CO_API void flushCommands();
484 
486  CO_API BufferPtr allocBuffer( const uint64_t size );
487 
499  CO_API bool dispatchCommand( ICommand& command ) override;
500 
501 
503  typedef lunchbox::RefPtr< co::SendToken > SendToken;
504 
513  CO_API SendToken acquireSendToken( NodePtr toNode );
514 
516  CO_API void releaseSendToken( SendToken token );
517 
519  CO_API Zeroconf getZeroconf();
521 
523  CO_API void ackRequest( NodePtr node, const uint32_t requestID );
524 
526  CO_API void ping( NodePtr remoteNode );
527 
533  CO_API bool pingIdleNodes();
534 
539  CO_API void setAffinity( const int32_t affinity );
540 
542  CO_API void addConnection( ConnectionPtr connection );
543 
544 protected:
546  CO_API ~LocalNode() override;
547 
560  CO_API bool connect( NodePtr node, ConnectionPtr connection );
561 
563  virtual void notifyConnect( NodePtr ) {}
564 
566  virtual void notifyDisconnect( NodePtr ) {}
567 
576  CO_API virtual NodePtr createNode( const uint32_t type );
577 
578 private:
579  detail::LocalNode* const _impl;
580 
581  friend class detail::ReceiverThread;
582  bool _startCommandThread( const int32_t threadID );
583  void _runReceiverThread();
584 
585  friend class detail::CommandThread;
586  bool _notifyCommandThreadIdle();
587 
588  void _cleanup();
589  void _closeNode( NodePtr node );
590  void _addConnection( ConnectionPtr connection );
591  void _removeConnection( ConnectionPtr connection );
592 
593  lunchbox::Request< void > _removeListener( ConnectionPtr connection );
594 
595  uint32_t _connect( NodePtr node );
596  NodePtr _connect( const NodeID& nodeID );
597  uint32_t _connect( NodePtr node, ConnectionPtr connection );
598  NodePtr _connect( const NodeID& nodeID, NodePtr peer );
599  NodePtr _connectFromZeroconf( const NodeID& nodeID );
600  bool _connectSelf();
601 
602  bool _setupPeer( const std::string& setupOpts );
603 
604  void _handleConnect();
605  void _handleDisconnect();
606  bool _handleData();
607  BufferPtr _readHead( ConnectionPtr connection );
608  ICommand _setupCommand( ConnectionPtr, ConstBufferPtr );
609  bool _readTail( ICommand&, BufferPtr, ConnectionPtr );
610  void _initService();
611  void _exitService();
612 
613  friend class ObjectStore;
614  template< typename T >
615  void _registerCommand( const uint32_t command, const CommandFunc< T >& func,
616  CommandQueue* destinationQueue )
617  {
618  registerCommand( command, func, destinationQueue );
619  }
620 
621  void _dispatchCommand( ICommand& command );
622  void _redispatchCommands();
623 
625  bool _cmdAckRequest( ICommand& command );
626  bool _cmdStopRcv( ICommand& command );
627  bool _cmdStopCmd( ICommand& command );
628  bool _cmdSetAffinity( ICommand& command );
629  bool _cmdConnect( ICommand& command );
630  bool _cmdConnectReply( ICommand& command );
631  bool _cmdConnectAck( ICommand& command );
632  bool _cmdID( ICommand& command );
633  bool _cmdDisconnect( ICommand& command );
634  bool _cmdGetNodeData( ICommand& command );
635  bool _cmdGetNodeDataReply( ICommand& command );
636  bool _cmdAcquireSendToken( ICommand& command );
637  bool _cmdAcquireSendTokenReply( ICommand& command );
638  bool _cmdReleaseSendToken( ICommand& command );
639  bool _cmdAddListener( ICommand& command );
640  bool _cmdRemoveListener( ICommand& command );
641  bool _cmdPing( ICommand& command );
642  bool _cmdCommand( ICommand& command );
643  bool _cmdCommandAsync( ICommand& command );
644  bool _cmdAddConnection( ICommand& command );
645  bool _cmdDiscard( ICommand& ) { return true; }
647 
648  LB_TS_VAR( _cmdThread )
649  LB_TS_VAR( _rcvThread )
650 };
651 
652 inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
653 {
654  os << static_cast< const Node& >( node );
655  return os;
656 }
657 }
658 #endif // CO_LOCALNODE_H
Num of mapObjects served for other nodes.
Definition: localNode.h:54
std::vector< ConnectionPtr > Connections
A vector of ConnectionPtr&#39;s.
Definition: types.h:125
lunchbox::RefPtr< co::SendToken > SendToken
A handle for a send token acquired by acquireSendToken().
Definition: localNode.h:503
lunchbox::RefPtr< ConnectionDescription > ConnectionDescriptionPtr
A reference pointer for ConnectionDescription pointers.
Definition: types.h:96
uint128_t NodeID
A unique identifier for nodes.
Definition: types.h:83
A distributed object.
Definition: object.h:45
STL namespace.
A helper struct bundling an object identifier and version.
Definition: objectVersion.h:47
A thread-safe, blocking queue for ICommand buffers.
Definition: commandQueue.h:33
Proxy node representing a remote LocalNode.
Definition: node.h:42
f_bool_t mapObject(Object *object, const ObjectVersion &v)
Convenience wrapper for mapObject().
Definition: localNode.h:281
uint128_t identifier
the object identifier
lunchbox::RefPtr< Node > NodePtr
A reference pointer for Node pointers.
Definition: types.h:86
f_bool_t mapObject(Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST)
Definition: localNode.h:285
Counter
Counters are monotonically increasing performance variables for operations performed by a LocalNode i...
Definition: localNode.h:52
Object-oriented network library.
Definition: barrier.h:27
virtual bool exitLocal()
Close a listening node.
Definition: localNode.h:119
Interface for entities which map and register objects.
Definition: objectHandler.h:29
boost::function< bool(CustomICommand &) > CommandHandler
Function signature for custom command handlers.
Definition: localNode.h:405
A zeroconf communicator.
Definition: zeroconf.h:46
A class managing received commands.
Definition: iCommand.h:43
Node specialization for a local node.
Definition: localNode.h:44
uint128_t version
the object version
A wrapper to register a function callback on an object instance.
Definition: commandFunc.h:38
A std::istream-like input data stream for binary data.
Definition: dataIStream.h:41
boost::function< void(const uint128_t &, const uint128_t &, const uint128_t &, DataIStream &) > PushHandler
Function signature for push handlers.
Definition: localNode.h:389
std::vector< NodePtr > Nodes
A vector of NodePtr&#39;s.
Definition: types.h:104
A plain co::Node.
Definition: nodeType.h:31
lunchbox::RefPtr< Connection > ConnectionPtr
A reference pointer for Connection pointers.
Definition: types.h:94