Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "versionedSlaveCM.h"
22 :
23 : #include "log.h"
24 : #include "object.h"
25 : #include "objectCommand.h"
26 : #include "objectDataICommand.h"
27 : #include "objectDataIStream.h"
28 : #include "objectDataOCommand.h"
29 : #include <lunchbox/scopedMutex.h>
30 : #include <limits>
31 :
32 : namespace co
33 : {
34 : typedef CommandFunc< VersionedSlaveCM > CmdFunc;
35 :
36 34 : VersionedSlaveCM::VersionedSlaveCM( Object* object, uint32_t masterInstanceID )
37 : : ObjectCM( object )
38 : , _version( VERSION_NONE )
39 : , _currentIStream( 0 )
40 : #pragma warning(push)
41 : #pragma warning(disable: 4355)
42 : , _ostream( this )
43 : #pragma warning(pop)
44 34 : , _masterInstanceID( masterInstanceID )
45 : {
46 34 : LBASSERT( object );
47 :
48 : object->registerCommand( CMD_OBJECT_INSTANCE,
49 34 : CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
50 : object->registerCommand( CMD_OBJECT_DELTA,
51 34 : CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
52 34 : }
53 :
54 102 : VersionedSlaveCM::~VersionedSlaveCM()
55 : {
56 68 : while( !_queuedVersions.isEmpty( ))
57 0 : delete _queuedVersions.pop();
58 :
59 34 : LBASSERT( !_currentIStream );
60 34 : delete _currentIStream;
61 34 : _currentIStream = 0;
62 :
63 34 : _version = VERSION_NONE;
64 34 : _master = 0;
65 68 : }
66 :
67 0 : uint128_t VersionedSlaveCM::commit( const uint32_t )
68 : {
69 : #if 0
70 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
71 : << std::endl;
72 : #endif
73 0 : if( !_object->isDirty() || !_master || !_master->isReachable( ))
74 0 : return VERSION_NONE;
75 :
76 0 : _ostream.enableSlaveCommit( _master );
77 0 : _object->pack( _ostream );
78 0 : _ostream.disable();
79 :
80 0 : return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
81 : }
82 :
83 309 : uint128_t VersionedSlaveCM::sync( const uint128_t& v )
84 : {
85 : #if 0
86 : LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
87 : << "." << _object->getInstanceID() << std::endl;
88 : #endif
89 309 : if( _version == v )
90 200 : return _version;
91 :
92 109 : if( v == VERSION_HEAD )
93 : {
94 0 : _syncToHead();
95 0 : return _version;
96 : }
97 :
98 109 : const uint128_t version = ( v == VERSION_NEXT ) ? _version + 1 : v;
99 109 : LBASSERTINFO( version.high() == 0, "Not a master version: " << version )
100 109 : LBASSERTINFO( _version <= version,
101 : "can't sync to older version of object " <<
102 : lunchbox::className( _object ) << " " << _object->getID() <<
103 : " (" << _version << ", " << version <<")" );
104 :
105 328 : while( _version < version )
106 110 : _unpackOneVersion( _queuedVersions.pop( ));
107 :
108 109 : LocalNodePtr node = _object->getLocalNode();
109 109 : if( node.isValid( ))
110 109 : node->flushCommands();
111 :
112 109 : return _version;
113 : }
114 :
115 0 : void VersionedSlaveCM::_syncToHead()
116 : {
117 0 : if( _queuedVersions.isEmpty( ))
118 0 : return;
119 :
120 0 : ObjectDataIStream* is = 0;
121 0 : while( _queuedVersions.tryPop( is ))
122 0 : _unpackOneVersion( is );
123 :
124 0 : LocalNodePtr localNode = _object->getLocalNode();
125 0 : if( localNode.isValid( ))
126 0 : localNode->flushCommands();
127 : }
128 :
129 144 : void VersionedSlaveCM::_releaseStream( ObjectDataIStream* stream )
130 : {
131 : #ifdef CO_AGGRESSIVE_CACHING
132 : stream->reset();
133 : _iStreamCache.release( stream );
134 : #else
135 144 : delete stream;
136 : #endif
137 144 : }
138 :
139 0 : uint128_t VersionedSlaveCM::getHeadVersion() const
140 : {
141 0 : ObjectDataIStream* is = 0;
142 0 : if( _queuedVersions.getBack( is ))
143 : {
144 0 : LBASSERT( is );
145 0 : return is->getVersion();
146 : }
147 0 : return _version;
148 : }
149 :
150 110 : void VersionedSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
151 : {
152 110 : LBASSERT( is );
153 110 : LBASSERTINFO( _version == is->getVersion() - 1 || _version == VERSION_NONE,
154 : "Expected version " << _version + 1 << " or 0, got "
155 : << is->getVersion() << " for " << *_object );
156 :
157 110 : if( is->hasInstanceData( ))
158 4 : _object->applyInstanceData( *is );
159 : else
160 106 : _object->unpack( *is );
161 :
162 110 : _version = is->getVersion();
163 110 : _sendAck();
164 :
165 110 : LBASSERT( _version != VERSION_INVALID );
166 110 : LBASSERT( _version != VERSION_NONE );
167 110 : LBASSERTINFO( is->getRemainingBufferSize()==0 && is->nRemainingBuffers()==0,
168 : "Object " << typeid( *_object ).name() <<
169 : " did not unpack all data" );
170 :
171 : #if 0
172 : LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
173 : << _object->getID() << "." << _object->getInstanceID()
174 : << std::endl;
175 : #endif
176 110 : _releaseStream( is );
177 110 : }
178 :
179 110 : void VersionedSlaveCM::_sendAck()
180 : {
181 110 : const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
182 110 : if( maxVersion <= _version.low( )) // overflow: default unblocking commit
183 218 : return;
184 :
185 : _object->send( _master, CMD_OBJECT_MAX_VERSION, _masterInstanceID )
186 2 : << maxVersion << _object->getInstanceID();
187 : }
188 :
189 34 : void VersionedSlaveCM::applyMapData( const uint128_t& version )
190 : {
191 : while( true )
192 : {
193 34 : ObjectDataIStream* is = _queuedVersions.pop();
194 34 : if( is->getVersion() == version )
195 : {
196 34 : LBASSERTINFO( is->hasInstanceData(), *_object );
197 :
198 34 : if( is->hasData( )) // not VERSION_NONE
199 31 : _object->applyInstanceData( *is );
200 34 : _version = is->getVersion();
201 :
202 34 : LBASSERT( _version != VERSION_INVALID );
203 34 : LBASSERTINFO( !is->hasData(),
204 : lunchbox::className( _object ) <<
205 : " did not unpack all data, " <<
206 : is->getRemainingBufferSize() << " bytes, " <<
207 : is->nRemainingBuffers() << " buffer(s)" );
208 :
209 34 : _releaseStream( is );
210 : #if 0
211 : LBLOG( LOG_OBJECTS ) << "Mapped initial data of " << _object
212 : << std::endl;
213 : #endif
214 34 : return;
215 : }
216 : else
217 : {
218 : // Found the following case:
219 : // - p1, t1 calls commit
220 : // - p1, t2 calls mapObject
221 : // - p1, cmd commits new version
222 : // - p1, cmd subscribes object
223 : // - p1, rcv attaches object
224 : // - p1, cmd receives commit data
225 : // -> newly attached object recv new commit data before map data,
226 : // ignore it
227 0 : LBASSERTINFO( is->getVersion() > version,
228 : is->getVersion() << " <= " << version );
229 0 : _releaseStream( is );
230 : }
231 0 : }
232 : }
233 :
234 0 : void VersionedSlaveCM::addInstanceDatas( const ObjectDataIStreamDeque& cache,
235 : const uint128_t& startVersion )
236 : {
237 0 : LB_TS_THREAD( _rcvThread );
238 : #if 0
239 : LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
240 : #endif
241 :
242 0 : uint128_t oldest = VERSION_NONE;
243 0 : uint128_t newest = VERSION_NONE;
244 0 : if( !_queuedVersions.isEmpty( ))
245 : {
246 0 : ObjectDataIStream* is = 0;
247 :
248 0 : LBCHECK( _queuedVersions.getFront( is ));
249 0 : oldest = is->getVersion();
250 :
251 0 : LBCHECK( _queuedVersions.getBack( is ));
252 0 : newest = is->getVersion();
253 : }
254 :
255 0 : ObjectDataIStreamDeque head;
256 0 : ObjectDataIStreams tail;
257 :
258 0 : for( ObjectDataIStreamDeque::const_iterator i = cache.begin();
259 0 : i != cache.end(); ++i )
260 : {
261 0 : ObjectDataIStream* stream = *i;
262 0 : const uint128_t& version = stream->getVersion();
263 0 : if( version < startVersion )
264 0 : continue;
265 :
266 0 : LBASSERT( stream->isReady( ));
267 0 : LBASSERT( stream->hasInstanceData( ));
268 0 : if( !stream->isReady( ))
269 0 : break;
270 :
271 0 : if( version < oldest )
272 0 : head.push_front( stream );
273 0 : else if( version > newest )
274 0 : tail.push_back( stream );
275 : }
276 :
277 0 : for( ObjectDataIStreamDeque::const_iterator i = head.begin();
278 0 : i != head.end(); ++i )
279 : {
280 0 : const ObjectDataIStream* stream = *i;
281 : #ifndef NDEBUG
282 0 : ObjectDataIStream* debugStream = 0;
283 0 : _queuedVersions.getFront( debugStream );
284 0 : if( debugStream )
285 0 : LBASSERT( debugStream->getVersion() == stream->getVersion() + 1);
286 : #endif
287 0 : _queuedVersions.pushFront( new ObjectDataIStream( *stream ));
288 : #if 0
289 : LBLOG( LOG_OBJECTS ) << stream->getVersion() << ' ';
290 : #endif
291 : }
292 :
293 : #if 0
294 : LBLOG( LOG_OBJECTS ) << " back ";
295 : #endif
296 0 : for( ObjectDataIStreams::const_iterator i = tail.begin();
297 0 : i != tail.end(); ++i )
298 : {
299 0 : const ObjectDataIStream* stream = *i;
300 : #ifndef NDEBUG
301 0 : ObjectDataIStream* debugStream = 0;
302 0 : _queuedVersions.getBack( debugStream );
303 0 : if( debugStream )
304 : {
305 0 : LBASSERT( debugStream->getVersion() + 1 == stream->getVersion( ));
306 : }
307 : #endif
308 0 : _queuedVersions.push( new ObjectDataIStream( *stream ));
309 : #if 0
310 : LBLOG( LOG_OBJECTS ) << stream->getVersion() << ' ';
311 : #endif
312 0 : }
313 : #if 0
314 : LBLOG( LOG_OBJECTS ) << std::endl << lunchbox::enableFlush;
315 : #endif
316 0 : }
317 :
318 : //---------------------------------------------------------------------------
319 : // command handlers
320 : //---------------------------------------------------------------------------
321 144 : bool VersionedSlaveCM::_cmdData( ICommand& cmd )
322 : {
323 144 : ObjectDataICommand command( cmd );
324 :
325 144 : LB_TS_THREAD( _rcvThread );
326 144 : LBASSERT( command.getNode().isValid( ));
327 :
328 144 : if( !_currentIStream )
329 144 : _currentIStream = _iStreamCache.alloc();
330 :
331 144 : _currentIStream->addDataCommand( command );
332 144 : if( _currentIStream->isReady( ))
333 : {
334 144 : const uint128_t& version = _currentIStream->getVersion();
335 : #if 0
336 : LBLOG( LOG_OBJECTS ) << "v" << version << ", id " << _object->getID()
337 : << "." << _object->getInstanceID() << " ready"
338 : << std::endl;
339 : #endif
340 : #ifndef NDEBUG
341 144 : ObjectDataIStream* debugStream = 0;
342 144 : _queuedVersions.getBack( debugStream );
343 144 : if ( debugStream )
344 : {
345 98 : LBASSERT( debugStream->getVersion() + 1 == version ||
346 : debugStream->getVersion() == VERSION_NONE );
347 : }
348 : #endif
349 144 : _queuedVersions.push( _currentIStream );
350 144 : _object->notifyNewHeadVersion( version );
351 144 : _currentIStream = 0;
352 : }
353 144 : return true;
354 : }
355 :
356 66 : }
|