Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_OBJECTSTORE_H
23 : #define CO_OBJECTSTORE_H
24 :
25 : #include <co/dispatcher.h> // base class
26 : #include <co/version.h> // enum
27 :
28 : #include <lunchbox/lockable.h> // member
29 : #include <lunchbox/spinLock.h> // member
30 :
31 : #include "dataIStreamQueue.h" // member
32 :
33 : namespace co
34 : {
35 : class InstanceCache;
36 :
37 : /** An object store manages Object mapping for a LocalNode. */
38 : class ObjectStore : public Dispatcher
39 : {
40 : public:
41 : /** Construct a new ObjectStore. */
42 : ObjectStore(LocalNode* localNode, a_ssize_t* counters);
43 :
44 : /** Destruct this ObjectStore. */
45 : virtual ~ObjectStore();
46 :
47 : /** Remove all objects and clear all caches. */
48 : void clear();
49 :
50 : /**
51 : * Return the master node id for an identifier.
52 : *
53 : * @param id the identifier.
54 : * @return the master node, or 0 if no master node is
55 : * found for the identifier.
56 : */
57 : NodeID findMasterNodeID(const uint128_t& id);
58 :
59 : /** @name ICommand Dispatch */
60 : //@{
61 : /**
62 : * Dispatches an object command to the registered command queue.
63 : *
64 : * Object commands are dispatched to the appropriate objects mapped on
65 : * this session.
66 : *
67 : * @param command the command.
68 : * @return true if the command was dispatched, false otherwise.
69 : */
70 : bool dispatchObjectCommand(ICommand& command);
71 : //@}
72 :
73 : /** @name Object Registration */
74 : //@{
75 : /**
76 : * Register a distributed object.
77 : *
78 : * Registering a distributed object assigns a session-unique identifier
79 : * to this object, and makes this object the master version. The
80 : * identifier is used to map slave instances of the object. Master
81 : * versions of objects are typically writable and can commit new
82 : * versions of the distributed object.
83 : *
84 : * @param object the object instance.
85 : * @return true if the object was registered, false otherwise.
86 : */
87 : bool register_(Object* object);
88 :
89 : /**
90 : * Deregister a distributed object.
91 : *
92 : * @param object the object instance.
93 : */
94 : void deregister(Object* object);
95 :
96 : /** Start mapping a distributed object. */
97 : uint32_t mapNB(Object* object, const uint128_t& id,
98 : const uint128_t& version, NodePtr master);
99 :
100 : /** Finalize the mapping of a distributed object. */
101 : bool mapSync(const uint32_t requestID);
102 :
103 : /** Synchronize an object. */
104 : f_bool_t sync(Object* object, const uint128_t& id, NodePtr master,
105 : const uint32_t instanceID);
106 : /**
107 : * Unmap a mapped object.
108 : *
109 : * @param object the mapped object.
110 : */
111 : void unmap(Object* object);
112 :
113 : /**
114 : * Attach an object to an identifier.
115 : *
116 : * Attaching an object to an identifier enables it to receive object
117 : * commands though the local node. It does not establish any data
118 : * mapping to other object instances with the same identifier.
119 : *
120 : * @param object the object.
121 : * @param id the object identifier.
122 : * @param instanceID the node-local instance identifier, or
123 : * CO_INSTANCE_INVALID if this method should generate one.
124 : */
125 : void attach(Object* object, const uint128_t& id, const uint32_t instanceID);
126 :
127 : /**
128 : * Detach an object.
129 : *
130 : * @param object the attached object.
131 : */
132 : void detach(Object* object);
133 :
134 : /** @internal swap the existing object by a new object and keep
135 : the cm, id and instanceID. */
136 : void swap(Object* oldObject, Object* newObject);
137 : //@}
138 :
139 : /** @name Instance Cache. */
140 : //@{
141 : /** Expire all data older than age from the cache. */
142 : void expireInstanceData(const int64_t age);
143 :
144 : /** Remove all entries of the node from the cache. */
145 : void removeInstanceData(const NodeID& nodeID);
146 :
147 : /** Disable the instance cache of an stopped local node. */
148 : void disableInstanceCache();
149 :
150 : /** Enable sending data of newly registered objects when idle. */
151 : void enableSendOnRegister();
152 :
153 : /**
154 : * Disable sending data of newly registered objects when idle.
155 : *
156 : * Enable and disable are counted, that is, the last disable on a
157 : * matched series of enable/disable will be effective. When
158 : * send-on-register gets deactivated, the associated queue is cleared
159 : * and all data send on multicast connections is finished.
160 : */
161 : void disableSendOnRegister();
162 :
163 : /**
164 : * @internal
165 : * Notification - no pending commands for the command thread.
166 : * @return true if more work is pending.
167 : */
168 : virtual bool notifyCommandThreadIdle();
169 :
170 : /**
171 : * @internal
172 : * Remove a slave node in all objects
173 : */
174 : void removeNode(NodePtr node);
175 : //@}
176 :
177 : private:
178 : /** The local node managing the object store. */
179 : LocalNode* const _localNode;
180 :
181 : /** The identifiers for node-local instance identifiers. */
182 : lunchbox::a_int32_t _instanceIDs;
183 :
184 : /** enableSendOnRegister() invocations. */
185 : lunchbox::a_int32_t _sendOnRegister;
186 :
187 : typedef std::unordered_map<uint128_t, Objects> ObjectsHash;
188 : typedef ObjectsHash::const_iterator ObjectsHashCIter;
189 :
190 : /** All registered and mapped objects.
191 : * - locked writes (only in receiver thread)
192 : * - unlocked reads in receiver thread
193 : * - locked reads in all other threads
194 : */
195 : lunchbox::Lockable<ObjectsHash, lunchbox::SpinLock> _objects;
196 :
197 : struct SendQueueItem
198 : {
199 0 : SendQueueItem()
200 0 : : age(0)
201 0 : , object(0)
202 : {
203 0 : }
204 : int64_t age;
205 : Object* object;
206 : };
207 :
208 : typedef std::deque<SendQueueItem> SendQueue;
209 : typedef SendQueue::iterator SendQueueIter;
210 :
211 : SendQueue _sendQueue; //!< Object data to broadcast when idle
212 : InstanceCache* _instanceCache; //!< cached object mapping data
213 : DataIStreamQueue _pushData; //!< Object::push() queue
214 : a_ssize_t* const _counters; // LocalNode performance counters
215 :
216 : void _attach(Object* object, const uint128_t& id,
217 : const uint32_t instanceID);
218 : void _detach(Object* object);
219 :
220 : /** Start synchronizing an object. */
221 : uint32_t _startSync(Object* object, const uint128_t& id, NodePtr master,
222 : const uint32_t instanceID);
223 :
224 : /** Finalize the synchronization of a distributed object. */
225 : bool _finishSync(const uint32_t requestID, Object* object);
226 :
227 : bool _checkInstanceCache(const uint128_t& id, uint128_t& from,
228 : uint128_t& to, uint32_t& instanceID);
229 :
230 : /** The command handler functions. */
231 : bool _cmdFindMasterNodeID(ICommand& command);
232 : bool _cmdFindMasterNodeIDReply(ICommand& command);
233 : bool _cmdAttach(ICommand& command);
234 : bool _cmdDetach(ICommand& command);
235 : bool _cmdMap(ICommand& command);
236 : bool _cmdMapSuccess(ICommand& command);
237 : bool _cmdMapReply(ICommand& command);
238 : bool _cmdSync(ICommand& command);
239 : bool _cmdSyncReply(ICommand& command);
240 : bool _cmdUnmap(ICommand& command);
241 : bool _cmdUnsubscribe(ICommand& command);
242 : bool _cmdInstance(ICommand& command);
243 : bool _cmdRegister(ICommand& command);
244 : bool _cmdDeregister(ICommand& command);
245 : bool _cmdDisableSendOnRegister(ICommand& command);
246 : bool _cmdRemoveNode(ICommand& command);
247 : bool _cmdPush(ICommand& command);
248 :
249 104 : LB_TS_VAR(_receiverThread);
250 104 : LB_TS_VAR(_commandThread);
251 : };
252 :
253 : std::ostream& operator<<(std::ostream& os, ObjectStore* objectStore);
254 : }
255 : #endif // CO_OBJECTSTORE_H
|