Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "versionedSlaveCM.h"
22 :
23 : #include "log.h"
24 : #include "object.h"
25 : #include "objectCommand.h"
26 : #include "objectDataICommand.h"
27 : #include "objectDataIStream.h"
28 : #include "objectDataOCommand.h"
29 : #include <limits>
30 : #include <lunchbox/scopedMutex.h>
31 :
32 : namespace co
33 : {
34 : typedef CommandFunc<VersionedSlaveCM> CmdFunc;
35 :
36 34 : VersionedSlaveCM::VersionedSlaveCM(Object* object, uint32_t masterInstanceID)
37 : : ObjectCM(object)
38 : , _version(VERSION_NONE)
39 : , _currentIStream(0)
40 : #pragma warning(push)
41 : #pragma warning(disable : 4355)
42 : , _ostream(this)
43 : #pragma warning(pop)
44 34 : , _masterInstanceID(masterInstanceID)
45 : {
46 34 : LBASSERT(object);
47 :
48 34 : object->registerCommand(CMD_OBJECT_INSTANCE,
49 68 : CmdFunc(this, &VersionedSlaveCM::_cmdData), 0);
50 34 : object->registerCommand(CMD_OBJECT_DELTA,
51 68 : CmdFunc(this, &VersionedSlaveCM::_cmdData), 0);
52 34 : }
53 :
54 102 : VersionedSlaveCM::~VersionedSlaveCM()
55 : {
56 34 : while (!_queuedVersions.isEmpty())
57 0 : delete _queuedVersions.pop();
58 :
59 34 : LBASSERT(!_currentIStream);
60 34 : delete _currentIStream;
61 68 : }
62 :
63 0 : uint128_t VersionedSlaveCM::commit(const uint32_t)
64 : {
65 : #if 0
66 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
67 : << std::endl;
68 : #endif
69 0 : if (!_object->isDirty() || !_master || !_master->isReachable())
70 0 : return VERSION_NONE;
71 :
72 0 : _ostream.enableSlaveCommit(_master);
73 0 : _object->pack(_ostream);
74 0 : _ostream.disable();
75 :
76 0 : return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
77 : }
78 :
79 309 : uint128_t VersionedSlaveCM::sync(const uint128_t& v)
80 : {
81 : #if 0
82 : LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
83 : << "." << _object->getInstanceID() << std::endl;
84 : #endif
85 309 : if (_version == v)
86 200 : return _version;
87 :
88 109 : if (v == VERSION_HEAD)
89 : {
90 0 : _syncToHead();
91 0 : return _version;
92 : }
93 :
94 109 : const uint128_t version = (v == VERSION_NEXT) ? _version + 1 : v;
95 109 : LBASSERTINFO(version.high() == 0, "Not a master version: " << version)
96 109 : LBASSERTINFO(_version <= version,
97 : "can't sync to older version of object "
98 : << lunchbox::className(_object) << " " << _object->getID()
99 : << " (" << _version << ", " << version << ")");
100 :
101 329 : while (_version < version)
102 110 : _unpackOneVersion(_queuedVersions.pop());
103 :
104 218 : LocalNodePtr node = _object->getLocalNode();
105 109 : if (node.isValid())
106 109 : node->flushCommands();
107 :
108 109 : return _version;
109 : }
110 :
111 0 : void VersionedSlaveCM::_syncToHead()
112 : {
113 0 : if (_queuedVersions.isEmpty())
114 0 : return;
115 :
116 0 : ObjectDataIStream* is = 0;
117 0 : while (_queuedVersions.tryPop(is))
118 0 : _unpackOneVersion(is);
119 :
120 0 : LocalNodePtr localNode = _object->getLocalNode();
121 0 : if (localNode.isValid())
122 0 : localNode->flushCommands();
123 : }
124 :
125 144 : void VersionedSlaveCM::_releaseStream(ObjectDataIStream* stream)
126 : {
127 : #ifdef CO_AGGRESSIVE_CACHING
128 : stream->reset();
129 : _iStreamCache.release(stream);
130 : #else
131 144 : delete stream;
132 : #endif
133 144 : }
134 :
135 0 : uint128_t VersionedSlaveCM::getHeadVersion() const
136 : {
137 0 : ObjectDataIStream* is = 0;
138 0 : if (_queuedVersions.getBack(is))
139 : {
140 0 : LBASSERT(is);
141 0 : return is->getVersion();
142 : }
143 0 : return _version;
144 : }
145 :
146 110 : void VersionedSlaveCM::_unpackOneVersion(ObjectDataIStream* is)
147 : {
148 110 : LBASSERT(is);
149 110 : LBASSERTINFO(_version == is->getVersion() - 1 || _version == VERSION_NONE,
150 : "Expected version " << _version + 1 << " or 0, got "
151 : << is->getVersion() << " for "
152 : << *_object);
153 :
154 110 : if (is->hasInstanceData())
155 4 : _object->applyInstanceData(*is);
156 : else
157 106 : _object->unpack(*is);
158 :
159 110 : _version = is->getVersion();
160 110 : _sendAck();
161 :
162 110 : LBASSERT(_version != VERSION_INVALID);
163 110 : LBASSERT(_version != VERSION_NONE);
164 110 : LBASSERTINFO(is->getRemainingBufferSize() == 0 &&
165 : is->nRemainingBuffers() == 0,
166 : "Object " << typeid(*_object).name()
167 : << " did not unpack all data");
168 :
169 : #if 0
170 : LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
171 : << _object->getID() << "." << _object->getInstanceID()
172 : << std::endl;
173 : #endif
174 110 : _releaseStream(is);
175 110 : }
176 :
177 110 : void VersionedSlaveCM::_sendAck()
178 : {
179 110 : const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
180 110 : if (maxVersion <= _version.low()) // overflow: default unblocking commit
181 108 : return;
182 :
183 4 : _object->send(_master, CMD_OBJECT_MAX_VERSION, _masterInstanceID)
184 6 : << maxVersion << _object->getInstanceID();
185 : }
186 :
187 34 : void VersionedSlaveCM::applyMapData(const uint128_t& version)
188 : {
189 : while (true)
190 : {
191 34 : ObjectDataIStream* is = _queuedVersions.pop();
192 34 : if (is->getVersion() == version)
193 : {
194 34 : LBASSERTINFO(is->hasInstanceData(), *_object);
195 :
196 34 : if (is->hasData()) // not VERSION_NONE
197 31 : _object->applyInstanceData(*is);
198 34 : _version = is->getVersion();
199 :
200 34 : LBASSERT(_version != VERSION_INVALID);
201 34 : LBASSERTINFO(!is->hasData(),
202 : lunchbox::className(_object)
203 : << " did not unpack all data, "
204 : << is->getRemainingBufferSize() << " bytes, "
205 : << is->nRemainingBuffers() << " buffer(s)");
206 :
207 34 : _releaseStream(is);
208 34 : return;
209 : }
210 : else
211 : {
212 : // Found the following case:
213 : // - p1, t1 calls commit
214 : // - p1, t2 calls mapObject
215 : // - p1, cmd commits new version
216 : // - p1, cmd subscribes object
217 : // - p1, rcv attaches object
218 : // - p1, cmd receives commit data
219 : // -> newly attached object recv new commit data before map data,
220 : // ignore it
221 0 : LBASSERTINFO(is->getVersion() > version, is->getVersion()
222 : << " <= " << version);
223 0 : _releaseStream(is);
224 : }
225 0 : }
226 : }
227 :
228 0 : void VersionedSlaveCM::addInstanceDatas(const ObjectDataIStreamDeque& cache,
229 : const uint128_t& startVersion)
230 : {
231 0 : LB_TS_THREAD(_rcvThread);
232 : #if 0
233 : LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
234 : #endif
235 :
236 0 : uint128_t oldest = VERSION_NONE;
237 0 : uint128_t newest = VERSION_NONE;
238 0 : if (!_queuedVersions.isEmpty())
239 : {
240 0 : ObjectDataIStream* is = 0;
241 :
242 0 : LBCHECK(_queuedVersions.getFront(is));
243 0 : oldest = is->getVersion();
244 :
245 0 : LBCHECK(_queuedVersions.getBack(is));
246 0 : newest = is->getVersion();
247 : }
248 :
249 0 : ObjectDataIStreamDeque head;
250 0 : ObjectDataIStreams tail;
251 :
252 0 : for (ObjectDataIStreamDeque::const_iterator i = cache.begin();
253 0 : i != cache.end(); ++i)
254 : {
255 0 : ObjectDataIStream* stream = *i;
256 0 : const uint128_t& version = stream->getVersion();
257 0 : if (version < startVersion)
258 0 : continue;
259 :
260 0 : LBASSERT(stream->isReady());
261 0 : LBASSERT(stream->hasInstanceData());
262 0 : if (!stream->isReady())
263 0 : break;
264 :
265 0 : if (version < oldest)
266 0 : head.push_front(stream);
267 0 : else if (version > newest)
268 0 : tail.push_back(stream);
269 : }
270 :
271 0 : for (ObjectDataIStreamDeque::const_iterator i = head.begin();
272 0 : i != head.end(); ++i)
273 : {
274 0 : const ObjectDataIStream* stream = *i;
275 : #ifndef NDEBUG
276 0 : ObjectDataIStream* debugStream = 0;
277 0 : _queuedVersions.getFront(debugStream);
278 0 : if (debugStream)
279 0 : LBASSERT(debugStream->getVersion() == stream->getVersion() + 1);
280 : #endif
281 0 : _queuedVersions.pushFront(new ObjectDataIStream(*stream));
282 : }
283 :
284 0 : for (ObjectDataIStreams::const_iterator i = tail.begin(); i != tail.end();
285 : ++i)
286 : {
287 0 : const ObjectDataIStream* stream = *i;
288 : #ifndef NDEBUG
289 0 : ObjectDataIStream* debugStream = 0;
290 0 : _queuedVersions.getBack(debugStream);
291 0 : if (debugStream)
292 : {
293 0 : LBASSERT(debugStream->getVersion() + 1 == stream->getVersion());
294 : }
295 : #endif
296 0 : _queuedVersions.push(new ObjectDataIStream(*stream));
297 : }
298 0 : }
299 :
300 : //---------------------------------------------------------------------------
301 : // command handlers
302 : //---------------------------------------------------------------------------
303 144 : bool VersionedSlaveCM::_cmdData(ICommand& cmd)
304 : {
305 288 : ObjectDataICommand command(cmd);
306 :
307 144 : LB_TS_THREAD(_rcvThread);
308 144 : LBASSERT(command.getNode().isValid());
309 :
310 144 : if (!_currentIStream)
311 144 : _currentIStream = _iStreamCache.alloc();
312 :
313 144 : _currentIStream->addDataCommand(command);
314 144 : if (_currentIStream->isReady())
315 : {
316 144 : const uint128_t& version = _currentIStream->getVersion();
317 : #ifndef NDEBUG
318 144 : ObjectDataIStream* debugStream = 0;
319 144 : _queuedVersions.getBack(debugStream);
320 144 : if (debugStream)
321 : {
322 98 : LBASSERT(debugStream->getVersion() + 1 == version ||
323 : debugStream->getVersion() == VERSION_NONE);
324 : }
325 : #endif
326 144 : _queuedVersions.push(_currentIStream);
327 144 : _object->notifyNewHeadVersion(version);
328 144 : _currentIStream = 0;
329 : }
330 288 : return true;
331 : }
332 63 : }
|