Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "object.h"
22 :
23 : #include "dataIStream.h"
24 : #include "dataOStream.h"
25 : #include "deltaMasterCM.h"
26 : #include "fullMasterCM.h"
27 : #include "global.h"
28 : #include "log.h"
29 : #include "nodeCommand.h"
30 : #include "nullCM.h"
31 : #include "objectCM.h"
32 : #include "objectOCommand.h"
33 : #include "staticMasterCM.h"
34 : #include "staticSlaveCM.h"
35 : #include "types.h"
36 : #include "unbufferedMasterCM.h"
37 : #include "versionedSlaveCM.h"
38 :
39 : #include <iostream>
40 : #include <lunchbox/scopedMutex.h>
41 : #include <pression/data/Registry.h>
42 :
43 : namespace co
44 : {
45 : namespace detail
46 : {
47 51 : class Object
48 : {
49 : public:
50 51 : Object()
51 51 : : id(servus::make_UUID())
52 : , instanceID(CO_INSTANCE_INVALID)
53 51 : , cm(ObjectCM::ZERO)
54 : {
55 51 : }
56 :
57 : /** The session-unique object identifier. */
58 : uint128_t id;
59 :
60 : /** The node where this object is attached. */
61 : LocalNodePtr localNode;
62 :
63 : /** A session-unique identifier of the concrete instance. */
64 : uint32_t instanceID;
65 :
66 : /** The object's change manager. */
67 : ObjectCMPtr cm;
68 : };
69 : }
70 :
71 51 : Object::Object()
72 51 : : impl_(new detail::Object)
73 : {
74 51 : }
75 :
76 0 : Object::Object(const Object& object)
77 : : Dispatcher(object)
78 0 : , impl_(new detail::Object)
79 : {
80 0 : }
81 :
82 102 : Object::~Object()
83 : {
84 51 : LBASSERTINFO(!isAttached(), "Object " << impl_->id
85 : << " is still attached to node "
86 : << impl_->localNode->getNodeID());
87 :
88 51 : if (impl_->localNode)
89 0 : impl_->localNode->releaseObject(this);
90 :
91 51 : impl_->localNode = 0;
92 51 : impl_->cm = 0;
93 51 : delete impl_;
94 51 : }
95 :
96 914 : bool Object::isAttached() const
97 : {
98 914 : return impl_->instanceID != CO_INSTANCE_INVALID;
99 : }
100 :
101 1110 : LocalNodePtr Object::getLocalNode()
102 : {
103 1110 : return impl_->localNode;
104 : }
105 :
106 0 : void Object::setID(const uint128_t& identifier)
107 : {
108 0 : LBASSERT(!isAttached());
109 0 : LBASSERT(identifier.isUUID());
110 0 : impl_->id = identifier;
111 0 : }
112 :
113 360 : const uint128_t& Object::getID() const
114 : {
115 360 : return impl_->id;
116 : }
117 :
118 345 : uint32_t Object::getInstanceID() const
119 : {
120 345 : return impl_->instanceID;
121 : }
122 :
123 51 : void Object::attach(const uint128_t& id, const uint32_t instanceID)
124 : {
125 51 : LBASSERTINFO(!isAttached(), *this);
126 51 : LBASSERT(impl_->localNode);
127 51 : LBASSERT(instanceID <= CO_INSTANCE_MAX);
128 :
129 51 : impl_->id = id;
130 51 : impl_->instanceID = instanceID;
131 51 : LBLOG(LOG_OBJECTS) << impl_->id << '.' << impl_->instanceID << ": "
132 51 : << lunchbox::className(this)
133 153 : << (isMaster() ? " master" : " slave") << std::endl;
134 51 : }
135 :
136 51 : void Object::detach()
137 : {
138 51 : impl_->instanceID = CO_INSTANCE_INVALID;
139 51 : impl_->localNode = 0;
140 51 : }
141 :
142 50 : void Object::notifyDetach()
143 : {
144 50 : if (!isMaster())
145 84 : return;
146 :
147 : // unmap slaves
148 16 : const Nodes slaves = impl_->cm->getSlaveNodes();
149 15 : if (slaves.empty())
150 14 : return;
151 :
152 3 : LBLOG(LOG_BUG) << slaves.size() << " slaves subscribed during "
153 3 : << " deregister/unmap of " << lunchbox::className(this)
154 4 : << " id " << impl_->id << std::endl;
155 :
156 2 : for (NodesCIter i = slaves.begin(); i != slaves.end(); ++i)
157 : {
158 2 : NodePtr node = *i;
159 1 : node->send(CMD_NODE_UNMAP_OBJECT) << impl_->id;
160 : }
161 : }
162 :
163 0 : void Object::transfer(Object* from)
164 : {
165 0 : impl_->id = from->impl_->id;
166 0 : impl_->instanceID = from->getInstanceID();
167 0 : impl_->cm = from->impl_->cm;
168 0 : impl_->localNode = from->impl_->localNode;
169 0 : impl_->cm->setObject(this);
170 :
171 0 : from->impl_->cm = ObjectCM::ZERO;
172 0 : from->impl_->localNode = 0;
173 0 : from->impl_->instanceID = CO_INSTANCE_INVALID;
174 0 : }
175 :
176 68 : void Object::_setChangeManager(ObjectCMPtr cm)
177 : {
178 68 : if (impl_->cm != ObjectCM::ZERO)
179 : {
180 17 : LBVERB << "Overriding existing object change manager, obj "
181 17 : << lunchbox::className(this) << ", old cm "
182 17 : << lunchbox::className(impl_->cm) << ", new cm "
183 51 : << lunchbox::className(cm) << std::endl;
184 : }
185 :
186 68 : impl_->cm->exit();
187 68 : impl_->cm = cm;
188 68 : cm->init();
189 136 : LBLOG(LOG_OBJECTS) << "set " << lunchbox::className(cm) << " for "
190 204 : << lunchbox::className(this) << std::endl;
191 68 : }
192 :
193 40 : ObjectCMPtr Object::_getChangeManager()
194 : {
195 40 : return impl_->cm;
196 : }
197 :
198 390 : ObjectOCommand Object::send(NodePtr node, const uint32_t cmd,
199 : const uint32_t instanceID)
200 : {
201 779 : Connections connections(1, node->getConnection());
202 390 : return ObjectOCommand(connections, cmd, COMMANDTYPE_OBJECT, impl_->id,
203 777 : instanceID);
204 : }
205 :
206 5 : void Object::push(const uint128_t& groupID, const uint128_t& typeID,
207 : const Nodes& nodes)
208 : {
209 5 : impl_->cm->push(groupID, typeID, nodes);
210 5 : }
211 :
212 112 : uint128_t Object::commit(const uint32_t incarnation)
213 : {
214 112 : return impl_->cm->commit(incarnation);
215 : }
216 :
217 68 : void Object::setupChangeManager(const Object::ChangeType type,
218 : const bool master, LocalNodePtr localNode,
219 : const uint32_t masterInstanceID)
220 : {
221 68 : impl_->localNode = localNode;
222 :
223 68 : switch (type)
224 : {
225 : case Object::NONE:
226 17 : LBASSERT(!localNode);
227 17 : _setChangeManager(ObjectCM::ZERO);
228 17 : break;
229 :
230 : case Object::STATIC:
231 4 : LBASSERT(impl_->localNode);
232 4 : if (master)
233 2 : _setChangeManager(new StaticMasterCM(this));
234 : else
235 2 : _setChangeManager(new StaticSlaveCM(this));
236 4 : break;
237 :
238 : case Object::INSTANCE:
239 8 : LBASSERT(impl_->localNode);
240 8 : if (master)
241 4 : _setChangeManager(new FullMasterCM(this));
242 : else
243 4 : _setChangeManager(new VersionedSlaveCM(this, masterInstanceID));
244 8 : break;
245 :
246 : case Object::DELTA:
247 35 : LBASSERT(impl_->localNode);
248 35 : if (master)
249 7 : _setChangeManager(new DeltaMasterCM(this));
250 : else
251 28 : _setChangeManager(new VersionedSlaveCM(this, masterInstanceID));
252 35 : break;
253 :
254 : case Object::UNBUFFERED:
255 4 : LBASSERT(impl_->localNode);
256 4 : if (master)
257 2 : _setChangeManager(new UnbufferedMasterCM(this));
258 : else
259 2 : _setChangeManager(new VersionedSlaveCM(this, masterInstanceID));
260 4 : break;
261 :
262 : default:
263 0 : LBUNIMPLEMENTED;
264 : }
265 68 : }
266 :
267 : //---------------------------------------------------------------------------
268 : // ChangeManager forwarders
269 : //---------------------------------------------------------------------------
270 36 : void Object::applyMapData(const uint128_t& version)
271 : {
272 36 : impl_->cm->applyMapData(version);
273 36 : }
274 :
275 0 : void Object::sendInstanceData(const Nodes& nodes)
276 : {
277 0 : impl_->cm->sendInstanceData(nodes);
278 0 : }
279 :
280 0 : bool Object::isBuffered() const
281 : {
282 0 : return impl_->cm->isBuffered();
283 : }
284 :
285 372 : bool Object::isMaster() const
286 : {
287 372 : return impl_->cm->isMaster();
288 : }
289 :
290 33 : void Object::removeSlave(NodePtr node, const uint32_t instanceID)
291 : {
292 33 : impl_->cm->removeSlave(node, instanceID);
293 33 : }
294 :
295 25 : void Object::removeSlaves(NodePtr node)
296 : {
297 25 : impl_->cm->removeSlaves(node);
298 25 : }
299 :
300 36 : void Object::setMasterNode(NodePtr node)
301 : {
302 36 : impl_->cm->setMasterNode(node);
303 36 : }
304 :
305 0 : void Object::addInstanceDatas(const ObjectDataIStreamDeque& cache,
306 : const uint128_t& version)
307 : {
308 0 : impl_->cm->addInstanceDatas(cache, version);
309 0 : }
310 :
311 1 : void Object::setAutoObsolete(const uint32_t count)
312 : {
313 1 : impl_->cm->setAutoObsolete(count);
314 1 : }
315 :
316 0 : uint32_t Object::getAutoObsolete() const
317 : {
318 0 : return impl_->cm->getAutoObsolete();
319 : }
320 :
321 309 : uint128_t Object::sync(const uint128_t& version)
322 : {
323 309 : if (version == VERSION_NONE)
324 0 : return getVersion();
325 309 : return impl_->cm->sync(version);
326 : }
327 :
328 0 : uint128_t Object::getHeadVersion() const
329 : {
330 0 : return impl_->cm->getHeadVersion();
331 : }
332 :
333 1559 : uint128_t Object::getVersion() const
334 : {
335 1559 : return impl_->cm->getVersion();
336 : }
337 :
338 144 : void Object::notifyNewHeadVersion(const uint128_t& version LB_UNUSED)
339 : {
340 144 : LBASSERTINFO(getVersion() == VERSION_NONE || version < getVersion() + 100,
341 : lunchbox::className(this));
342 144 : }
343 :
344 172 : CompressorInfo Object::chooseCompressor() const
345 : {
346 172 : return pression::data::Registry::getInstance().choose();
347 : }
348 :
349 35 : uint32_t Object::getMasterInstanceID() const
350 : {
351 35 : return impl_->cm->getMasterInstanceID();
352 : }
353 :
354 35 : NodePtr Object::getMasterNode()
355 : {
356 35 : return impl_->cm->getMasterNode();
357 : }
358 :
359 0 : std::ostream& operator<<(std::ostream& os, const Object& object)
360 : {
361 0 : os << lunchbox::className(&object) << " " << object.getID() << "."
362 0 : << object.getInstanceID() << " v" << object.getVersion();
363 0 : return os;
364 : }
365 :
366 0 : std::ostream& operator<<(std::ostream& os, const Object::ChangeType& type)
367 : {
368 0 : return os << (type == Object::NONE
369 : ? "none"
370 0 : : type == Object::STATIC
371 0 : ? "static"
372 0 : : type == Object::INSTANCE
373 0 : ? "instance"
374 0 : : type == Object::DELTA
375 0 : ? "delta"
376 0 : : type == Object::UNBUFFERED
377 0 : ? "unbuffered"
378 0 : : "ERROR");
379 : }
380 63 : }
|