Lunchbox  1.16.0
Multi-threaded C++ toolbox library for all application developers creating high-performance multi-threaded programs.
mtQueue.ipp
1 
2 /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3  * Daniel Nachbaur <danielnachbaur@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or modify it under
6  * the terms of the GNU Lesser General Public License version 2.1 as published
7  * by the Free Software Foundation.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 namespace lunchbox
20 {
21 template <typename T, size_t S>
23 {
24  if (this == &from)
25  return *this;
26 
27  std::unique_lock<std::mutex> fromLock(from._mutex);
28  std::deque<T> copy = from._queue;
29  const size_t maxSize = from._maxSize;
30  fromLock.unlock();
31 
32  std::unique_lock<std::mutex> lock(_mutex);
33  _maxSize = maxSize;
34  _queue.swap(copy);
35  _condition.notify_all();
36  return *this;
37 }
38 
39 template <typename T, size_t S>
40 const T& MTQueue<T, S>::operator[](const size_t index) const
41 {
42  std::unique_lock<std::mutex> lock(_mutex);
43  _condition.wait(lock, [&] { return _queue.size() > index; });
44 
45  return _queue[index];
46 }
47 
48 template <typename T, size_t S>
49 void MTQueue<T, S>::setMaxSize(const size_t maxSize)
50 {
51  std::unique_lock<std::mutex> lock(_mutex);
52  _condition.wait(lock, [&] { return _queue.size() <= maxSize; });
53 
54  _maxSize = maxSize;
55  _condition.notify_all();
56 }
57 
58 template <typename T, size_t S>
59 size_t MTQueue<T, S>::waitSize(const size_t minSize) const
60 {
61  LBASSERT(minSize <= _maxSize);
62  std::unique_lock<std::mutex> lock(_mutex);
63  _condition.wait(lock, [&] { return _queue.size() >= minSize; });
64  return _queue.size();
65 }
66 
67 template <typename T, size_t S>
69 {
70  std::unique_lock<std::mutex> lock(_mutex);
71  _queue.clear();
72  _condition.notify_all();
73 }
74 
75 template <typename T, size_t S>
77 {
78  std::unique_lock<std::mutex> lock(_mutex);
79  _condition.wait(lock, [&] { return !_queue.empty(); });
80 
81  T element = _queue.front();
82  _queue.pop_front();
83  _condition.notify_all();
84  return element;
85 }
86 
87 template <typename T, size_t S>
88 bool MTQueue<T, S>::timedPop(const unsigned timeout, T& element)
89 {
90  std::unique_lock<std::mutex> lock(_mutex);
91  _condition.wait_for(lock, std::chrono::milliseconds(timeout),
92  [&] { return !_queue.empty(); });
93  if (_queue.empty())
94  return false;
95 
96  element = _queue.front();
97  _queue.pop_front();
98  _condition.notify_all();
99  return true;
100 }
101 
102 template <typename T, size_t S>
103 std::vector<T> MTQueue<T, S>::timedPopRange(const unsigned timeout,
104  const size_t minimum,
105  const size_t maximum)
106 {
107  std::vector<T> result;
108 
109  std::unique_lock<std::mutex> lock(_mutex);
110  _condition.wait_for(lock, std::chrono::milliseconds(timeout),
111  [&] { return _queue.size() >= minimum; });
112  if (_queue.size() < minimum)
113  return result;
114 
115  const size_t size = LB_MIN(maximum, _queue.size());
116 
117  result.reserve(size);
118  result.insert(result.end(), _queue.begin(), _queue.begin() + size);
119  _queue.erase(_queue.begin(), _queue.begin() + size);
120 
121  _condition.notify_all();
122  return result;
123 }
124 
125 template <typename T, size_t S>
126 bool MTQueue<T, S>::tryPop(T& result)
127 {
128  std::unique_lock<std::mutex> lock(_mutex);
129  if (_queue.empty())
130  return false;
131 
132  result = _queue.front();
133  _queue.pop_front();
134  _condition.notify_all();
135  return true;
136 }
137 
138 template <typename T, size_t S>
139 void MTQueue<T, S>::tryPop(const size_t num, std::vector<T>& result)
140 {
141  std::unique_lock<std::mutex> lock(_mutex);
142  const size_t size = LB_MIN(num, _queue.size());
143  if (size > 0)
144  {
145  result.reserve(result.size() + size);
146  for (size_t i = 0; i < size; ++i)
147  {
148  result.push_back(_queue.front());
149  _queue.pop_front();
150  }
151  _condition.notify_all();
152  }
153 }
154 
156 template <typename T, size_t S>
157 class MTQueue<T, S>::Group
158 {
159  friend class MTQueue<T, S>;
160  size_t height_;
161  size_t waiting_;
162 
163 public:
168  explicit Group(const size_t height)
169  : height_(height)
170  , waiting_(0)
171  {
172  }
173 
175  void setHeight(const size_t height) { height_ = height; }
176 };
177 
178 template <typename T, size_t S>
179 bool MTQueue<T, S>::popBarrier(T& element, Group& barrier)
180 {
181  LBASSERT(barrier.height_ > 0)
182 
183  std::unique_lock<std::mutex> lock(_mutex);
184  ++barrier.waiting_;
185  _condition.wait(lock, [&] {
186  return !_queue.empty() || barrier.waiting_ >= barrier.height_;
187  });
188 
189  if (_queue.empty())
190  {
191  LBASSERT(barrier.waiting_ == barrier.height_);
192  _condition.notify_all();
193  return false;
194  }
195 
196  element = _queue.front();
197  _queue.pop_front();
198  --barrier.waiting_;
199  _condition.notify_all();
200  return true;
201 }
202 
203 template <typename T, size_t S>
204 bool MTQueue<T, S>::getFront(T& result) const
205 {
206  std::unique_lock<std::mutex> lock(_mutex);
207  if (_queue.empty())
208  return false;
209 
210  // else
211  result = _queue.front();
212  return true;
213 }
214 
215 template <typename T, size_t S>
216 bool MTQueue<T, S>::getBack(T& result) const
217 {
218  std::unique_lock<std::mutex> lock(_mutex);
219  if (_queue.empty())
220  return false;
221 
222  // else
223  result = _queue.back();
224  return true;
225 }
226 
227 template <typename T, size_t S>
228 void MTQueue<T, S>::push(const T& element)
229 {
230  std::unique_lock<std::mutex> lock(_mutex);
231  _condition.wait(lock, [&] { return _queue.size() < _maxSize; });
232  _queue.push_back(element);
233  _condition.notify_all();
234 }
235 
236 template <typename T, size_t S>
237 void MTQueue<T, S>::push(const std::vector<T>& elements)
238 {
239  std::unique_lock<std::mutex> lock(_mutex);
240  LBASSERT(elements.size() <= _maxSize);
241  _condition.wait(lock, [&] {
242  return (_maxSize - _queue.size()) >= elements.size();
243  });
244  ;
245  _queue.insert(_queue.end(), elements.begin(), elements.end());
246  _condition.notify_all();
247 }
248 
249 template <typename T, size_t S>
250 void MTQueue<T, S>::pushFront(const T& element)
251 {
252  std::unique_lock<std::mutex> lock(_mutex);
253  _condition.wait(lock, [&] { return _queue.size() < _maxSize; });
254  ;
255  _queue.push_front(element);
256  _condition.notify_all();
257 }
258 
259 template <typename T, size_t S>
260 void MTQueue<T, S>::pushFront(const std::vector<T>& elements)
261 {
262  std::unique_lock<std::mutex> lock(_mutex);
263  LBASSERT(elements.size() <= _maxSize);
264  _condition.wait(lock, [&] {
265  return (_maxSize - _queue.size()) >= elements.size();
266  });
267  ;
268  _queue.insert(_queue.begin(), elements.begin(), elements.end());
269  _condition.notify_all();
270 }
271 }
std::vector< T > timedPopRange(const unsigned timeout, const size_t minimum=1, const size_t maximum=S)
Retrieve a number of items from the front of the queue.
Definition: mtQueue.ipp:103
bool tryPop(T &result)
Retrieve and pop the front element from the queue if it is not empty.
Definition: mtQueue.ipp:126
MTQueue< T, S > & operator=(const MTQueue< T, S > &from)
Assign the values of another queue.
Definition: mtQueue.ipp:22
void pushFront(const T &element)
Push a new element to the front of the queue.
Definition: mtQueue.ipp:250
size_t waitSize(const size_t minSize) const
Wait for the size to be at least the number of given elements.
Definition: mtQueue.ipp:59
void clear()
Reset (empty) the queue.
Definition: mtQueue.ipp:68
A thread-safe queue with a blocking read access.
Definition: mtQueue.h:45
const T & operator[](const size_t index) const
Retrieve the requested element from the queue, may block.
Definition: mtQueue.ipp:40
Group(const size_t height)
Construct a new group of the given size.
Definition: mtQueue.ipp:168
bool getFront(T &result) const
Definition: mtQueue.ipp:204
bool popBarrier(T &result, Group &barrier)
Retrieve the front element, or abort if the barrier is reached.
Definition: mtQueue.ipp:179
#define LB_MIN(a, b)
returns the minimum of two values
Definition: types.h:90
void setHeight(const size_t height)
Update the height.
Definition: mtQueue.ipp:175
void setMaxSize(const size_t maxSize)
Set the new maximum size of the queue.
Definition: mtQueue.ipp:49
bool getBack(T &result) const
Definition: mtQueue.ipp:216
bool timedPop(const unsigned timeout, T &element)
Retrieve and pop the front element from the queue.
Definition: mtQueue.ipp:88
Group descriptor for popBarrier().
Definition: mtQueue.ipp:157
T pop()
Retrieve and pop the front element from the queue, may block.
Definition: mtQueue.ipp:76
Abstraction layer and common utilities for multi-threaded programming.
Definition: algorithm.h:29
void push(const T &element)
Push a new element to the back of the queue.
Definition: mtQueue.ipp:228