Lunchbox  1.15.0
Multi-threaded C++ toolbox library for all application developers creating high-performance multi-threaded programs.
mtQueue.ipp
1 
2 /* Copyright (c) 2005-2012, Stefan Eilemann <eile@equalizergraphics.com>
3  * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or modify it under
6  * the terms of the GNU Lesser General Public License version 2.1 as published
7  * by the Free Software Foundation.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 namespace lunchbox
20 {
21 template< typename T, size_t S >
23 {
24  if( this != &from )
25  {
26  from._cond.lock();
27  std::deque< T > copy = from._queue;
28  const size_t maxSize = from._maxSize;
29  from._cond.unlock();
30 
31  _cond.lock();
32  _maxSize = maxSize;
33  _queue.swap( copy );
34  _cond.signal();
35  _cond.unlock();
36  }
37  return *this;
38 }
39 
40 template< typename T, size_t S >
41 const T& MTQueue< T, S >::operator[]( const size_t index ) const
42 {
43  _cond.lock();
44  while( _queue.size() <= index )
45  _cond.wait();
46 
47  LBASSERT( _queue.size() > index );
48  const T& element = _queue[index];
49  _cond.unlock();
50  return element;
51 }
52 
53 template< typename T, size_t S >
54 void MTQueue< T, S >::setMaxSize( const size_t maxSize )
55 {
56  _cond.lock();
57  while( _queue.size() > maxSize )
58  _cond.wait();
59  _maxSize = maxSize;
60  _cond.signal();
61  _cond.unlock();
62 }
63 
64 template< typename T, size_t S >
65 size_t MTQueue< T, S >::waitSize( const size_t minSize ) const
66 {
67  LBASSERT( minSize <= _maxSize );
68  _cond.lock();
69  while( _queue.size() < minSize )
70  _cond.wait();
71  const size_t size = _queue.size();
72  _cond.unlock();
73  return size;
74 }
75 
76 template< typename T, size_t S >
78 {
79  _cond.lock();
80  _queue.clear();
81  _cond.signal();
82  _cond.unlock();
83 }
84 
85 template< typename T, size_t S >
87 {
88  _cond.lock();
89  while( _queue.empty( ))
90  _cond.wait();
91 
92  LBASSERT( !_queue.empty( ));
93  T element = _queue.front();
94  _queue.pop_front();
95  _cond.signal();
96  _cond.unlock();
97  return element;
98 }
99 
100 template< typename T, size_t S >
101 bool MTQueue< T, S >::timedPop( const unsigned timeout, T& element )
102 {
103  _cond.lock();
104  while( _queue.empty( ))
105  {
106  if( !_cond.timedWait( timeout ))
107  {
108  _cond.unlock();
109  return false;
110  }
111  }
112  LBASSERT( !_queue.empty( ));
113  element = _queue.front();
114  _queue.pop_front();
115  _cond.signal();
116  _cond.unlock();
117  return true;
118 }
119 
120 template< typename T, size_t S > std::vector< T >
121 MTQueue< T, S >::timedPopRange( const unsigned timeout, const size_t minimum,
122  const size_t maximum )
123 {
124  std::vector< T > result;
125 
126  _cond.lock();
127  while( _queue.size() < minimum )
128  {
129  if( !_cond.timedWait( timeout ))
130  {
131  _cond.unlock();
132  return result;
133  }
134  }
135 
136  const size_t size = LB_MIN( maximum, _queue.size( ));
137 
138  result.reserve( size );
139  result.insert( result.end(), _queue.begin(), _queue.begin() + size );
140  _queue.erase( _queue.begin(), _queue.begin() + size );
141 
142  _cond.unlock();
143  return result;
144 }
145 
146 template< typename T, size_t S >
147 bool MTQueue< T, S >::tryPop( T& result )
148 {
149  _cond.lock();
150  if( _queue.empty( ))
151  {
152  _cond.unlock();
153  return false;
154  }
155 
156  result = _queue.front();
157  _queue.pop_front();
158  _cond.signal();
159  _cond.unlock();
160  return true;
161 }
162 
163 template< typename T, size_t S >
164 void MTQueue< T, S >::tryPop( const size_t num, std::vector< T >& result )
165 {
166  _cond.lock();
167  const size_t size = LB_MIN( num, _queue.size( ));
168  if( size > 0 )
169  {
170  result.reserve( result.size() + size );
171  for( size_t i = 0; i < size; ++i )
172  {
173  result.push_back( _queue.front( ));
174  _queue.pop_front();
175  }
176  _cond.signal();
177  }
178  _cond.unlock();
179 }
180 
182 template< typename T, size_t S > class MTQueue< T, S >::Group
183 {
184  friend class MTQueue< T, S >;
185  size_t height_;
186  size_t waiting_;
187 
188 public:
193  explicit Group( const size_t height ) : height_( height ), waiting_( 0 ) {}
194 
196  void setHeight( const size_t height ) { height_ = height; }
197 };
198 
199 template< typename T, size_t S >
200 bool MTQueue< T, S >::popBarrier( T& element, Group& barrier )
201 {
202  LBASSERT( barrier.height_ > 0 )
203 
204  _cond.lock();
205  ++barrier.waiting_;
206  while( _queue.empty() && barrier.waiting_ < barrier.height_ )
207  _cond.wait();
208 
209  if( _queue.empty( ))
210  {
211  LBASSERT( barrier.waiting_ == barrier.height_ );
212  _cond.broadcast();
213  _cond.unlock();
214  return false;
215  }
216 
217  element = _queue.front();
218  _queue.pop_front();
219  --barrier.waiting_;
220  _cond.signal();
221  _cond.unlock();
222  return true;
223 
224 }
225 
226 template< typename T, size_t S >
227 bool MTQueue< T, S >::getFront( T& result ) const
228 {
229  _cond.lock();
230  if( _queue.empty( ))
231  {
232  _cond.unlock();
233  return false;
234  }
235  // else
236  result = _queue.front();
237  _cond.unlock();
238  return true;
239 }
240 
241 template< typename T, size_t S >
242 bool MTQueue< T, S >::getBack( T& result ) const
243 {
244  _cond.lock();
245  if( _queue.empty( ))
246  {
247  _cond.unlock();
248  return false;
249  }
250  // else
251  result = _queue.back();
252  _cond.unlock();
253  return true;
254 }
255 
256 template< typename T, size_t S >
257 void MTQueue< T, S >::push( const T& element )
258 {
259  _cond.lock();
260  while( _queue.size() >= _maxSize )
261  _cond.wait();
262  _queue.push_back( element );
263  _cond.signal();
264  _cond.unlock();
265 }
266 
267 template< typename T, size_t S >
268 void MTQueue< T, S >::push( const std::vector< T >& elements )
269 {
270  _cond.lock();
271  LBASSERT( elements.size() <= _maxSize );
272  while( (_maxSize - _queue.size( )) < elements.size( ))
273  _cond.wait();
274  _queue.insert( _queue.end(), elements.begin(), elements.end( ));
275  _cond.signal();
276  _cond.unlock();
277 }
278 
279 template< typename T, size_t S >
280 void MTQueue< T, S >::pushFront( const T& element )
281 {
282  _cond.lock();
283  while( _queue.size() >= _maxSize )
284  _cond.wait();
285  _queue.push_front( element );
286  _cond.signal();
287  _cond.unlock();
288 }
289 
290 template< typename T, size_t S >
291 void MTQueue< T, S >::pushFront( const std::vector< T >& elements )
292 {
293  _cond.lock();
294  LBASSERT( elements.size() <= _maxSize );
295  while( (_maxSize - _queue.size( )) < elements.size( ))
296  _cond.wait();
297  _queue.insert(_queue.begin(), elements.begin(), elements.end());
298  _cond.signal();
299  _cond.unlock();
300 }
301 }
std::vector< T > timedPopRange(const unsigned timeout, const size_t minimum=1, const size_t maximum=S)
Retrieve a number of items from the front of the queue.
Definition: mtQueue.ipp:121
bool tryPop(T &result)
Retrieve and pop the front element from the queue if it is not empty.
Definition: mtQueue.ipp:147
void unlock()
Unlock the mutex.
MTQueue< T, S > & operator=(const MTQueue< T, S > &from)
Assign the values of another queue.
Definition: mtQueue.ipp:22
void pushFront(const T &element)
Push a new element to the front of the queue.
Definition: mtQueue.ipp:280
size_t waitSize(const size_t minSize) const
Wait for the size to be at least the number of given elements.
Definition: mtQueue.ipp:65
void clear()
Reset (empty) the queue.
Definition: mtQueue.ipp:77
A thread-safe queue with a blocking read access.
Definition: mtQueue.h:43
const T & operator[](const size_t index) const
Retrieve the requested element from the queue, may block.
Definition: mtQueue.ipp:41
Group(const size_t height)
Construct a new group of the given size.
Definition: mtQueue.ipp:193
bool getFront(T &result) const
Definition: mtQueue.ipp:227
void lock()
Lock the mutex.
bool popBarrier(T &result, Group &barrier)
Retrieve the front element, or abort if the barrier is reached.
Definition: mtQueue.ipp:200
void broadcast()
Broadcast the condition.
#define LB_MIN(a, b)
returns the minimum of two values
Definition: types.h:88
void setHeight(const size_t height)
Update the height.
Definition: mtQueue.ipp:196
void setMaxSize(const size_t maxSize)
Set the new maximum size of the queue.
Definition: mtQueue.ipp:54
bool getBack(T &result) const
Definition: mtQueue.ipp:242
bool timedPop(const unsigned timeout, T &element)
Retrieve and pop the front element from the queue.
Definition: mtQueue.ipp:101
Group descriptor for popBarrier().
Definition: mtQueue.ipp:182
T pop()
Retrieve and pop the front element from the queue, may block.
Definition: mtQueue.ipp:86
Abstraction layer and common utilities for multi-threaded programming.
Definition: algorithm.h:32
void signal()
Signal the condition.
void wait()
Atomically unlock the mutex, wait for a signal and relock the mutex.
void push(const T &element)
Push a new element to the back of the queue.
Definition: mtQueue.ipp:257