Collage
1.4.0
High-performance C++ library for developing object-oriented distributed applications.
|
Node specialization for a local node. More...
#include <localNode.h>
Public Types | |
enum | Counter { COUNTER_MAP_OBJECT_REMOTE, COUNTER_ALL } |
Counters are monotonically increasing performance variables for operations performed by a LocalNode instance. More... | |
Public Types inherited from co::Dispatcher | |
typedef CommandFunc< Dispatcher > | Func |
The signature of the base Dispatcher callback. More... | |
Public Member Functions | |
CO_API | LocalNode (const uint32_t type=co::NODETYPE_NODE) |
Construct a new local node of the given type. More... | |
CO_API void | ackRequest (NodePtr node, const uint32_t requestID) |
CO_API void | ping (NodePtr remoteNode) |
Request keep-alive update from the remote node. More... | |
CO_API bool | pingIdleNodes () |
Request updates from all nodes above keep-alive timeout. More... | |
CO_API void | setAffinity (const int32_t affinity) |
Bind this, the receiver and the command thread to the given lunchbox::Thread affinity. | |
CO_API void | addConnection (ConnectionPtr connection) |
State Changes | |
The following methods affect the state of the node by changing its connectivity to the network. | |
virtual CO_API bool | initLocal (const int argc, char **argv) |
Initialize the node. More... | |
virtual CO_API bool | listen () |
Open all connections and put this node into the listening state. More... | |
virtual CO_API bool | close () |
Close a listening node. More... | |
virtual bool | exitLocal () |
Close a listening node. More... | |
CO_API bool | connect (NodePtr node) |
Connect a remote node (proxy) to this listening node. More... | |
CO_API NodePtr | connect (const NodeID &nodeID) |
Create and connect a node given by an identifier. More... | |
CO_API NodePtr | connectObjectMaster (const uint128_t &id) |
Find and connect the node where the given object is registered. More... | |
virtual CO_API bool | disconnect (NodePtr node) |
Disconnect a connected node. More... | |
Data Access | |
CO_API NodePtr | getNode (const NodeID &id) const |
Get a node by identifier. More... | |
CO_API void | getNodes (Nodes &nodes, const bool addSelf=true) const |
Assemble a vector of the currently connected nodes. More... | |
CO_API CommandQueue * | getCommandThreadQueue () |
Return the command queue to the command thread. More... | |
CO_API bool | inCommandThread () const |
CO_API int64_t | getTime64 () const |
CO_API ssize_t | getCounter (const Counter counter) const |
Public Member Functions inherited from co::Node | |
CO_API | Node (const uint32_t type=co::NODETYPE_NODE) |
Construct a new node proxy. More... | |
CO_API int64_t | getLastReceiveTime () const |
CO_API std::string | serialize () const |
CO_API bool | deserialize (std::string &data) |
CO_API const NodeID & | getNodeID () const |
Get the node's unique identifier. More... | |
CO_API uint32_t | getType () const |
bool | operator== (const Node *n) const |
bool | isBigEndian () const |
CO_API bool | isReachable () const |
CO_API bool | isConnected () const |
CO_API bool | isListening () const |
CO_API bool | isClosed () const |
CO_API bool | isClosing () const |
bool | isLocal () const |
CO_API void | addConnectionDescription (ConnectionDescriptionPtr cd) |
Add a new description of how this node can be reached. More... | |
CO_API bool | removeConnectionDescription (ConnectionDescriptionPtr cd) |
Removes a connection description. More... | |
CO_API ConnectionDescriptions | getConnectionDescriptions () const |
CO_API ConnectionPtr | getConnection (const bool multicast=false) |
Get an active connection to this node. More... | |
CO_API OCommand | send (const uint32_t cmd, const bool multicast=false) |
Send a command with optional data to the node. More... | |
CO_API CustomOCommand | send (const uint128_t &commandID, const bool multicast=false) |
Send a custom command with optional data to the node. More... | |
Public Member Functions inherited from co::Dispatcher | |
Dispatcher & | operator= (const Dispatcher &) |
template<typename T > | |
void | registerCommand (const uint32_t command, const CommandFunc< T > &func, CommandQueue *queue) |
Register a command member function for a command. More... | |
Public Member Functions inherited from co::ObjectHandler | |
CO_API void | releaseObject (Object *object) |
Convenience method to deregister or unmap an object. More... | |
Protected Member Functions | |
CO_API | ~LocalNode () override |
Destruct this local node. More... | |
CO_API bool | connect (NodePtr node, ConnectionPtr connection) |
virtual void | notifyConnect (NodePtr) |
virtual void | notifyDisconnect (NodePtr) |
virtual CO_API NodePtr | createNode (const uint32_t type) |
Factory method to create a new node. More... | |
Protected Member Functions inherited from co::Node | |
virtual CO_API | ~Node () |
Destruct this node. More... | |
void | _addConnectionDescription (ConnectionDescriptionPtr cd) |
bool | _removeConnectionDescription (ConnectionDescriptionPtr cd) |
ConnectionPtr | _getMulticast () const |
ConnectionPtr | getMulticast () |
Activate and return a multicast connection. More... | |
Protected Member Functions inherited from co::Dispatcher | |
CO_API | Dispatcher (const Dispatcher &from) |
CO_API bool | _cmdUnknown (ICommand &command) |
The default handler for handling commands. More... | |
Protected Member Functions inherited from co::ObjectHandler | |
ObjectHandler () | |
Construct a new object handler. More... | |
virtual | ~ObjectHandler () |
Destroy this object handler. More... | |
Friends | |
class | detail::ReceiverThread |
class | detail::CommandThread |
class | ObjectStore |
Object Registry | |
typedef boost::function< void(const uint128_t &, const uint128_t &, const uint128_t &, DataIStream &) > | PushHandler |
Function signature for push handlers. More... | |
typedef boost::function< bool(CustomICommand &) > | CommandHandler |
Function signature for custom command handlers. More... | |
CO_API bool | registerObject (Object *object) override |
Register a distributed object. More... | |
CO_API void | deregisterObject (Object *object) override |
Deregister a distributed object. More... | |
CO_API f_bool_t | mapObject (Object *object, const uint128_t &id, NodePtr master, const uint128_t &version=VERSION_OLDEST) |
Map a distributed object. More... | |
f_bool_t | mapObject (Object *object, const ObjectVersion &v) |
Convenience wrapper for mapObject(). More... | |
f_bool_t | mapObject (Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST) |
CO_API uint32_t | mapObjectNB (Object *object, const uint128_t &id, const uint128_t &version=VERSION_OLDEST) |
CO_API uint32_t | mapObjectNB (Object *object, const uint128_t &id, const uint128_t &version, NodePtr master) override |
CO_API bool | mapObjectSync (const uint32_t requestID) override |
CO_API f_bool_t | syncObject (Object *object, NodePtr master, const uint128_t &id, const uint32_t instanceID=CO_INSTANCE_ALL) override |
Synchronize the local object with a remote object. More... | |
CO_API void | unmapObject (Object *object) override |
Unmap a mapped object. More... | |
CO_API void | disableInstanceCache () |
Disable the instance cache of a stopped local node. More... | |
CO_API void | expireInstanceData (const int64_t age) |
CO_API void | enableSendOnRegister () |
Enable sending instance data after registration. More... | |
CO_API void | disableSendOnRegister () |
Disable sending data of newly registered objects. More... | |
virtual CO_API void | objectPush (const uint128_t &groupID, const uint128_t &objectType, const uint128_t &objectID, DataIStream &istream) |
Handler for an Object::push() operation. More... | |
CO_API void | registerPushHandler (const uint128_t &groupID, const PushHandler &handler) |
Register a custom handler for Object::push operations. More... | |
CO_API bool | registerCommandHandler (const uint128_t &command, const CommandHandler &func, CommandQueue *queue) |
Register a custom command handler handled by this node. More... | |
CO_API void | swapObject (Object *oldObject, Object *newObject) |
Operations | |
typedef lunchbox::RefPtr< co::SendToken > | SendToken |
A handle for a send token acquired by acquireSendToken(). More... | |
CO_API ConnectionPtr | addListener (ConnectionDescriptionPtr desc) |
Add a listening connection to this listening node. More... | |
CO_API void | addListener (ConnectionPtr connection) |
Add a listening connection to this listening node. More... | |
CO_API void | removeListeners (const Connections &connections) |
Remove listening connections from this listening node. More... | |
CO_API void | flushCommands () |
CO_API BufferPtr | allocBuffer (const uint64_t size) |
CO_API bool | dispatchCommand (ICommand &command) override |
Dispatches a command to the registered command queue. More... | |
CO_API SendToken | acquireSendToken (NodePtr toNode) |
Acquire a send token from the given node. More... | |
CO_API void | releaseSendToken (SendToken token) |
CO_API Zeroconf | getZeroconf () |
Node specialization for a local node.
Local nodes listen on network connections, manage connections to other nodes and provide Object registration, mapping and command dispatch. Typically each process uses one local node to communicate with other processes.
Definition at line 44 of file localNode.h.
typedef boost::function< bool( CustomICommand& ) > co::LocalNode::CommandHandler |
Function signature for custom command handlers.
Definition at line 377 of file localNode.h.
typedef boost::function< void( const uint128_t&, const uint128_t&, const uint128_t&, DataIStream& ) > co::LocalNode::PushHandler |
typedef lunchbox::RefPtr< co::SendToken > co::LocalNode::SendToken |
A handle for a send token acquired by acquireSendToken().
Definition at line 472 of file localNode.h.
enum co::LocalNode::Counter |
Counters are monotonically increasing performance variables for operations performed by a LocalNode instance.
Enumerator | |
---|---|
COUNTER_MAP_OBJECT_REMOTE | Number of mapObject() requests served for other nodes. |
Definition at line 52 of file localNode.h.
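The enumeration follows a common idiom in which the last enumerator, COUNTER_ALL, doubles as the number of counters. The following is a minimal, library-independent sketch of how such monotonically increasing counters could be stored and read. CounterSet and its methods are hypothetical names used for illustration only; they are not part of Collage, and the real storage may differ.

```cpp
#include <array>
#include <atomic>
#include <cstdint>

// Mirrors the shape of co::LocalNode::Counter: the last enumerator is the count.
enum Counter { COUNTER_MAP_OBJECT_REMOTE, COUNTER_ALL };

// Hypothetical holder for per-node counters (not a Collage class).
class CounterSet
{
public:
    // std::atomic is not zero-initialized by default, so do it explicitly.
    CounterSet() { for (auto& value : _values) value.store(0); }

    // Counters only ever increase; there is deliberately no reset or decrement.
    void increment(const Counter counter) { ++_values[counter]; }

    std::int64_t get(const Counter counter) const
    {
        return _values[counter].load();
    }

private:
    std::array<std::atomic<std::int64_t>, COUNTER_ALL> _values;
};
```

Sizing the array with COUNTER_ALL means a newly added enumerator automatically gets its own slot.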
CO_API co::LocalNode::LocalNode | ( | const uint32_t | type = co::NODETYPE_NODE | ) |
explicit |
Construct a new local node of the given type.
CO_API co::LocalNode::~LocalNode | ( | ) |
override protected |
Destruct this local node.
CO_API SendToken co::LocalNode::acquireSendToken | ( | NodePtr | toNode | ) |
Acquire a send token from the given node.
The token is released automatically when it leaves its scope or explicitly using releaseSendToken().
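The two release paths described above (leaving scope, or an explicit release) can be illustrated with a reference-counted handle. This is a sketch under stated assumptions, not Collage code: TokenSource is a hypothetical class, std::shared_ptr stands in for lunchbox::RefPtr, and waiting for a token to be granted by a remote node is not modelled.

```cpp
#include <memory>

// Hypothetical stand-in for a node that hands out a single send token.
class TokenSource
{
public:
    // Stand-in for lunchbox::RefPtr< co::SendToken >.
    using Token = std::shared_ptr<TokenSource>;

    // Returns a token handle, or a null handle if the token is already taken.
    Token acquire()
    {
        if (_taken)
            return Token();
        _taken = true;
        // The deleter runs when the last copy of the handle is destroyed;
        // it gives the token back instead of deleting the source.
        return Token(this, [](TokenSource* source) { source->_taken = false; });
    }

    // Explicit release: dropping the handle early triggers the same deleter.
    static void release(Token& token) { token.reset(); }

    bool isTaken() const { return _taken; }

private:
    bool _taken = false;
};
```

Tying the release to the handle's lifetime means an early return or an exception cannot leave the token held.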
CO_API ConnectionPtr co::LocalNode::addListener | ( | ConnectionDescriptionPtr | desc | ) |
Add a listening connection to this listening node.
CO_API void co::LocalNode::addListener | ( | ConnectionPtr | connection | ) |
Add a listening connection to this listening node.
virtual CO_API bool co::LocalNode::close | ( | ) |
virtual |
Close a listening node.
Disconnects all connected node proxies, closes the listening connections and terminates all threads created in listen().
Referenced by exitLocal().
CO_API bool co::LocalNode::connect | ( | NodePtr | node | ) |
Connect a remote node (proxy) to this listening node.
The connection descriptions of the node are used to connect the remote local node. On success, the node is in the connected state, otherwise its state is unchanged.
This method is one-sided, that is, the node to be connected should not initiate a connection to this node at the same time. For concurrent connects use the other connect() method using node identifiers.
node | the remote node. |
CO_API NodePtr co::LocalNode::connect | ( | const NodeID & | nodeID | ) |
Create and connect a node given by an identifier.
This method is two-sided and thread-safe, that is, it can be called by multiple threads on the same node with the same nodeID, or concurrently on two nodes with each other's nodeID.
nodeID | the identifier of the node to connect. |
CO_API NodePtr co::LocalNode::connectObjectMaster | ( | const uint128_t & | id | ) |
Find and connect the node where the given object is registered.
This method is relatively expensive, since potentially all connected nodes are queried.
id | the identifier of the object to search for. |
virtual CO_API NodePtr co::LocalNode::createNode | ( | const uint32_t | type | ) |
protected virtual |
Factory method to create a new node.
type | the node type. |
CO_API void co::LocalNode::deregisterObject | ( | Object * | object | ) |
override virtual |
Deregister a distributed object.
All slave instances should be unmapped before this call, and will be forcefully unmapped by this method.
object | the object instance. |
Implements co::ObjectHandler.
CO_API void co::LocalNode::disableInstanceCache | ( | ) |
Disable the instance cache of a stopped local node.
CO_API void co::LocalNode::disableSendOnRegister | ( | ) |
Disable sending data of newly registered objects.
virtual CO_API bool co::LocalNode::disconnect | ( | NodePtr | node | ) |
virtual |
Disconnect a connected node.
node | the remote node. |
CO_API bool co::LocalNode::dispatchCommand | ( | ICommand & | command | ) |
override virtual |
Dispatches a command to the registered command queue.
Applications using custom command types have to override this method to dispatch the custom commands.
command | the command. |
Reimplemented from co::Dispatcher.
CO_API void co::LocalNode::enableSendOnRegister | ( | ) |
Enable sending instance data after registration.
Send-on-register starts transmitting instance data of registered objects directly after they have been registered. The data is cached on remote nodes and accelerates object mapping. Send-on-register should not be active when remote nodes are joining a multicast group of this node, since they will potentially read out-of-order data streams on the multicast connection.
Enable and disable are counted, that is, the last enable on a matched series of disable/enable will be effective. The disable is completely synchronous, that is, no more instance data will be sent after the first disable.
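One way to read the counting rule above is as a signed nesting level. The sketch below is an interpretation of that paragraph, not the library's implementation: SendOnRegisterSwitch is a hypothetical class, and the assumption that sending is active only while enables outnumber disables is labelled in the code.

```cpp
// Hypothetical model of counted enable/disable calls (not Collage code).
class SendOnRegisterSwitch
{
public:
    void enable() { ++_level; }
    void disable() { --_level; }

    // Assumption: data is sent only while enables outnumber disables.
    bool isActive() const { return _level > 0; }

private:
    int _level = 0; // starts inactive until the first enable
};
```

Under this reading, a series enable, disable, disable, enable, enable is inactive from the first disable onwards and becomes active again only at the last enable.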
virtual bool co::LocalNode::exitLocal | ( | ) |
inline virtual |
Close a listening node.
Definition at line 119 of file localNode.h.
References close().
CO_API CommandQueue* co::LocalNode::getCommandThreadQueue | ( | ) |
Return the command queue to the command thread.
CO_API NodePtr co::LocalNode::getNode | ( | const NodeID & | id | ) | const |
Get a node by identifier.
The node might not be connected. Thread safe.
id | the node identifier. |
CO_API void co::LocalNode::getNodes | ( | Nodes & | nodes, |
const bool | addSelf = true |
||
) | const |
Assemble a vector of the currently connected nodes.
CO_API Zeroconf co::LocalNode::getZeroconf | ( | ) |
CO_API bool co::LocalNode::inCommandThread | ( | ) | const |
virtual CO_API bool co::LocalNode::initLocal | ( | const int | argc, |
char ** | argv | ||
) |
virtual |
Initialize the node.
Parses the following command line options and calls listen() afterwards:
The '--co-listen <connection description>' command line option is parsed by this method to add listening connections to this node. This parameter may be used multiple times. ConnectionDescription::fromString() is used to parse the provided description.
The '--co-globals <string>' option is used to initialize the Globals. The string is parsed using Globals::fromString().
Please note that further command line parameters are recognized by co::init().
argc | the command line argument count. |
argv | the command line argument values. |
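To make the option handling concrete, the following sketch collects the two options from an argument vector. It is not the parser used by initLocal(): ParsedOptions and parseCollageOptions are hypothetical names, the values are kept as raw strings rather than passed to ConnectionDescription::fromString() or Globals::fromString(), and keeping only the last '--co-globals' value is an assumption.

```cpp
#include <string>
#include <vector>

// Hypothetical result type; not part of Collage.
struct ParsedOptions
{
    std::vector<std::string> listen; // one entry per --co-listen occurrence
    std::string globals;             // value of the last --co-globals
};

// Collects the values of --co-listen and --co-globals, ignoring everything else.
ParsedOptions parseCollageOptions(const int argc, const char* const* argv)
{
    ParsedOptions result;
    for (int i = 1; i < argc; ++i) // argv[0] is the program name
    {
        const std::string arg(argv[i]);
        const bool hasValue = i + 1 < argc;
        if (arg == "--co-listen" && hasValue)
            result.listen.push_back(argv[++i]);
        else if (arg == "--co-globals" && hasValue)
            result.globals = argv[++i];
    }
    return result;
}
```

Unknown arguments are skipped so that options meant for other components, such as those recognized by co::init(), pass through untouched.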
virtual CO_API bool co::LocalNode::listen | ( | ) |
virtual |
Open all connections and put this node into the listening state.
The node will spawn a receiver and command thread, and listen on all connections for incoming commands. The node will be in the listening state if the method completed successfully. A listening node can connect other nodes.
CO_API f_bool_t co::LocalNode::mapObject | ( | Object * | object, |
const uint128_t & | id, | ||
NodePtr | master, | ||
const uint128_t & | version = VERSION_OLDEST |
||
) |
Map a distributed object.
The mapped object becomes a slave instance of the master version which was registered with the provided identifier. The given version can be used to map a specific version.
If VERSION_NONE is provided, the slave instance is not initialized with any data from the master. This is useful if the object has been pre-initialized by other means, for example from a shared file system.
If VERSION_OLDEST is provided, the oldest available version is mapped.
If a concrete requested version no longer exists, mapObject() will map the oldest available version.
If the requested version is newer than the head version, mapObject() will block until the requested version is available.
Mapping an object is a potentially time-consuming operation. Using mapObjectNB() and mapObjectSync() to asynchronously map multiple objects in parallel improves performance of this operation.
After mapping, the object will have the version used during initialization, or VERSION_NONE if mapped to this version.
When no master node is given, connectObjectMaster() is used to find the node with the master instance.
This method returns immediately after initiating the mapping. Evaluating the value of the returned lunchbox::Future will block on the completion of the operation and return true if the object was mapped, false if the master of the object is not found or the requested version is no longer available.
object | the object. |
id | the master object identifier. |
master | the node with the master instance, may be 0. |
version | the initial version. |
Referenced by mapObject().
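The version rules listed above can be summarized as a small decision function. This is an illustrative sketch only: it uses 64-bit integers instead of uint128_t, the sentinel values SKETCH_VERSION_NONE and SKETCH_VERSION_OLDEST are arbitrary stand-ins for the real constants, and MapDecision and resolveVersion are hypothetical names. It covers only the per-version rules and does not model the failure cases reported through the returned future.

```cpp
#include <cstdint>

using Version = std::uint64_t; // stand-in for uint128_t

// Arbitrary sentinel values for this sketch; not the real Collage constants.
const Version SKETCH_VERSION_NONE = 0;
const Version SKETCH_VERSION_OLDEST = ~Version(0);

struct MapDecision
{
    Version version; // version the slave instance ends up with
    bool initialize; // false: no data is taken from the master
    bool wait;       // true: block until the master reaches `version`
};

// `oldest` and `head` bound the versions the master currently retains.
MapDecision resolveVersion(const Version requested, const Version oldest,
                           const Version head)
{
    if (requested == SKETCH_VERSION_NONE)
        return {SKETCH_VERSION_NONE, false, false}; // pre-initialized elsewhere
    if (requested == SKETCH_VERSION_OLDEST || requested < oldest)
        return {oldest, true, false}; // fall back to the oldest available
    if (requested > head)
        return {requested, true, true}; // not committed yet: wait for it
    return {requested, true, false};    // retained version: map it directly
}
```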
f_bool_t co::LocalNode::mapObject | ( | Object * | object, |
const ObjectVersion & | v | ||
) |
inline |
Convenience wrapper for mapObject().
Definition at line 253 of file localNode.h.
References co::ObjectVersion::identifier, mapObject(), and co::ObjectVersion::version.
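f_bool_t co::LocalNode::mapObject | ( | Object * | object, |
const uint128_t & | id, | ||
const uint128_t & | version = VERSION_OLDEST | ||
) |
inline |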
Definition at line 257 of file localNode.h.
References mapObject().
CO_API uint32_t co::LocalNode::mapObjectNB | ( | Object * | object, |
const uint128_t & | id, | ||
const uint128_t & | version = VERSION_OLDEST |
||
) |
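CO_API uint32_t co::LocalNode::mapObjectNB | ( | Object * | object, |
const uint128_t & | id, | ||
const uint128_t & | version, | ||
NodePtr | master | ||
) |
override virtual |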
Implements co::ObjectHandler.
CO_API bool co::LocalNode::mapObjectSync | ( | const uint32_t | requestID | ) |
override virtual |
Implements co::ObjectHandler.
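virtual CO_API void co::LocalNode::objectPush | ( | const uint128_t & | groupID, |
const uint128_t & | objectType, | ||
const uint128_t & | objectID, | ||
DataIStream & | istream | ||
) |
virtual |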
Handler for an Object::push() operation.
Called at least on each node listed in an Object::push() operation upon reception of the pushed data from the command thread. Called on all nodes of a multicast group, even for nodes not listed in the Object::push().
The default implementation calls registered push handlers. Typically used to create an object on a remote node, using the objectType for instantiation, the istream to initialize it, and the objectID to map it using VERSION_NONE. The groupID may be used to differentiate multiple concurrent push operations.
groupID | The group identifier given to Object::push() |
objectType | The type identifier given to Object::push() |
objectID | The identifier of the pushed object |
istream | the input data stream containing the instance data. |
CO_API void co::LocalNode::ping | ( | NodePtr | remoteNode | ) |
Request keep-alive update from the remote node.
CO_API bool co::LocalNode::pingIdleNodes | ( | ) |
Request updates from all nodes above keep-alive timeout.
CO_API bool co::LocalNode::registerCommandHandler | ( | const uint128_t & | command, |
const CommandHandler & | func, | ||
CommandQueue * | queue | ||
) |
Register a custom command handler handled by this node.
Custom command handlers are invoked on reception of a CustomICommand sent by Node::send( uint128_t, ... ). The command identifier needs to be unique. It is recommended to use servus::make_uint128() to generate this identifier.
command | the unique identifier of the custom command |
func | the handler function for the custom command |
queue | the queue into which the command should be inserted |
CO_API bool co::LocalNode::registerObject | ( | Object * | object | ) |
override virtual |
Register a distributed object.
Registering a distributed object makes this object the master version. The object's identifier is used to map slave instances of the object. Master versions of objects are typically writable and can commit new versions of the distributed object.
object | the object instance. |
Implements co::ObjectHandler.
CO_API void co::LocalNode::registerPushHandler | ( | const uint128_t & | groupID, |
const PushHandler & | handler | ||
) |
Register a custom handler for Object::push operations.
The registered handler function will be called automatically for an incoming object push. Thread-safe with itself and objectPush().
groupID | The group identifier given to Object::push() |
handler | The handler function called for a registered groupID |
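The relationship between registerPushHandler() and the default objectPush() can be pictured as a lookup table keyed by group identifier. The sketch below is a simplified model rather than Collage code: PushDispatcher and SketchPushHandler are hypothetical, a 64-bit integer stands in for uint128_t, a std::string stands in for the DataIStream, the object type and object identifier parameters are omitted, and the mutex is one assumed way to make registration safe against concurrent dispatch.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <string>

using GroupID = std::uint64_t; // stand-in for uint128_t

// Simplified handler signature for this sketch only.
using SketchPushHandler =
    std::function<void(GroupID groupID, const std::string& data)>;

// Hypothetical model of a push handler registry (not a Collage class).
class PushDispatcher
{
public:
    void registerPushHandler(const GroupID groupID,
                             const SketchPushHandler& handler)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _handlers[groupID] = handler;
    }

    // Returns true if a handler was registered for the group and was called.
    bool objectPush(const GroupID groupID, const std::string& data)
    {
        SketchPushHandler handler;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            const auto it = _handlers.find(groupID);
            if (it == _handlers.end())
                return false;
            handler = it->second; // copy so the call happens outside the lock
        }
        handler(groupID, data);
        return true;
    }

private:
    std::mutex _mutex;
    std::map<GroupID, SketchPushHandler> _handlers;
};
```

Copying the handler before calling it lets a handler register further handlers without deadlocking on the mutex.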
CO_API void co::LocalNode::releaseSendToken | ( | SendToken | token | ) |
CO_API void co::LocalNode::removeListeners | ( | const Connections & | connections | ) |
Remove listening connections from this listening node.
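CO_API f_bool_t co::LocalNode::syncObject | ( | Object * | object, |
NodePtr | master, | ||
const uint128_t & | id, | ||
const uint32_t | instanceID = CO_INSTANCE_ALL | ||
) |
override virtual |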
Synchronize the local object with a remote object.
The object is synchronized to the newest version of the first attached object on the given master node matching the instanceID. When no master node is given, connectObjectMaster() is used to find the node with the master instance. When CO_INSTANCE_ALL is given, the first instance is used. Before a successful return, applyInstanceData() is called on the calling thread to synchronize the given object.
object | The local object instance to synchronize. |
master | The node where the synchronizing object is attached. |
id | the object identifier. |
instanceID | the instance identifier of the synchronizing object. |
Implements co::ObjectHandler.
|
override virtual |