22 #ifndef CO_LOCALNODE_H
23 #define CO_LOCALNODE_H
26 #include <co/objectHandler.h>
27 #include <co/objectVersion.h>
28 #include <lunchbox/requestHandler.h>
30 #include <boost/function/function1.hpp>
31 #include <boost/function/function4.hpp>
35 namespace detail {
class LocalNode;
class ReceiverThread;
class CommandThread; }
91 CO_API
virtual bool initLocal(
const int argc,
char** argv );
105 CO_API
virtual bool listen();
116 CO_API
virtual bool close();
195 CO_API
bool launch(
NodePtr node,
const std::string& command );
278 const uint128_t& version = VERSION_OLDEST );
286 const uint128_t& version = VERSION_OLDEST )
287 {
return mapObject(
object,
id, 0, version ); }
291 const uint128_t& version = VERSION_OLDEST );
295 const uint128_t& version,
299 CO_API
bool mapObjectSync(
const uint32_t requestID )
override;
323 const uint32_t instanceID = CO_INSTANCE_ALL )
override;
336 CO_API
void expireInstanceData(
const int64_t age );
380 CO_API
virtual void objectPush(
const uint128_t& groupID,
381 const uint128_t& objectType,
382 const uint128_t& objectID,
386 typedef boost::function< void(
const uint128_t&,
401 const PushHandler& handler );
422 const CommandHandler& func,
427 CO_API
void swapObject(
Object* oldObject,
Object* newObject );
456 CO_API int64_t getTime64()
const;
457 CO_API ssize_t getCounter(
const Counter counter )
const;
480 CO_API
void flushCommands();
483 CO_API BufferPtr allocBuffer(
const uint64_t size );
520 CO_API
void ackRequest(
NodePtr node,
const uint32_t requestID );
560 virtual void notifyConnect(
NodePtr ) {}
563 virtual void notifyDisconnect(
NodePtr ) {}
576 detail::LocalNode*
const _impl;
578 friend class detail::ReceiverThread;
579 bool _startCommandThread(
const int32_t threadID );
580 void _runReceiverThread();
582 friend class detail::CommandThread;
583 bool _notifyCommandThreadIdle();
586 void _closeNode(
NodePtr node );
590 lunchbox::Request< void > _removeListener(
ConnectionPtr connection );
592 uint32_t _connect(
NodePtr node );
599 bool _setupPeer(
const std::string& setupOpts );
601 void _handleConnect();
602 void _handleDisconnect();
610 friend class ObjectStore;
611 template<
typename T >
612 void _registerCommand(
const uint32_t command,
const CommandFunc< T >& func,
613 CommandQueue* destinationQueue )
618 void _dispatchCommand( ICommand& command );
619 void _redispatchCommands();
622 bool _cmdAckRequest( ICommand& command );
623 bool _cmdStopRcv( ICommand& command );
624 bool _cmdStopCmd( ICommand& command );
625 bool _cmdSetAffinity( ICommand& command );
626 bool _cmdConnect( ICommand& command );
627 bool _cmdConnectReply( ICommand& command );
628 bool _cmdConnectAck( ICommand& command );
629 bool _cmdID( ICommand& command );
630 bool _cmdDisconnect( ICommand& command );
631 bool _cmdGetNodeData( ICommand& command );
632 bool _cmdGetNodeDataReply( ICommand& command );
633 bool _cmdAcquireSendToken( ICommand& command );
634 bool _cmdAcquireSendTokenReply( ICommand& command );
635 bool _cmdReleaseSendToken( ICommand& command );
636 bool _cmdAddListener( ICommand& command );
637 bool _cmdRemoveListener( ICommand& command );
638 bool _cmdPing( ICommand& command );
639 bool _cmdCommand( ICommand& command );
640 bool _cmdCommandAsync( ICommand& command );
641 bool _cmdAddConnection( ICommand& command );
642 bool _cmdDiscard( ICommand& ) {
return true; }
645 LB_TS_VAR( _cmdThread )
646 LB_TS_VAR( _rcvThread )
649 inline
std::ostream& operator << (
std::ostream& os, const
LocalNode& node )
651 os << static_cast< const Node& >( node );
655 #endif // CO_LOCALNODE_H
CO_API void deregisterObject(Object *object) override
Deregister a distributed object.
Num of mapObjects served for other nodes.
std::vector< ConnectionPtr > Connections
A vector of ConnectionPtr's.
CO_API bool connect(NodePtr node)
Connect a remote node (proxy) to this listening node.
CO_API Zeroconf getZeroconf()
lunchbox::RefPtr< co::SendToken > SendToken
A handle for a send token acquired by acquireSendToken().
lunchbox::RefPtr< ConnectionDescription > ConnectionDescriptionPtr
A reference pointer for ConnectionDescription pointers.
CO_API void unmapObject(Object *object) override
Unmap a mapped object.
uint128_t NodeID
A unique identifier for nodes.
virtual CO_API void objectPush(const uint128_t &groupID, const uint128_t &objectType, const uint128_t &objectID, DataIStream &istream)
Handler for an Object::push() operation.
virtual CO_API bool initLocal(const int argc, char **argv)
Initialize the node.
CO_API NodePtr syncLaunch(const uint128_t &nodeID, int64_t timeout)
Wait for a launched node to connect.
A helper struct bundling an object identifier and version.
CO_API void removeListeners(const Connections &connections)
Remove listening connections from this listening node.
A thread-safe, blocking queue for ICommand buffers.
CO_API bool dispatchCommand(ICommand &command) override
Dispatches a command to the registered command queue.
void registerCommand(const uint32_t command, const CommandFunc< T > &func, CommandQueue *queue)
Register a command member function for a command.
virtual CO_API NodePtr createNode(const uint32_t type)
Factory method to create a new node.
Proxy node representing a remote LocalNode.
f_bool_t mapObject(Object *object, const ObjectVersion &v)
Convenience wrapper for mapObject().
CO_API Nodes getNodes(const bool addSelf=true) const
CO_API void registerPushHandler(const uint128_t &groupID, const PushHandler &handler)
Register a custom handler for Object::push operations.
CO_API CommandQueue * getCommandThreadQueue()
Return the command queue to the command thread.
uint128_t identifier
the object identifier
CO_API void enableSendOnRegister()
Enable sending instance data after registration.
lunchbox::RefPtr< Node > NodePtr
A reference pointer for Node pointers.
f_bool_t mapObject(Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST)
Counter
Counters are monotonically increasing performance variables for operations performed by a LocalNode i...
Object-oriented network library.
virtual bool exitLocal()
Close a listening node.
CO_API LocalNode(const uint32_t type=co::NODETYPE_NODE)
Construct a new local node of the given type.
Interface for entities which map and register objects.
CO_API bool registerObject(Object *object) override
Register a distributed object.
boost::function< bool(CustomICommand &) > CommandHandler
Function signature for custom command handlers.
CO_API SendToken acquireSendToken(NodePtr toNode)
Acquire a send token from the given node.
virtual CO_API bool disconnect(NodePtr node)
Disconnect a connected node.
virtual CO_API bool close()
Close a listening node.
CO_API bool inCommandThread() const
A class managing received commands.
Node specialization for a local node.
CO_API bool launch(NodePtr node, const std::string &command)
Launch a remote process using the given command.
CO_API ~LocalNode() override
Destruct this local node.
uint128_t version
the object version
CO_API f_bool_t syncObject(Object *object, const uint128_t &id, NodePtr master, const uint32_t instanceID=CO_INSTANCE_ALL) override
Synchronize the local object with a remote object.
CO_API NodePtr connectObjectMaster(const uint128_t &id)
Find and connect the node where the given object is registered.
A std::istream-like input data stream for binary data.
CO_API void disableInstanceCache()
Disable the instance cache of a stopped local node.
CO_API ConnectionPtr addListener(ConnectionDescriptionPtr desc)
Add a listening connection to this listening node.
CO_API void disableSendOnRegister()
Disable sending data of newly registered objects.
CO_API bool registerCommandHandler(const uint128_t &command, const CommandHandler &func, CommandQueue *queue)
Register a custom command handler handled by this node.
boost::function< void(const uint128_t &, const uint128_t &, const uint128_t &, DataIStream &) > PushHandler
Function signature for push handlers.
CO_API void setAffinity(const int32_t affinity)
Bind this, the receiver and the command thread to the given lunchbox::Thread affinity.
CO_API bool pingIdleNodes()
Request updates from all nodes above keep-alive timeout.
std::vector< NodePtr > Nodes
A vector of NodePtr's.
CO_API void releaseSendToken(SendToken token)
CO_API NodePtr getNode(const NodeID &id) const
Get a node by identifier.
virtual CO_API bool listen()
Open all connections and put this node into the listening state.
CO_API void ping(NodePtr remoteNode)
Request keep-alive update from the remote node.
CO_API f_bool_t mapObject(Object *object, const uint128_t &id, NodePtr master, const uint128_t &version=VERSION_OLDEST)
Map a distributed object.
CO_API bool mapObjectSync(const uint32_t requestID) override
lunchbox::RefPtr< Connection > ConnectionPtr
A reference pointer for Connection pointers.
CO_API uint32_t mapObjectNB(Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST)