Line data Source code
1 :
2 : /* Copyright (c) 2011-2012, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "dataIStreamQueue.h"
22 :
23 : #include "objectDataICommand.h"
24 : #include "objectDataIStream.h"
25 :
26 : namespace co
27 : {
28 68 : DataIStreamQueue::DataIStreamQueue()
29 68 : {}
30 :
31 132 : DataIStreamQueue::~DataIStreamQueue()
32 : {
33 66 : LBASSERTINFO( _pending.empty(), "Incomplete commits pending" );
34 66 : LBASSERTINFO( _queued.isEmpty(), _queued.getSize() << " unapplied commits" )
35 :
36 66 : for( PendingStreamsCIter i = _pending.begin(); i != _pending.end(); ++i )
37 0 : delete i->second;
38 66 : _pending.clear();
39 :
40 66 : QueuedStream stream;
41 132 : while( _queued.tryPop( stream ))
42 0 : delete stream.second;
43 66 : }
44 :
45 0 : ObjectDataIStream* DataIStreamQueue::tryPop()
46 : {
47 0 : QueuedStream stream( uint128_t(), (ObjectDataIStream*)0 );
48 0 : _queued.tryPop( stream );
49 0 : return stream.second;
50 : }
51 :
52 5 : ObjectDataIStream* DataIStreamQueue::pull( const uint128_t& key )
53 : {
54 5 : ObjectDataIStream* is = 0;
55 5 : QueuedStreams unusedStreams;
56 15 : while( !is )
57 : {
58 5 : QueuedStream candidate = _queued.pop();
59 5 : if( candidate.first == key )
60 5 : is = candidate.second;
61 : else
62 0 : unusedStreams.push_back( candidate );
63 : }
64 :
65 5 : _queued.pushFront( unusedStreams );
66 5 : return is;
67 : }
68 :
69 5 : void DataIStreamQueue::recycle( ObjectDataIStream* stream )
70 : {
71 : #ifdef CO_AGGRESSIVE_CACHING
72 : stream->reset();
73 : _iStreamCache.release( stream );
74 : #else
75 5 : delete stream;
76 : #endif
77 5 : }
78 :
79 7 : bool DataIStreamQueue::addDataCommand( const uint128_t& key, ICommand& command )
80 : {
81 7 : LB_TS_THREAD( _thread );
82 7 : LBASSERTINFO( _pending.size() < 100, "More than 100 pending commits");
83 :
84 7 : ObjectDataIStream* istream = 0;
85 7 : PendingStreams::iterator i = _pending.find( key );
86 7 : if( i == _pending.end( ))
87 5 : istream = _iStreamCache.alloc();
88 : else
89 2 : istream = i->second;
90 :
91 7 : istream->addDataCommand( command );
92 7 : if( istream->isReady( ))
93 : {
94 5 : if( i != _pending.end( ))
95 2 : _pending.erase( i );
96 :
97 5 : _queued.push( QueuedStream( key, istream ));
98 : //LBLOG( LOG_OBJECTS ) << "Queued commit " << key << std::endl;
99 5 : return true;
100 : }
101 :
102 2 : if( i == _pending.end( ))
103 : {
104 2 : _pending[ key ] = istream;
105 : //LBLOG( LOG_OBJECTS ) << "New incomplete commit " << key << std::endl;
106 2 : return false;
107 : }
108 :
109 : //LBLOG(LOG_OBJECTS) << "Add data to incomplete commit " << key <<std::endl;
110 0 : return false;
111 : }
112 :
113 66 : }
|