CFEL - ASG Software Suite  2.5.0
CASS
ringbuffer.hpp
Go to the documentation of this file.
1 //Copyright (C) 2008-2010 Lutz Foucar
2 
3 /**
4  * @file ringbuffer.hpp file contains the ringbuffer class
5  *
6  * @author Lutz Foucar
7  */
8 
9 #ifndef _RINGBUFFER_HPP_
10 #define _RINGBUFFER_HPP_
11 
12 #include <QtCore/QMutex>
13 #include <QtCore/QWaitCondition>
14 
15 #include <iostream>
16 #include <iomanip>
17 #include <vector>
18 #include <algorithm>
19 
20 #include "cass_exceptions.hpp"
21 #include "cass.h"
22 
23 namespace cass
24 {
25 /** A Ringbuffer, handles communication between Input and Worker Threads.
26  *
27  * The ringbuffer handles the main communication between the single producers
28  * (input derived from InputBase) and the multiple consumers (worker).
29  *
30  * The ringbuffer can be compiled blocking or non blocking by defining
31  * RINGBUFFER_BLOCKING or not, respectively.
32  *
33  * It is designed in such a way, that in the nonblocking case, the consumers
34  * do not block the producer from putting new entries into the ringbuffer.
35  * If the producers velocity in filling the buffer varies, then this buffer
36  * will make sure, that it can be faster than the consumers. When the producer
37  * fills elements slower than the consumers consume them, then the consumers
38  * can consume the elements that have already been put into the buffer.
39  * They will do this by going backwards through the buffer from the last
40  * element that the producer has put into the buffer.
41  * The ringbuffers' elements will be created on the Heap.
42  *
43  * @tparam T Element type
44  *
45  * @todo find out how one can use std::find to find the right element
46  * @todo maybe create a ReadWriteLock for each element to get rid of the mutexes
47  * @todo separate declaration and definition to make class more readable
48  *
49  * @author Lutz Foucar
50  */
51 template <typename T>
53 {
54 private:
55  /** an element of the ringbuffer.
56  *
57  * contains the status flags of the element and a shared pointer
58  * to the actual element.
59  *
60  * @author Lutz Foucar
61  */
62  class Element
63  {
64  public:
65  /** constructor.
66  *
67  * initializes the status flags: processed, not filled and not in use.
68  */
70  : processed(true),
71  filled(false),
72  inUse(false)
73  {}
74 
75  /** the shared pointer to the element */
76  std::tr1::shared_ptr<T> element;
77 
78  /** status whether the element has been worked on */
79  bool processed;
80 
81  /** status whether the element has been filled */
82  bool filled;
83 
84  /** status whether the element is worked on right now */
85  bool inUse;
86  };
87 
88 public:
89  /** type of the container of all elements*/
90  typedef std::vector<Element> buffer_t;
91 
92  /** type of the iterator over the elements of the container*/
93  typedef typename buffer_t::iterator iter_type;
94 
95  /** constructor.
96  *
97  * This will create the buffer, fill it with the requested amount of elements,
98  * and initialize the iterators.
99  *
100  * @param size The size of the ringbuffer
101  */
102  RingBuffer(size_t size)
103  : _buffer(size),
104  _nextToProcess(_buffer.begin()),
105  _nextToFill(_buffer.begin())
106  {
107  for (size_t i=0; i<_buffer.size(); ++i)
108  _buffer[i].element = std::tr1::shared_ptr<T>(new T());
109  }
110 
111  /** destructor, the body is empty so nothing is done explicitly */
113  {}
114 
115 private:
116  /** advances the _nextToProcess iterator to the next processable element.
117  *
118  * walks backwards through the whole ringbuffer (wrapping from the first to
119  * the last element), starting at the position where _nextToProcess points.
120  * An element is skipped when it is currently in use, has not been filled
121  * or has already been processed. The search ends at the first element
122  * for which none of this is true; _nextToProcess then points to it.
123  *
124  * @return true when a processable element has been found, false otherwise
125  */
127  {
128  /** the element after the start position (with wrap around) is the last one that is checked */
129  iter_type stop(_nextToProcess == _buffer.end()-1 ? _buffer.begin() : _nextToProcess + 1);
130 
131  /** skip every element that is currently in use, not filled yet or
132  * already processed
133  */
134  while (_nextToProcess->inUse || !_nextToProcess->filled || _nextToProcess->processed)
135  {
136  /** all elements have been checked once the position 1 beyond the start is reached */
137  if (_nextToProcess == stop)
138  return false;
139 
140  /** we go backwards through the buffer to have always the latest
141  * element to process. If we come to the beginning of the vector, then
142  * we have to jump to the back
143  */
144  if (_nextToProcess == _buffer.begin())
145  _nextToProcess = _buffer.end()-1;
146  else
147  --_nextToProcess;
148  }
149  return true;
150  }
151 
152  /** advances the _nextToFill iterator to the next fillable element.
153  *
154  * when RINGBUFFER_BLOCKING is defined (blocking behaviour):
155  * it will iterate forwards through the buffer and check the elements for
156  * the status flags in use (inUse) and processed (processed);
157  * it will only return true when an element is not in use and already processed.
158  *
159  * when RINGBUFFER_BLOCKING is not defined (nonblocking behaviour):
160  * it will iterate forwards through the buffer and check the elements for
161  * only the status flag in use (inUse);
162  * it will only return true when an element is not in use.
163  *
164  * @return true when a fillable element has been found
165  */
167  {
168  /** the element one before the start position (with wrap around) is the last one that is checked */
169  iter_type start((_nextToFill == _buffer.begin()) ? _buffer.end()-1 : _nextToFill-1);
170 
171 #ifdef RINGBUFFER_BLOCKING
172  /** search until the current element is not currently in use and
173  * has been processed
174  */
175  while (_nextToFill->inUse || !_nextToFill->processed)
176 #else
177  /** search until the current element is not currently in use */
178  while (_nextToFill->inUse)
179 #endif
180  {
181  /** if we reach the element before the one we started at, then no
182  * element is fillable right now, so return that we have not found
183  * anything yet.
184  */
185  if (_nextToFill == start)
186  return false;
187 
188  /** wrap to beginning if we hit the end */
189  if (_nextToFill == _buffer.end()-1)
190  _nextToFill = _buffer.begin();
191  else
192  ++_nextToFill;
193  }
194  return true;
195  }
196 
197 public:
198  /** return the next filled but non processed element.
199  *
200  * This function will return the next filled element, which will
201  * either be the one just filled by the shared memory input or
202  * one or more before, depending on how fast elements are retrieved
203  * before they are filled again.
204  * When there are no Elements that we can work on, this function will wait
205  * until there is a new element that we can process.
206  *
207  * @note this can be the reason why only one of the threads is working at
208  * a time.
209  *
210  * @return iterator to the element of the ringbuffer that is processable and
211  * in case a timeout occured, return the end iterator of the buffer.
212  * @param[in] timeout Time that we will wait that a new element is beeing
213  * put into the buffer. It is defaulted to ULONG_MAX
214  */
215  iter_type nextToProcess(unsigned long timeout=ULONG_MAX)
216  {
217  QMutexLocker lock(&_mutex);
218 
219  /** if nothing was found, wait until we get noticed that
220  * a new element was added to the buffer and return 0 if
221  * waited long enough
222  */
223  while (!findNextProcessable())
224  if(!_processcondition.wait(lock.mutex(),timeout))
225  return _buffer.end();
226 
227  /** set the flags of that element */
228  _nextToProcess->inUse = true;
229  iter_type iter(_nextToProcess);
230 
231  /** The next element that will be asked for is the previous
232  * one. Unless a new element to be processed has been added to the buffer
233  * therefore let the iterator point to the previous element
234  */
235  if (_nextToProcess == _buffer.begin())
236  _nextToProcess = _buffer.end()-1;
237  else
238  --_nextToProcess;
239 
240  return iter;
241  }
242 
243  /** putting the processed element back to the buffer.
244  *
245  * This function will put the element that we just processed back to the buffer.
246  * It locks the buffer and marks the element the iterator points to as
247  * not in use, processed and not filled.
248  *
249  * @return void
250  * @param[in] iter iterator to the element in the buffer that is done processing
251  */
252  void doneProcessing(iter_type iter)
253  {
254  QMutexLocker lock(&_mutex);
255 
256  /** set flags: no longer in use, processed and not filled */
257  iter->inUse = false;
258  iter->processed = true;
259  iter->filled = false;
260 
261  /** notify the waiting condition that the state of the buffer has changed
262  *
263  * @note we need to unlock the lock before
264  */
265  lock.unlock();
267  }
268 
269  /** retrieve the "to be filled" element.
270  *
271  * This function will retrieve the next element that we can fill.
272  * Depending on the behaviour of the ringbuffer, we check whether
273  * it has been processed or not. When the behaviour is blocking then
274  * we only retrieve elements that are processed, if not then we just
275  * return the next element that is not in process.
276  * In the blocking case this function will only return when a processed
277  * event was put into the buffer.
278  *
279  * @return iterator to the element that can be filled and in case a timeout
280  * occured, return the end iterator of the buffer.
281  * @param[in] timeout Time that we will wait that a new fillable element is
282  * beeing made available. It is defaulted to ULONG_MAX
283  */
284  iter_type nextToFill(unsigned long timeout=ULONG_MAX)
285  {
286  QMutexLocker lock(&_mutex);
287 
288  /** find an fillable element of the buffer, if there is no fillable, wait
289  * until a new element has been processed
290  *
291  * @note this is blocking until an element has been processed
292  */
293  while(!findNextFillable())
294  if (!_fillcondition.wait(lock.mutex(),timeout))
295  return _buffer.end();
296 
297  /** Set the flags accordingly */
298  _nextToFill->inUse = true;
299  iter_type iter(_nextToFill);
300 
301  /** the next element should be one that we are going
302  * to fill next. Therefore decrease the iterator by one
303  */
304  if (_nextToFill == _buffer.end()-1)
305  _nextToFill = _buffer.begin();
306  else
307  ++_nextToFill;
308 
309  return iter;
310  }
311 
312  /** putting the filled element back to the buffer.
313  *
314  * This function will put the element that we just filled back to the buffer.
315  * It locks the buffer, marks the element the iterator points to as not in
316  * use and sets its processed and filled flags depending on the
317  * fillstatus. Using the fillstatus we can say that this element should be processed
318  * or not.
319  *
320  * @return void
321  * @param[in] iter iterator to the element in the buffer that is done filling
322  * @param[in] fillstatus True when the element should be processed,
323  * false if not.
324  */
325  void doneFilling(iter_type iter, bool fillstatus=true)
326  {
327  QMutexLocker lock(&_mutex);
328 
329  /** set the status properties according to the fillstatus */
330  iter->inUse = false;
331  iter->processed = !fillstatus;
332  iter->filled = fillstatus;
333 
334  /** set the next to process iterator to this element, since its
335  * the next element that we should process. This should shorten
336  * the time we are searching for the next processable element
337  */
338  _nextToProcess = iter;
339 
340  /** notify the waiting condition that something new is in the buffer
341  *
342  * @note we need to unlock the lock before
343  */
344  lock.unlock();
346  }
347 
348  /** count how many elements of the buffer are not processed or in use
349  *
350  * An element is counted when its processed flag is not set or when it is
351  * currently in use. This function does not lock the mutex itself.
352  *
353  * @return number of elements that are not processed or in use
354  */
356  {
357  int count(0);
358  iter_type it(_buffer.begin());
359  iter_type end(_buffer.end());
360  for (; it != end; ++it)
361  if (!it->processed || it->inUse)
362  ++count;
363  return count;
364  }
365 
366  /** wait until no element that needs processing is on the list
367  *
368  * this function is blocking until countProcessing() returns 0. It holds the
369  * mutex and waits on the fill condition with a timeout of 100 between checks.
370  */
372  {
373  QMutexLocker lock(&_mutex);
374  while (countProcessing() != 0)
375  _fillcondition.wait(lock.mutex(),100);
376  }
377 
378  /** @return the end element of the buffer (which is not good) */
379  iter_type end() {return _buffer.end();}
380 
381 private:
382  /** mutex to protect the iterators and the buffer elements */
384 
385  /** sync the filling part */
387 
388  /** sync the processing part */
390 
391  /** the ringbuffer container */
392  buffer_t _buffer;
393 
394  /** iterator to the next processable element */
395  iter_type _nextToProcess;
396 
397  /** iterator to the next fillable element */
398  iter_type _nextToFill;
399 };
400 }
401 #endif
an element of the ringbuffer.
Definition: ringbuffer.hpp:62
QWaitCondition _processcondition
sync the processing part
Definition: ringbuffer.hpp:389
iter_type nextToFill(unsigned long timeout=ULONG_MAX)
retrieve the "to be filled" element.
Definition: ringbuffer.hpp:284
void waitUntilEmpty()
wait until no element that needs processing is on the list
Definition: ringbuffer.hpp:371
~RingBuffer()
destructor
Definition: ringbuffer.hpp:112
Element()
constructor.
Definition: ringbuffer.hpp:69
iter_type nextToProcess(unsigned long timeout=ULONG_MAX)
return the next filled but non processed element.
Definition: ringbuffer.hpp:215
things written only at end of run H5Dump ProcessorSummary size
std::vector< Element > buffer_t
type of the container of all elements
Definition: ringbuffer.hpp:90
file contains custom exceptions used in cass
bool inUse
status whether the element is worked on right now
Definition: ringbuffer.hpp:85
bool findNextFillable()
advances the _nextToFill iterator to the next fillable element.
Definition: ringbuffer.hpp:166
void doneFilling(iter_type iter, bool fillstatus=true)
putting the filled element back to the buffer.
Definition: ringbuffer.hpp:325
QWaitCondition _fillcondition
sync the filling part
Definition: ringbuffer.hpp:386
iter_type end()
Definition: ringbuffer.hpp:379
QMutex _mutex
mutex to protect the iterators and the buffer elements
Definition: ringbuffer.hpp:383
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
wait(QMutex *mutex, unsigned long time=ULONG_MAX)
int countProcessing()
count how many elements of the buffer are not processed
Definition: ringbuffer.hpp:355
buffer_t::iterator iter_type
type of the iterator over the elements of the container
Definition: ringbuffer.hpp:93
void doneProcessing(iter_type iter)
putting the processed element back to the buffer.
Definition: ringbuffer.hpp:252
file contains global definitions for project cass
buffer_t _buffer
the ringbuffer container
Definition: ringbuffer.hpp:392
std::tr1::shared_ptr< T > element
the pointer to the element
Definition: ringbuffer.hpp:76
bool filled
status whether the element has been filled
Definition: ringbuffer.hpp:82
RingBuffer(size_t size)
constructor.
Definition: ringbuffer.hpp:102
iter_type _nextToProcess
iterator to the next processable element
Definition: ringbuffer.hpp:395
bool findNextProcessable()
advances the _nextToProcess iterator to the next processable element.
Definition: ringbuffer.hpp:126
bool processed
status whether the element has been worked on
Definition: ringbuffer.hpp:79
iter_type _nextToFill
iterator to the next fillable element
Definition: ringbuffer.hpp:398