Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_OBJECTSTORE_H
23 : #define CO_OBJECTSTORE_H
24 :
25 : #include <co/dispatcher.h> // base class
26 : #include <co/version.h> // enum
27 :
28 : #include <lunchbox/lockable.h> // member
29 : #include <lunchbox/spinLock.h> // member
30 : #include <lunchbox/stdExt.h> // member
31 :
32 : #include "dataIStreamQueue.h" // member
33 :
34 : namespace co
35 : {
36 : class InstanceCache;
37 :
38 : /** An object store manages Object mapping for a LocalNode. */
39 : class ObjectStore : public Dispatcher
40 : {
41 : public:
42 : /** Construct a new ObjectStore for a local node and its performance counters. */
43 : ObjectStore( LocalNode* localNode, a_ssize_t* counters );
44 :
45 : /** Destruct this ObjectStore. */
46 : virtual ~ObjectStore();
47 :
48 : /** Remove all objects and clear all caches. */
49 : void clear();
50 :
51 : /**
52 : * Return the master node id for an identifier.
53 : *
54 : * @param id the identifier.
55 : * @return the master node identifier, or 0 if no master node is
56 : * found for the identifier.
57 : */
58 : NodeID findMasterNodeID( const uint128_t& id );
59 :
60 : /** @name ICommand Dispatch */
61 : //@{
62 : /**
63 : * Dispatches an object command to the registered command queue.
64 : *
65 : * Object commands are dispatched to the appropriate objects mapped on
66 : * this session.
67 : *
68 : * @param command the command.
69 : * @return true if the command was dispatched, false otherwise.
70 : */
71 : bool dispatchObjectCommand( ICommand& command );
72 : //@}
73 :
74 : /** @name Object Registration */
75 : //@{
76 : /**
77 : * Register a distributed object.
78 : *
79 : * Registering a distributed object assigns a session-unique identifier
80 : * to this object, and makes this object the master version. The
81 : * identifier is used to map slave instances of the object. Master
82 : * versions of objects are typically writable and can commit new
83 : * versions of the distributed object.
84 : *
85 : * @param object the object instance.
86 : * @return true if the object was registered, false otherwise.
87 : */
88 : bool register_( Object* object );
89 :
90 : /**
91 : * Deregister a distributed object.
92 : *
93 : * @param object the object instance.
94 : */
95 : void deregister( Object* object );
96 :
97 : /** Start mapping a distributed object; the mapping is finalized by mapSync(). */
98 : uint32_t mapNB( Object* object, const uint128_t& id,
99 : const uint128_t& version, NodePtr master );
100 :
101 : /** Finalize the mapping of a distributed object. */
102 : bool mapSync( const uint32_t requestID );
103 :
104 : /** Synchronize an object. */
105 : f_bool_t sync( Object* object, const uint128_t& id, NodePtr master,
106 : const uint32_t instanceID );
107 : /**
108 : * Unmap a mapped object.
109 : *
110 : * @param object the mapped object.
111 : */
112 : void unmap( Object* object );
113 :
114 : /**
115 : * Attach an object to an identifier.
116 : *
117 : * Attaching an object to an identifier enables it to receive object
118 : * commands through the local node. It does not establish any data
119 : * mapping to other object instances with the same identifier.
120 : *
121 : * @param object the object.
122 : * @param id the object identifier.
123 : * @param instanceID the node-local instance identifier, or
124 : * CO_INSTANCE_INVALID if this method should generate one.
125 : */
126 : void attach( Object* object, const uint128_t& id,
127 : const uint32_t instanceID );
128 :
129 : /**
130 : * Detach an object.
131 : *
132 : * @param object the attached object.
133 : */
134 : void detach( Object* object );
135 :
136 : /** @internal Replace the existing object with a new object, keeping
137 : the cm, id and instanceID. */
138 : void swap( Object* oldObject, Object* newObject );
139 : //@}
140 :
141 : /** @name Instance Cache. */
142 : //@{
143 : /** Expire all data older than age from the cache. */
144 : void expireInstanceData( const int64_t age );
145 :
146 : /** Remove all entries of the node from the cache. */
147 : void removeInstanceData( const NodeID& nodeID );
148 :
149 : /** Disable the instance cache of a stopped local node. */
150 : void disableInstanceCache();
151 :
152 : /** Enable sending data of newly registered objects when idle. */
153 : void enableSendOnRegister();
154 :
155 : /**
156 : * Disable sending data of newly registered objects when idle.
157 : *
158 : * Enable and disable are counted, that is, the last disable on a
159 : * matched series of enable/disable will be effective. When
160 : * send-on-register gets deactivated, the associated queue is cleared
161 : * and all data sent on multicast connections is finished.
162 : */
163 : void disableSendOnRegister();
164 :
165 : /**
166 : * @internal
167 : * Notification - no pending commands for the command thread.
168 : * @return true if more work is pending.
169 : */
170 : virtual bool notifyCommandThreadIdle();
171 :
172 : /**
173 : * @internal
174 : * Remove a slave node from all objects.
175 : */
176 : void removeNode( NodePtr node );
177 : //@}
178 :
179 : private:
180 : /** The local node managing the object store. */
181 : LocalNode* const _localNode;
182 :
183 : /** The identifiers for node-local instance identifiers. */
184 : lunchbox::a_int32_t _instanceIDs;
185 :
186 : /** enableSendOnRegister() invocations. */
187 : lunchbox::a_int32_t _sendOnRegister;
188 :
189 : typedef stde::hash_map< uint128_t, Objects > ObjectsHash;
190 : typedef ObjectsHash::const_iterator ObjectsHashCIter;
191 :
192 : /** All registered and mapped objects.
193 : * - locked writes (only in receiver thread)
194 : * - unlocked reads in receiver thread
195 : * - locked reads in all other threads
196 : */
197 : lunchbox::Lockable< ObjectsHash, lunchbox::SpinLock > _objects;
198 :
199 : struct SendQueueItem
200 : {
201 0 : SendQueueItem() : age( 0 ), object( 0 ) {}
202 : int64_t age;
203 : Object* object;
204 : };
205 :
206 : typedef std::deque< SendQueueItem > SendQueue;
207 : typedef SendQueue::iterator SendQueueIter;
208 :
209 : SendQueue _sendQueue; //!< Object data to broadcast when idle
210 : InstanceCache* _instanceCache; //!< cached object mapping data
211 : DataIStreamQueue _pushData; //!< Object::push() queue
212 : a_ssize_t* const _counters; //!< LocalNode performance counters
213 :
214 : void _attach( Object* object, const uint128_t& id,
215 : const uint32_t instanceID );
216 : void _detach( Object* object );
217 :
218 :
219 : /** Start synchronizing an object. */
220 : uint32_t _startSync( Object* object, const uint128_t& id, NodePtr master,
221 : const uint32_t instanceID );
222 :
223 : /** Finalize the synchronization of a distributed object. */
224 : bool _finishSync( const uint32_t requestID, Object* object );
225 :
226 : bool _checkInstanceCache( const uint128_t& id, uint128_t& from,
227 : uint128_t& to, uint32_t& instanceID );
228 :
229 : /** The command handler functions. */
230 : bool _cmdFindMasterNodeID( ICommand& command );
231 : bool _cmdFindMasterNodeIDReply( ICommand& command );
232 : bool _cmdAttach( ICommand& command );
233 : bool _cmdDetach( ICommand& command );
234 : bool _cmdMap( ICommand& command );
235 : bool _cmdMapSuccess( ICommand& command );
236 : bool _cmdMapReply( ICommand& command );
237 : bool _cmdSync( ICommand& command );
238 : bool _cmdSyncReply( ICommand& command );
239 : bool _cmdUnmap( ICommand& command );
240 : bool _cmdUnsubscribe( ICommand& command );
241 : bool _cmdInstance( ICommand& command );
242 : bool _cmdRegister( ICommand& command );
243 : bool _cmdDeregister( ICommand& command );
244 : bool _cmdDisableSendOnRegister( ICommand& command );
245 : bool _cmdRemoveNode( ICommand& command );
246 : bool _cmdPush( ICommand& command );
247 :
248 108 : LB_TS_VAR( _receiverThread );
249 108 : LB_TS_VAR( _commandThread );
250 : };
251 :
252 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore ); // Stream-insertion overload taking an ObjectStore pointer; only declared here.
253 : }
254 : #endif // CO_OBJECTSTORE_H
|