Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "versionedSlaveCM.h"
22 :
23 : #include "log.h"
24 : #include "object.h"
25 : #include "objectCommand.h"
26 : #include "objectDataICommand.h"
27 : #include "objectDataIStream.h"
28 : #include "objectDataOCommand.h"
29 : #include <lunchbox/scopedMutex.h>
30 : #include <limits>
31 :
32 : namespace co
33 : {
34 : typedef CommandFunc< VersionedSlaveCM > CmdFunc;
35 :
36 34 : VersionedSlaveCM::VersionedSlaveCM( Object* object, uint32_t masterInstanceID )
37 : : ObjectCM( object )
38 : , _version( VERSION_NONE )
39 : , _currentIStream( 0 )
40 : #pragma warning(push)
41 : #pragma warning(disable: 4355)
42 : , _ostream( this )
43 : #pragma warning(pop)
44 34 : , _masterInstanceID( masterInstanceID )
45 : {
46 34 : LBASSERT( object );
47 :
48 34 : object->registerCommand( CMD_OBJECT_INSTANCE,
49 68 : CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
50 34 : object->registerCommand( CMD_OBJECT_DELTA,
51 68 : CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
52 34 : }
53 :
54 102 : VersionedSlaveCM::~VersionedSlaveCM()
55 : {
56 34 : while( !_queuedVersions.isEmpty( ))
57 0 : delete _queuedVersions.pop();
58 :
59 34 : LBASSERT( !_currentIStream );
60 34 : delete _currentIStream;
61 34 : _currentIStream = 0;
62 :
63 34 : _version = VERSION_NONE;
64 34 : _master = 0;
65 68 : }
66 :
67 0 : uint128_t VersionedSlaveCM::commit( const uint32_t )
68 : {
69 : #if 0
70 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
71 : << std::endl;
72 : #endif
73 0 : if( !_object->isDirty() || !_master || !_master->isReachable( ))
74 0 : return VERSION_NONE;
75 :
76 0 : _ostream.enableSlaveCommit( _master );
77 0 : _object->pack( _ostream );
78 0 : _ostream.disable();
79 :
80 0 : return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
81 : }
82 :
83 309 : uint128_t VersionedSlaveCM::sync( const uint128_t& v )
84 : {
85 : #if 0
86 : LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
87 : << "." << _object->getInstanceID() << std::endl;
88 : #endif
89 309 : if( _version == v )
90 200 : return _version;
91 :
92 109 : if( v == VERSION_HEAD )
93 : {
94 0 : _syncToHead();
95 0 : return _version;
96 : }
97 :
98 109 : const uint128_t version = ( v == VERSION_NEXT ) ? _version + 1 : v;
99 109 : LBASSERTINFO( version.high() == 0, "Not a master version: " << version )
100 109 : LBASSERTINFO( _version <= version,
101 : "can't sync to older version of object " <<
102 : lunchbox::className( _object ) << " " << _object->getID() <<
103 : " (" << _version << ", " << version <<")" );
104 :
105 329 : while( _version < version )
106 110 : _unpackOneVersion( _queuedVersions.pop( ));
107 :
108 218 : LocalNodePtr node = _object->getLocalNode();
109 109 : if( node.isValid( ))
110 109 : node->flushCommands();
111 :
112 109 : return _version;
113 : }
114 :
115 0 : void VersionedSlaveCM::_syncToHead()
116 : {
117 0 : if( _queuedVersions.isEmpty( ))
118 0 : return;
119 :
120 0 : ObjectDataIStream* is = 0;
121 0 : while( _queuedVersions.tryPop( is ))
122 0 : _unpackOneVersion( is );
123 :
124 0 : LocalNodePtr localNode = _object->getLocalNode();
125 0 : if( localNode.isValid( ))
126 0 : localNode->flushCommands();
127 : }
128 :
129 144 : void VersionedSlaveCM::_releaseStream( ObjectDataIStream* stream )
130 : {
131 : #ifdef CO_AGGRESSIVE_CACHING
132 : stream->reset();
133 : _iStreamCache.release( stream );
134 : #else
135 144 : delete stream;
136 : #endif
137 144 : }
138 :
139 0 : uint128_t VersionedSlaveCM::getHeadVersion() const
140 : {
141 0 : ObjectDataIStream* is = 0;
142 0 : if( _queuedVersions.getBack( is ))
143 : {
144 0 : LBASSERT( is );
145 0 : return is->getVersion();
146 : }
147 0 : return _version;
148 : }
149 :
150 110 : void VersionedSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
151 : {
152 110 : LBASSERT( is );
153 110 : LBASSERTINFO( _version == is->getVersion() - 1 || _version == VERSION_NONE,
154 : "Expected version " << _version + 1 << " or 0, got "
155 : << is->getVersion() << " for " << *_object );
156 :
157 110 : if( is->hasInstanceData( ))
158 4 : _object->applyInstanceData( *is );
159 : else
160 106 : _object->unpack( *is );
161 :
162 110 : _version = is->getVersion();
163 110 : _sendAck();
164 :
165 110 : LBASSERT( _version != VERSION_INVALID );
166 110 : LBASSERT( _version != VERSION_NONE );
167 110 : LBASSERTINFO( is->getRemainingBufferSize()==0 && is->nRemainingBuffers()==0,
168 : "Object " << typeid( *_object ).name() <<
169 : " did not unpack all data" );
170 :
171 : #if 0
172 : LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
173 : << _object->getID() << "." << _object->getInstanceID()
174 : << std::endl;
175 : #endif
176 110 : _releaseStream( is );
177 110 : }
178 :
179 110 : void VersionedSlaveCM::_sendAck()
180 : {
181 110 : const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
182 110 : if( maxVersion <= _version.low( )) // overflow: default unblocking commit
183 108 : return;
184 :
185 4 : _object->send( _master, CMD_OBJECT_MAX_VERSION, _masterInstanceID )
186 6 : << maxVersion << _object->getInstanceID();
187 : }
188 :
189 34 : void VersionedSlaveCM::applyMapData( const uint128_t& version )
190 : {
191 : while( true )
192 : {
193 34 : ObjectDataIStream* is = _queuedVersions.pop();
194 34 : if( is->getVersion() == version )
195 : {
196 34 : LBASSERTINFO( is->hasInstanceData(), *_object );
197 :
198 34 : if( is->hasData( )) // not VERSION_NONE
199 31 : _object->applyInstanceData( *is );
200 34 : _version = is->getVersion();
201 :
202 34 : LBASSERT( _version != VERSION_INVALID );
203 34 : LBASSERTINFO( !is->hasData(),
204 : lunchbox::className( _object ) <<
205 : " did not unpack all data, " <<
206 : is->getRemainingBufferSize() << " bytes, " <<
207 : is->nRemainingBuffers() << " buffer(s)" );
208 :
209 34 : _releaseStream( is );
210 34 : return;
211 : }
212 : else
213 : {
214 : // Found the following case:
215 : // - p1, t1 calls commit
216 : // - p1, t2 calls mapObject
217 : // - p1, cmd commits new version
218 : // - p1, cmd subscribes object
219 : // - p1, rcv attaches object
220 : // - p1, cmd receives commit data
221 : // -> newly attached object recv new commit data before map data,
222 : // ignore it
223 0 : LBASSERTINFO( is->getVersion() > version,
224 : is->getVersion() << " <= " << version );
225 0 : _releaseStream( is );
226 : }
227 0 : }
228 : }
229 :
230 0 : void VersionedSlaveCM::addInstanceDatas( const ObjectDataIStreamDeque& cache,
231 : const uint128_t& startVersion )
232 : {
233 0 : LB_TS_THREAD( _rcvThread );
234 : #if 0
235 : LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
236 : #endif
237 :
238 0 : uint128_t oldest = VERSION_NONE;
239 0 : uint128_t newest = VERSION_NONE;
240 0 : if( !_queuedVersions.isEmpty( ))
241 : {
242 0 : ObjectDataIStream* is = 0;
243 :
244 0 : LBCHECK( _queuedVersions.getFront( is ));
245 0 : oldest = is->getVersion();
246 :
247 0 : LBCHECK( _queuedVersions.getBack( is ));
248 0 : newest = is->getVersion();
249 : }
250 :
251 0 : ObjectDataIStreamDeque head;
252 0 : ObjectDataIStreams tail;
253 :
254 0 : for( ObjectDataIStreamDeque::const_iterator i = cache.begin();
255 0 : i != cache.end(); ++i )
256 : {
257 0 : ObjectDataIStream* stream = *i;
258 0 : const uint128_t& version = stream->getVersion();
259 0 : if( version < startVersion )
260 0 : continue;
261 :
262 0 : LBASSERT( stream->isReady( ));
263 0 : LBASSERT( stream->hasInstanceData( ));
264 0 : if( !stream->isReady( ))
265 0 : break;
266 :
267 0 : if( version < oldest )
268 0 : head.push_front( stream );
269 0 : else if( version > newest )
270 0 : tail.push_back( stream );
271 : }
272 :
273 0 : for( ObjectDataIStreamDeque::const_iterator i = head.begin();
274 0 : i != head.end(); ++i )
275 : {
276 0 : const ObjectDataIStream* stream = *i;
277 : #ifndef NDEBUG
278 0 : ObjectDataIStream* debugStream = 0;
279 0 : _queuedVersions.getFront( debugStream );
280 0 : if( debugStream )
281 0 : LBASSERT( debugStream->getVersion() == stream->getVersion() + 1);
282 : #endif
283 0 : _queuedVersions.pushFront( new ObjectDataIStream( *stream ));
284 : }
285 :
286 0 : for( ObjectDataIStreams::const_iterator i = tail.begin();
287 0 : i != tail.end(); ++i )
288 : {
289 0 : const ObjectDataIStream* stream = *i;
290 : #ifndef NDEBUG
291 0 : ObjectDataIStream* debugStream = 0;
292 0 : _queuedVersions.getBack( debugStream );
293 0 : if( debugStream )
294 : {
295 0 : LBASSERT( debugStream->getVersion() + 1 == stream->getVersion( ));
296 : }
297 : #endif
298 0 : _queuedVersions.push( new ObjectDataIStream( *stream ));
299 : }
300 0 : }
301 :
302 : //---------------------------------------------------------------------------
303 : // command handlers
304 : //---------------------------------------------------------------------------
305 144 : bool VersionedSlaveCM::_cmdData( ICommand& cmd )
306 : {
307 288 : ObjectDataICommand command( cmd );
308 :
309 144 : LB_TS_THREAD( _rcvThread );
310 144 : LBASSERT( command.getNode().isValid( ));
311 :
312 144 : if( !_currentIStream )
313 144 : _currentIStream = _iStreamCache.alloc();
314 :
315 144 : _currentIStream->addDataCommand( command );
316 144 : if( _currentIStream->isReady( ))
317 : {
318 144 : const uint128_t& version = _currentIStream->getVersion();
319 : #ifndef NDEBUG
320 144 : ObjectDataIStream* debugStream = 0;
321 144 : _queuedVersions.getBack( debugStream );
322 144 : if ( debugStream )
323 : {
324 98 : LBASSERT( debugStream->getVersion() + 1 == version ||
325 : debugStream->getVersion() == VERSION_NONE );
326 : }
327 : #endif
328 144 : _queuedVersions.push( _currentIStream );
329 144 : _object->notifyNewHeadVersion( version );
330 144 : _currentIStream = 0;
331 : }
332 288 : return true;
333 : }
334 :
335 66 : }
|