CFEL - ASG Software Suite  2.5.0
CASS
worker.cpp
Go to the documentation of this file.
1 // Copyright (C) 2009, 2010,2013 Lutz Foucar
2 // Copyright (C) 2010 Jochen Küpper
3 
4 /**
5  * @file worker.cpp file contains definition of class Worker and Workers
6  *
7  * @author Lutz Foucar
8  */
9 
10 #include <stdexcept>
11 #include <QtCore/QMutexLocker>
12 
13 #include "worker.h"
14 
15 #include "ratemeter.h"
16 #include "processor_manager.h"
17 #include "log.h"
18 
19 
20 using namespace cass;
21 using namespace std;
22 
24  Ratemeter &ratemeter,
25  QObject *parent)
// The base class receives `_run` and the parent (presumably `_run` is the
// initial control state of the thread -- TODO confirm in PausableThread).
// The ringbuffer and the ratemeter passed in are stored in the members, and
// `_process` is bound to the ProcessorManager singleton.
26  : PausableThread(lmf::PausableThread::_run,parent),
27  _ringbuffer(ringbuffer),
28  _process(ProcessorManager::reference()),
29  _ratemeter(ratemeter)
30 {}
31 
33 {
// keep fetching and processing ringbuffer elements until the control flag
// is set to `_quit`
36  while(_control != _quit)
37  {
// point where the thread will be paused when a pause was requested
38  pausePoint();
39  /** continue only when the ringbuffer handed out an element, i.e. the returned iterator differs from end(); the argument 1000 is presumably a timeout -- TODO confirm its meaning and unit in ringbuffer.hpp */
40  if ((rbItem = _ringbuffer.nextToProcess(1000)) != _ringbuffer.end())
41  {
42  /** @note we need to check the element id in here, because if it is done
43  * in the outer if statement (as before) a valid ringbuffer element
44  * would never be returned to the buffer, as this only happens
45  * within the true if clause.
46  */
// run the processors only on elements whose id is non-zero
47  if(rbItem->element->id())
48  _process(*rbItem->element);
// every retrieved element is handed back and counted, including the ones
// that were skipped above because their id is zero
49  _ringbuffer.doneProcessing(rbItem);
50  _ratemeter.count();
51  }
52  }
53 }
54 
55 
56 
57 
58 
59 
60 
61 
62 // ============define static members (do not touch)==============
65 
67  Ratemeter &ratemeter,
68  QObject *parent)
69 {
// the Workers object is created only when no instance exists yet; on every
// later call the arguments are not used and the existing instance is returned
71  if(!_instance)
72  _instance = Workers::shared_pointer(new Workers(ringbuffer, ratemeter, parent));
73  return _instance;
74 }
75 
/** return a reference to the instance itself if it exists
 *
 * @return reference to the object held by `_instance`
 * @throws logic_error when `_instance` has not been created yet
 */
76 Workers::shared_pointer::element_type& Workers::reference()
77 {
// dereferencing is only done after the check, so an unset instance results
// in an exception instead of an invalid reference
79  if (!_instance)
80  throw logic_error("Workers::reference(): The instance has not yet been created");
81  return *_instance;
82 }
83 //===============================================================
84 
85 
86 //-----------------------the wrapper for more than 1 worker--------------------
88  Ratemeter &ratemeter,
89  QObject *parent)
// the container is sized to NbrOfWorkers entries here and filled in the body
90  : _workers(NbrOfWorkers),
91  _rb(ringbuffer)
92 {
// every slot gets its own Worker object; all of them are constructed with the
// same ringbuffer, ratemeter and parent
93  for (size_t i=0;i<_workers.size();++i)
94  _workers[i] = Worker::shared_pointer(new Worker(ringbuffer,ratemeter,parent));
95 }
96 
98 {
// starts the threads: start() is called on every worker in container order
99  for (size_t i=0;i<_workers.size();++i)
100  _workers[i]->start();
101 }
102 
104 {
// two passes on purpose: first every worker gets the pause request ...
105  for (size_t i=0;i<_workers.size();++i)
106  _workers[i]->pause();
// ... and only then each worker is waited for, so no pause request is held
// back behind the wait on another worker
107  for (size_t i=0;i<_workers.size();++i)
108  _workers[i]->waitUntilPaused();
109 }
110 
112 {
// resumes the threads: resume() is called on every worker in container order
113  for (size_t i=0;i<_workers.size();++i)
114  _workers[i]->resume();
115 }
116 
118 {
// first pass: call end() on every worker (presumably this sets the flag
// that makes the worker loop quit -- TODO confirm in PausableThread)
119  for (size_t i=0;i<_workers.size();++i)
120  _workers[i]->end();
// second pass: wait() on every worker (presumably blocks until the thread
// has finished, as QThread::wait() does -- TODO confirm)
121  for (size_t i=0;i<_workers.size();++i)
122  _workers[i]->wait();
// the processor manager is informed only after all workers were waited for
123  ProcessorManager::instance()->aboutToQuit();
124 }
125 
127 {
// Per this file's index this function rethrows the exceptions thrown in the
// workers; the loop visits every worker in container order.
// NOTE(review): the statement executed per worker is missing from this
// listing -- restore it from the original worker.cpp before building.
128  for (size_t i=0;i<_workers.size();++i)
130 }
131 
132 bool Workers::running()const
133 {
134  bool running(true);
135  for (size_t i=0;i<_workers.size();++i)
136  running = running && _workers[i]->isRunning();
137  return running;
138 }
file contains declaration of class Worker and Workers
Ratemeter & _ratemeter
the ratemeter to measure the analysis rate
Definition: worker.h:76
void rethrowException()
rethrow the exceptions thrown in the workers
Definition: worker.cpp:126
class calculating a rate in Hz.
Definition: ratemeter.h:28
file contains declaration of class calculating a rate
status_t _status
the internal status of the thread
Workers(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
constructor.
Definition: worker.cpp:87
ProcessorManager & _process
the processors
Definition: worker.h:73
std::tr1::shared_ptr< Worker > shared_pointer
a shared pointer of this
Definition: worker.h:45
QMutex lock
a lock to be used by functions that are using this worker
Definition: worker.h:159
STL namespace.
std::tr1::shared_ptr< Workers > shared_pointer
a shared pointer of this class
Definition: worker.h:97
container and call handler for all registered processors.
static shared_pointer _instance
the instance of this class
Definition: worker.h:179
void pause()
pause the threads.
Definition: worker.cpp:103
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
static shared_pointer instance()
return the already created instance of this
RingBuffer< CASSEvent > & _ringbuffer
the ringbuffer
Definition: worker.h:70
static shared_pointer::element_type & reference()
return a reference to the instance itself if it exists
Definition: worker.cpp:76
Worker(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
constructor.
Definition: worker.cpp:23
buffer_t::iterator iter_type
type of the iterator over the elements of the container
Definition: ringbuffer.hpp:93
void resume()
resumes the threads
Definition: worker.cpp:111
control_t _control
the internal control status of the thread
contains the manager for the processors
void count(double increase=1.)
increase the counts
Definition: ratemeter.cpp:42
std::vector< Worker::shared_pointer > _workers
container of workers
Definition: worker.h:176
static QMutex _mutex
mutex to protect the creation of the singleton
Definition: worker.h:182
void end()
will set the flags to end the threads
Definition: worker.cpp:117
const size_t NbrOfWorkers
global variable to set the number of worker threads
Definition: cass.h:274
void start()
starts the threads
Definition: worker.cpp:97
static shared_pointer instance(RingBuffer< CASSEvent > &rb, Ratemeter &ratemeter, QObject *parent=0)
create and return an instance of this singleton
Definition: worker.cpp:66
void pausePoint()
point where the thread will be paused
The worker thread.
Definition: worker.h:40
bool running() const
check if all workers are still running
Definition: worker.cpp:132
contains a logger for cass
void runthis()
start the thread.
Definition: worker.cpp:32