CFEL - ASG Software Suite  2.5.0
CASS
worker.h
Go to the documentation of this file.
1 //Copyright (C) 2009,2010,2013 Lutz Foucar
2 
3 /**
4  * @file worker.h file contains declaration of class Worker and Workers
5  *
6  * @author Lutz Foucar
7  */
8 
9 #ifndef __WORKER_H__
10 #define __WORKER_H__
11 
12 #include <QtCore/QMutex>
13 
14 #include <map>
15 #include <utility>
16 #include <string>
17 #include <tr1/memory>
18 
19 #include "pausablethread.h"
20 #include "cass.h"
21 #include "ringbuffer.hpp"
22 #include "cass_event.h"
23 
24 
25 namespace cass
26 {
27 //forward declarations
28 class Ratemeter;
29 class ProcessorManager;
30 
31 /** The worker thread.
32  *
33  * The thread will do the following tasks in a loop:
34  * - retrive an event form the buffer,
35  * - process it using the selected processors,
36  * - put the event back to the buffer
37  *
38  * @author Lutz Foucar
39  */
41 {
42 public:
43 
44  /** a shared pointer of this */
45  typedef std::tr1::shared_ptr<Worker> shared_pointer;
46 
47  /** constructor.
48  *
49  * @param rb the rinbguffer we get the events from
50  * @param ratemeter the ratemeter object to measure the rate
51  * @param parent the qt parent of this object
52  */
54  Ratemeter &ratemeter,
55  QObject *parent=0);
56 
57  /** start the thread.
58  *
59  * While the thread has not been quitted do
60  * retrieve a cassevent from the ringbuffer, but with a timeout. If we got
61  * a new event from the ringbuffer put it into the processing chain. Then put
62  * it back to the ringbuffer for new refilling and increase the counter
63  * of the ratemeter.
64  * Before processing double check if the event has a correct id
65  */
66  void runthis();
67 
68 private:
69  /** the ringbuffer */
71 
72  /** the processors */
74 
75  /** the ratemeter to measure the analysis rate */
77 };
78 
79 
80 
81 
82 
83 
84 
85 /** Worker Thread Handler.
86  *
87  * a class that will handle the requested amount of workers threads.
88  * The amount of threads can be set in cass.h via parameters
89  * @see NbrOfWorkers.
90  *
91  * @author Lutz Foucar
92  */
93 class Workers
94 {
95 public:
96  /** a shared pointer of this class */
97  typedef std::tr1::shared_ptr<Workers> shared_pointer;
98 
99  /** create and return an instance of this singleton
100  *
101  * when the instance has not yet been created, call the constructor otherwise
102  * just return the instance.
103  *
104  * @param rb the rinbguffer we get the events from
105  * @param ratemeter the ratemeter object to measure the rate
106  * @param parent the qt parent of this object
107  */
108  static shared_pointer instance(RingBuffer<CASSEvent> &rb,
109  Ratemeter &ratemeter,
110  QObject *parent=0);
111 
112  /** return a reference to the instance itselve if it exists */
113  static shared_pointer::element_type& reference();
114 
115  /** starts the threads
116  *
117  * function is not reentrant. One needs to use the _lock mutex to prevent
118  * simultanious calling of this function.
119  */
120  void start();
121 
122  /** pause the threads.
123  *
124  * Blocks until all threads are paused
125  *
126  * function is not reentrant. One needs to use the _lock mutex to prevent
127  * simultanious calling of this function.
128  */
129  void pause();
130 
131  /** resumes the threads
132  *
133  * function is not reentrant. One needs to use the _lock mutex to prevent
134  * simultanious calling of this function.
135  */
136  void resume();
137 
138  /** will set the flags to end the threads
139  *
140  * Will call the end()
141  * member of all workers. Then waits until all workers are finished. After this
142  * the aboutToQuit member of the processor and the Analyzer are notified.
143  *
144  * function is not reentrant. One needs to use the _lock mutex to prevent
145  * simultanious calling of this function.
146  */
147  void end();
148 
149  /** check if all workers are still running
150  *
151  * @return false if one of the workers is not running anymore
152  */
153  bool running()const;
154 
155  /** rethrow the exceptions thrown in the workers */
156  void rethrowException();
157 
158  /** a lock to be used by functions that are using this worker */
160 
161 private:
162  /** constructor.
163  *
164  * will create the requested amount of threads. and calls the load settings
165  * member of one of them.
166  *
167  * @param rb the rinbguffer we get the events from
168  * @param ratemeter the ratemeter object to measure the rate
169  * @param parent the qt parent of this object
170  */
172  Ratemeter &ratemeter,
173  QObject *parent=0);
174 
175  /** container of workers */
176  std::vector<Worker::shared_pointer> _workers;
177 
178  /** the instance of this class */
179  static shared_pointer _instance;
180 
181  /** mutex to protect the creation of the signelton */
182  static QMutex _mutex;
183 
184  /** the ringbuffer */
186 };
187 
188 }//end namespace cass
189 
190 #endif
Ratemeter & _ratemeter
the ratemeter to measure the analysis rate
Definition: worker.h:76
void rethrowException()
rethrow the exceptions thrown in the workers
Definition: worker.cpp:126
declaration of a pausable QThread
class calculating a rate in Hz.
Definition: ratemeter.h:28
Workers(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
constructor.
Definition: worker.cpp:87
file contains declaration of the CASSEvent
ProcessorManager & _process
the processors
Definition: worker.h:73
std::tr1::shared_ptr< Worker > shared_pointer
a shared pointer of this
Definition: worker.h:45
QMutex lock
a lock to be used by functions that are using this worker
Definition: worker.h:159
std::tr1::shared_ptr< Workers > shared_pointer
a shared pointer of this class
Definition: worker.h:97
container and call handler for all registered processors.
static shared_pointer _instance
the instance of this class
Definition: worker.h:179
void pause()
pause the threads.
Definition: worker.cpp:103
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
RingBuffer< CASSEvent > & _ringbuffer
the ringbuffer
Definition: worker.h:70
static shared_pointer::element_type & reference()
return a reference to the instance itselve if it exists
Definition: worker.cpp:76
Worker(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
constructor.
Definition: worker.cpp:23
void resume()
resumes the threads
Definition: worker.cpp:111
file contains the ringbuffer class
file contains global definitions for project cass
Worker Thread Handler.
Definition: worker.h:93
std::vector< Worker::shared_pointer > _workers
container of workers
Definition: worker.h:176
RingBuffer< CASSEvent > & _rb
the ringbuffer
Definition: worker.h:185
static QMutex _mutex
mutex to protect the creation of the signelton
Definition: worker.h:182
void end()
will set the flags to end the threads
Definition: worker.cpp:117
void start()
starts the threads
Definition: worker.cpp:97
static shared_pointer instance(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
create and return an instance of this singleton
Definition: worker.cpp:66
The worker thread.
Definition: worker.h:40
bool running() const
check if all workers are still running
Definition: worker.cpp:132
void runthis()
start the thread.
Definition: worker.cpp:32
A QThread that has the ability to be paused and resumed.