Collage  1.7.0
High-performance C++ library for developing object-oriented distributed applications.
localNode.h
1 
2 /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3  * Cedric Stalder <cedric.stalder@gmail.com>
4  * Daniel Nachbaur <danielnachbaur@gmail.com>
5  *
6  * This file is part of Collage <https://github.com/Eyescale/Collage>
7  *
8  * This library is free software; you can redistribute it and/or modify it under
9  * the terms of the GNU Lesser General Public License version 2.1 as published
10  * by the Free Software Foundation.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15  * details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21 
22 #ifndef CO_LOCALNODE_H
23 #define CO_LOCALNODE_H
24 
25 #include <co/node.h> // base class
26 #include <co/objectHandler.h> // base class
27 #include <co/objectVersion.h> // VERSION_FOO used inline
28 #include <lunchbox/requestHandler.h> // base class
29 
30 #include <boost/function/function1.hpp>
31 #include <boost/function/function4.hpp>
32 
33 namespace co
34 {
35 namespace detail
36 {
37 class LocalNode;
38 class ReceiverThread;
39 class CommandThread;
40 }
41 
49 class LocalNode : public lunchbox::RequestHandler,
50  public Node,
51  public ObjectHandler
52 {
53 public:
58  enum Counter
59  {
61  COUNTER_ALL // must be last
62  };
63 
65  CO_API explicit LocalNode(const uint32_t type = co::NODETYPE_NODE);
66 
97  CO_API virtual bool initLocal(const int argc, char** argv);
98 
111  CO_API virtual bool listen();
112 
122  CO_API virtual bool close();
123 
125  virtual bool exitLocal() { return close(); }
142  CO_API bool connect(NodePtr node);
143 
156  CO_API NodePtr connect(const NodeID& nodeID);
157 
170  CO_API NodePtr connectObjectMaster(const uint128_t& id);
171 
179  CO_API virtual bool disconnect(NodePtr node);
181 
200  CO_API bool launch(NodePtr node, const std::string& command);
201 
207  CO_API NodePtr syncLaunch(const uint128_t& nodeID, int64_t timeout);
209 
224  CO_API bool registerObject(Object* object) override;
225 
235  CO_API void deregisterObject(Object* object) override;
236 
281  CO_API f_bool_t mapObject(Object* object, const uint128_t& id,
282  NodePtr master,
283  const uint128_t& version = VERSION_OLDEST);
284 
286  f_bool_t mapObject(Object* object, const ObjectVersion& v)
287  {
288  return mapObject(object, v.identifier, 0, v.version);
289  }
290 
292  f_bool_t mapObject(Object* object, const uint128_t& id,
293  const uint128_t& version = VERSION_OLDEST)
294  {
295  return mapObject(object, id, 0, version);
296  }
297 
299  CO_API uint32_t mapObjectNB(Object* object, const uint128_t& id,
300  const uint128_t& version = VERSION_OLDEST);
301 
303  CO_API uint32_t mapObjectNB(Object* object, const uint128_t& id,
304  const uint128_t& version,
305  NodePtr master) override;
306 
308  CO_API bool mapObjectSync(const uint32_t requestID) override;
309 
330  CO_API f_bool_t
331  syncObject(Object* object, const uint128_t& id, NodePtr master,
332  const uint32_t instanceID = CO_INSTANCE_ALL) override;
339  CO_API void unmapObject(Object* object) override;
340 
342  CO_API void disableInstanceCache();
343 
345  CO_API void expireInstanceData(const int64_t age);
346 
364  CO_API void enableSendOnRegister();
365 
367  CO_API void disableSendOnRegister();
368 
389  CO_API virtual void objectPush(const uint128_t& groupID,
390  const uint128_t& objectType,
391  const uint128_t& objectID,
392  DataIStream& istream);
393 
395  typedef boost::function<void(const uint128_t&,
396  const uint128_t&,
397  const uint128_t&,
398  DataIStream&)>
410  CO_API void registerPushHandler(const uint128_t& groupID,
411  const PushHandler& handler);
412 
414  typedef boost::function<bool(CustomICommand&)> CommandHandler;
415 
430  CO_API bool registerCommandHandler(const uint128_t& command,
431  const CommandHandler& func,
432  CommandQueue* queue);
433 
436  CO_API void swapObject(Object* oldObject, Object* newObject);
438 
450  CO_API NodePtr getNode(const NodeID& id) const;
451 
453  CO_API Nodes getNodes(const bool addSelf = true) const;
454 
456  CO_API CommandQueue* getCommandThreadQueue();
457 
463  CO_API bool inCommandThread() const;
464 
466  CO_API const Strings& getCommandLine() const;
467 
468  CO_API int64_t getTime64() const;
469  CO_API ssize_t getCounter(const Counter counter) const;
470 
471 
478  CO_API ConnectionPtr addListener(ConnectionDescriptionPtr desc);
479 
481  CO_API void addListener(ConnectionPtr connection);
482 
484  CO_API void removeListeners(const Connections& connections);
485 
492  CO_API void flushCommands();
493 
495  CO_API BufferPtr allocBuffer(const uint64_t size);
496 
508  CO_API bool dispatchCommand(ICommand& command) override;
509 
511  typedef lunchbox::RefPtr<co::SendToken> SendToken;
512 
521  CO_API SendToken acquireSendToken(NodePtr toNode);
522 
524  CO_API void releaseSendToken(SendToken token);
525 
527  CO_API Zeroconf getZeroconf();
529 
531  CO_API void ackRequest(NodePtr node, const uint32_t requestID);
532 
534  CO_API void ping(NodePtr remoteNode);
535 
541  CO_API bool pingIdleNodes();
542 
547  CO_API void setAffinity(const int32_t affinity);
548 
550  CO_API void addConnection(ConnectionPtr connection);
551 
552 protected:
554  CO_API ~LocalNode() override;
555 
568  CO_API bool connect(NodePtr node, ConnectionPtr connection);
569 
571  virtual void notifyConnect(NodePtr) {}
573  virtual void notifyDisconnect(NodePtr) {}
582  CO_API virtual NodePtr createNode(const uint32_t type);
583 
584 private:
585  detail::LocalNode* const _impl;
586 
587  friend class detail::ReceiverThread;
588  bool _startCommandThread(const int32_t threadID);
589  void _runReceiverThread();
590 
591  friend class detail::CommandThread;
592  bool _notifyCommandThreadIdle();
593 
594  void _cleanup();
595  void _closeNode(NodePtr node);
596  void _addConnection(ConnectionPtr connection);
597  void _removeConnection(ConnectionPtr connection);
598 
599  lunchbox::Request<void> _removeListener(ConnectionPtr connection);
600 
601  uint32_t _connect(NodePtr node);
602  NodePtr _connect(const NodeID& nodeID);
603  uint32_t _connect(NodePtr node, ConnectionPtr connection);
604  NodePtr _connect(const NodeID& nodeID, NodePtr peer);
605  NodePtr _connectFromZeroconf(const NodeID& nodeID);
606  bool _connectSelf();
607 
608  bool _setupPeer(const std::string& setupOpts);
609 
610  void _handleConnect();
611  void _handleDisconnect();
612  bool _handleData();
613  BufferPtr _readHead(ConnectionPtr connection);
614  ICommand _setupCommand(ConnectionPtr, ConstBufferPtr);
615  bool _readTail(ICommand&, BufferPtr, ConnectionPtr);
616  void _initService();
617  void _exitService();
618 
619  friend class ObjectStore;
620  template <typename T>
621  void _registerCommand(const uint32_t command, const CommandFunc<T>& func,
622  CommandQueue* destinationQueue)
623  {
624  registerCommand(command, func, destinationQueue);
625  }
626 
627  void _dispatchCommand(ICommand& command);
628  void _redispatchCommands();
629 
631  bool _cmdAckRequest(ICommand& command);
632  bool _cmdStopRcv(ICommand& command);
633  bool _cmdStopCmd(ICommand& command);
634  bool _cmdSetAffinity(ICommand& command);
635  bool _cmdConnect(ICommand& command);
636  bool _cmdConnectReply(ICommand& command);
637  bool _cmdConnectAck(ICommand& command);
638  bool _cmdID(ICommand& command);
639  bool _cmdDisconnect(ICommand& command);
640  bool _cmdGetNodeData(ICommand& command);
641  bool _cmdGetNodeDataReply(ICommand& command);
642  bool _cmdAcquireSendToken(ICommand& command);
643  bool _cmdAcquireSendTokenReply(ICommand& command);
644  bool _cmdReleaseSendToken(ICommand& command);
645  bool _cmdAddListener(ICommand& command);
646  bool _cmdRemoveListener(ICommand& command);
647  bool _cmdPing(ICommand& command);
648  bool _cmdCommand(ICommand& command);
649  bool _cmdCommandAsync(ICommand& command);
650  bool _cmdAddConnection(ICommand& command);
651  bool _cmdDiscard(ICommand&) { return true; }
653 
654  LB_TS_VAR(_cmdThread)
655  LB_TS_VAR(_rcvThread)
656 };
657 
658 inline std::ostream& operator<<(std::ostream& os, const LocalNode& node)
659 {
660  os << static_cast<const Node&>(node);
661  return os;
662 }
663 }
664 #endif // CO_LOCALNODE_H
boost::function< void(const uint128_t &, const uint128_t &, const uint128_t &, DataIStream &)> PushHandler
Function signature for push handlers.
Definition: localNode.h:399
Num of mapObjects served for other nodes.
Definition: localNode.h:60
uint128_t NodeID
A unique identifier for nodes.
Definition: types.h:82
A distributed object.
Definition: object.h:47
STL namespace.
boost::function< bool(CustomICommand &)> CommandHandler
Function signature for custom command handlers.
Definition: localNode.h:414
A helper struct bundling an object identifier and version.
Definition: objectVersion.h:46
A thread-safe, blocking queue for ICommand buffers.
Definition: commandQueue.h:36
Proxy node representing a remote LocalNode.
Definition: node.h:45
f_bool_t mapObject(Object *object, const ObjectVersion &v)
Convenience wrapper for mapObject().
Definition: localNode.h:286
uint128_t identifier
the object identifier
f_bool_t mapObject(Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST)
Definition: localNode.h:292
Counter
Counters are monotonically increasing performance variables for operations performed by a LocalNode i...
Definition: localNode.h:58
Object-oriented network library.
Definition: barrier.h:27
virtual bool exitLocal()
Close a listening node.
Definition: localNode.h:125
Interface for entities which map and register objects.
Definition: objectHandler.h:29
std::vector< NodePtr > Nodes
A vector of NodePtr&#39;s.
Definition: types.h:103
A zeroconf communicator.
Definition: zeroconf.h:49
A class managing received commands.
Definition: iCommand.h:45
Node specialization for a local node.
Definition: localNode.h:49
uint128_t version
the object version
A wrapper to register a function callback on an object instance.
Definition: commandFunc.h:39
A std::istream-like input data stream for binary data.
Definition: dataIStream.h:45
lunchbox::RefPtr< ConnectionDescription > ConnectionDescriptionPtr
A reference pointer for ConnectionDescription pointers.
Definition: types.h:95
lunchbox::RefPtr< Connection > ConnectionPtr
A reference pointer for Connection pointers.
Definition: types.h:93
std::vector< ConnectionPtr > Connections
A vector of ConnectionPtr&#39;s.
Definition: types.h:124
lunchbox::RefPtr< co::SendToken > SendToken
A handle for a send token acquired by acquireSendToken().
Definition: localNode.h:511
A plain co::Node.
Definition: nodeType.h:31
lunchbox::RefPtr< Node > NodePtr
A reference pointer for Node pointers.
Definition: types.h:85