CFEL - ASG Software Suite  2.5.0
CASS
sacla_offline_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2014, 2015 Lutz Foucar
2 
3 /**
4  * @file sacla_offline_input.cpp file contains definition of sacla offline input
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <iostream>
10 #include <iomanip>
11 #include <fstream>
12 #include <string>
13 #include <sstream>
14 #include <stdexcept>
15 
16 #include <QtCore/QFileInfo>
17 
18 #include <DataAccessUserAPI.h>
19 
20 #include "sacla_offline_input.h"
21 
22 #include "cass_settings.h"
23 #include "log.h"
24 #include "sacla_converter.h"
25 
26 using namespace std;
27 using namespace cass;
28 
29 namespace cass
30 {
31 /** A processor for a tag list
32  *
33  * processes a list of tags. The list is given by iterators to the first and one
34  * beyond the last element that should be processed.
35  *
36  * @author Lutz Foucar
37  */
39 {
40 public:
41  /** typedef the shared pointer of this */
42  typedef std::tr1::shared_ptr<TagListProcessor> shared_pointer;
43 
44  /** constructor
45  *
46  * inline set the provided parameters and intialize counter with 0
47  *
48  * @param liststart iterator to the start of the tag list
49  * @param listend iterator to the end of the tag list
50  * @param blNbr the beamline number of the experiment
51  * @param highTagNbr the high tag number of the experiment
52  */
53  TagListProcessor(vector<int>::const_iterator liststart,
54  vector<int>::const_iterator listend,
55  int runNbr, int blNbr, int highTagNbr)
56  : _liststart(liststart),
57  _listend(listend),
58  _iter(liststart),
59  _runNbr(runNbr),
60  _blNbr(blNbr),
61  _highTagNbr(highTagNbr),
62  _counter(0),
63  _skippedeventscounter(0)
64  { }
65 
66  /** process the tags on the list */
67  void runthis()
68  {
69  /** load the right reader for the file type depending on its extension */
70  SACLAConverter convert;
71  convert.loadSettings();
72  convert.cacheParameters(_liststart,_listend,_blNbr,_runNbr,_highTagNbr);
73  /** read and convert the info for each of the tags */
74  _iter = _liststart;
75  string output("TagListProcessor: The following tags will be processed by '" +
76  toString(this) + "' for beamline '" + toString(_blNbr) +
77  "' (size is '" + toString(distance(_liststart,_listend)) +
78  "'):");
79  for (; _iter != _listend; ++_iter)
80  output += " '" + toString(*_iter) + "',";
81  Log::add(Log::VERBOSEINFO,output);
82 
83  /** get reference to the global input, which we use to interact with the
84  * ringbuffer and the ratemeter
85  */
86  InputBase::shared_pointer::element_type& input(InputBase::reference());
87 
88  /** reset the iterator to the start of the list and then iterate through
89  * the list of tags and check every iteration whether the input should quit
90  */
91  _iter = _liststart;
92  for(;(!input.shouldQuit()) && (_iter != _listend); ++_iter)
93  {
94  /** retrieve a new element from the ringbuffer, in case it is an iterator
95  * to the end of the buffer, continue to the next iterator of this list
96  */
97  InputBase::rbItem_t rbItem(input.getNextFillable());
98  if (rbItem == input.ringbuffer().end())
99  continue;
100 
101  /** fill the cassevent object with the contents from the file */
102  uint64_t datasize = convert(_highTagNbr,*_iter,*rbItem->element);
103 
104  /** in case nothing was retieved, issue a warning. Increase the counter
105  * otherwise
106  */
107  if (!datasize)
108  {
109  Log::add(Log::WARNING,"TagListProcessor: Event with id '"+
110  toString(rbItem->element->id()) + "' is bad: skipping Event");
111  ++_skippedeventscounter;
112  }
113  else
114  ++_counter;
115 
116  /** let the ratemeter know how much we retrieved and return the event
117  * to the ringbuffer
118  */
119  input.newEventAdded(datasize);
120  input.ringbuffer().doneFilling(rbItem, datasize);
121  }
122  }
123 
124  /** retrieve the progess within the list
125  *
126  * @return the current progress
127  */
128  double progress()
129  {
130  const double currsize(distance(_liststart,_iter));
131  const double fullsize(distance(_liststart,_listend));
132  return currsize/fullsize;
133  }
134 
135  /** retrieve the number of events processed by this thread
136  *
137  * @return the number of processed events
138  */
139  uint64_t nEventsProcessed() {return _counter;}
140 
141  /** retrieve the number of events skipped by this thread
142  *
143  * @return the number of skipped events
144  */
145  uint64_t nEventsSkipped() {return _skippedeventscounter;}
146 
147 private:
148  /** iterator to the start of the list */
149  vector<int>::const_iterator _liststart;
150 
151  /** iterator to the end of the list */
152  vector<int>::const_iterator _listend;
153 
154  /** iterator to the current item being processed */
155  vector<int>::const_iterator _iter;
156 
157 
158  /** the run number for the experiment */
159  int _runNbr;
160 
161  /** the beamline number for the experiment */
162  int _blNbr;
163 
164  /** the first part of the tag (that doesn't change) */
166 
167  /** a counter to count how many events (tags) have been processed */
168  uint64_t _counter;
169 
170  /** a counter to count how many events (tags) have been skipped */
172 };
173 
174 /** retrieve the list of tags and the associated high tag number
175  *
176  * @return false in case of an error, true otherwise
177  * @param[out] taglist the taglist for the run and beamline
178  * @param[out] highTagNbr the high tag number for the run at beamline
179  * @param[in] blNbr the beamline number where the run was taken
180  * @param[in] runNbr the run number for which the tags should be returned
181  *
182  * @author Lutz Foucar
183  */
184 bool getCompleteTagList(vector<int> &taglist, int &highTagNbr, int blNbr, int runNbr)
185 {
186  /** get the lowest and highest tag number for the run */
187  int funcstatus,startTagNbr,endTagNbr = 0;
188  funcstatus = sy_read_start_tagnumber(&highTagNbr,&startTagNbr,blNbr,runNbr);
189  if (funcstatus)
190  {
191  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve start tag of run '" +
192  toString(runNbr) + "' at beamline '" + toString(blNbr) +
193  "' Errorcode is '" + toString(funcstatus) + "'");
194  return false;
195  }
196  funcstatus = sy_read_end_tagnumber(&highTagNbr,&endTagNbr,blNbr,runNbr);
197  if (funcstatus)
198  {
199  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve end tag of run '" +
200  toString(runNbr) + "' at beamline '" + toString(blNbr) +
201  "' Errorcode is '" + toString(funcstatus) + "'");
202  return false;
203  }
204 
205  /** get the tag list
206  *
207  * @Note one has to create a SALCA strucht that allows to retrieve arrays
208  * One has to kind of allocate and destroy these structs, which is
209  * completely not exception safe...
210  */
211  Log::add(Log::VERBOSEINFO,"getCompleteTagList: get Taglist for tags between '" +
212  toString(startTagNbr) + "' and '" + toString(endTagNbr) +
213  "' with highTag '" + toString(highTagNbr)+ "' for run '" +
214  toString(runNbr) + "' at beamline '" + toString(blNbr) + "'");
215  struct da_int_array *tagListBuffer=NULL;
216  funcstatus = da_alloc_int_array(&tagListBuffer,0,NULL);
217  if (funcstatus)
218  {
219  Log::add(Log::ERROR,string("getCompleteTagList: could not allocate the ") +
220  "sacla int array struct. Errorcode is '" +
221  toString(funcstatus) + "'");
222  da_destroy_int_array(&tagListBuffer);
223  return false;
224  }
225  funcstatus = sy_read_taglist(tagListBuffer,blNbr,highTagNbr,startTagNbr,endTagNbr);
226  if (funcstatus)
227  {
228  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve taglist of run '" +
229  toString(runNbr) + "' at beamline '" + toString(blNbr) +
230  "'. Errorcode is '" + toString(funcstatus) + "'");
231  da_destroy_int_array(&tagListBuffer);
232  return false;
233  }
234  /** get size of taglist */
235  int tagListSize = -1;
236  funcstatus = da_getsize_int_array(&tagListSize, tagListBuffer);
237  if (funcstatus)
238  {
239  Log::add(Log::ERROR,string("getCompleteTagList: could not get the size of ") +
240  "sacla int array struct. Errorcode is '" +
241  toString(funcstatus) + "'");
242  da_destroy_int_array(&tagListBuffer);
243  return false;
244  }
245  /** copy the data to the vector */
246  taglist.clear();
247  for (size_t i(0); i < static_cast<size_t>(tagListSize); ++i)
248  {
249  int buffer(0);
250  da_getint_int_array(&buffer,tagListBuffer,i);
251  taglist.push_back(buffer);
252  }
253  /** dealloc the array */
254  funcstatus = da_destroy_int_array(&tagListBuffer);
255  if (funcstatus)
256  {
257  Log::add(Log::ERROR,string("getCompleteTagList: error destroying the ") +
258  "sacla int array struct. Errorcode is '" + toString(funcstatus) +
259  "'");
260  return false;
261  }
262 
263  return true;
264 }
265 }//end namespace cass
266 
267 void SACLAOfflineInput::instance(string runlistname,
268  RingBuffer<CASSEvent> &ringbuffer,
269  Ratemeter &ratemeter, Ratemeter &loadmeter,
270  bool quitWhenDone,
271  QObject *parent)
272 {
273  if(_instance)
274  throw logic_error("SACLAOfflineInput::instance(): The instance of the base class is already initialized");
275  _instance = shared_pointer(new SACLAOfflineInput(runlistname,ringbuffer,ratemeter,loadmeter,quitWhenDone,parent));
276 }
277 
278 SACLAOfflineInput::SACLAOfflineInput(string runlistname,
279  RingBuffer<CASSEvent> &ringbuffer,
280  Ratemeter &ratemeter, Ratemeter &loadmeter,
281  bool quitWhenDone,
282  QObject *parent)
283  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
284  _quitWhenDone(quitWhenDone),
285  _runlistname(runlistname)
286 {
287  Log::add(Log::VERBOSEINFO, "SACLAOfflineInput::SACLAOFflineInput: constructed");
288  load();
289 }
290 
/* Body that loads the input settings (its signature line is not part of this
 * listing). Reads the key "NbrThreads" (default 1) from the settings group
 * "SACLAOfflineInput" and stores it as an int in _chunks.
 *
 * NOTE(review): the value is not validated here. _chunks is used as divisor
 * (taglist.size() / _chunks) in the run loop further down in this file, so a
 * configured value of 0 would divide by zero - consider clamping to >= 1.
 */
292 {
293  CASSSettings s;
294  s.beginGroup("SACLAOfflineInput");
295  _chunks = s.value("NbrThreads",1).toInt();
296 }
297 
/* Body of the main loop of the input (its signature line and a few inner
 * lines are not part of this listing). For every line of the run list file it
 * extracts beamline number, run number and an optional list of tags, checks
 * the run status, splits the tag list into _chunks ranges, processes each
 * range with its own TagListProcessor and sums up the processed events.
 * Throws std::invalid_argument when the run list file cannot be opened.
 */
299 {
301  Tokenizer tokenize;
302 
303  /** get a list of all runs to process */
304  Log::add(Log::VERBOSEINFO,"SACLAOfflineInput::run(): try to open filelist '" +
305  _runlistname + "'");
306  ifstream runlistfile(_runlistname.c_str());
307  if (!runlistfile.is_open())
308  throw invalid_argument("SACLAOfflineInput::run(): filelist '" + _runlistname +
309  "' could not be opened");
310  vector<string> runlist(tokenize(runlistfile));
311  runlistfile.close();
312 
313  /** add an eventcounter */
314  uint64_t eventcounter(0);
315 
316  /** iterate through the list of runs */
317  vector<string>::const_iterator runlistIt(runlist.begin());
318  vector<string>::const_iterator runlistEnd(runlist.end());
319  while ((!shouldQuit()) && (runlistIt != runlistEnd))
320  {
321  /** split the line into comma separated numbers: the beamline number,
322  * the run number and optionally the tags to process */
322  string runname(*runlistIt++);
323  stringstream ss(runname);
324  string str;
325  vector<int> nbrs;
326  while(getline(ss,str,','))
327  {
328  stringstream ssvalue(str);
  // NOTE(review): value is not initialized and the result of the extraction
  // is not checked, so a non-numeric field may push an indeterminate value.
329  int value;
330  ssvalue >> value;
331  nbrs.push_back(value);
332  }
333  if (nbrs.size() < 2)
334  {
335  Log::add(Log::ERROR,"SACLAOfflineInput: Could not split information '" +
336  runname + "' into a beamline number and runname");
337  continue;
338  }
339  int blNbr(nbrs[0]);
340  int runNbr(nbrs[1]);
341 
342  /** check if the runstatus is set to 'run ended' and thus the
343  * data is available to read */
344  int runstatus(-1);
345  int funcstatus(0);
346  funcstatus = sy_read_runstatus(&runstatus,blNbr,runNbr);
347  if (funcstatus)
348  {
349  Log::add(Log::ERROR,string("SACLAOfflineInput: could not retrieve ") +
350  "run status of run '" + toString(runNbr) + "' at beamline '" +
351  toString(blNbr) + "'. Errorcode is '" + toString(funcstatus) +
352  "'");
353  continue;
354  }
  // any run status other than 0 is treated as "run not finished" and skipped
355  if (runstatus != 0)
356  {
357  Log::add(Log::ERROR,"SACLAOfflineInput: run '" + toString(runNbr) +
358  "' at beamline '" + toString(blNbr) + "' has not finised yet");
359  continue;
360  }
361 
362  /** the rest of the line could be a separated list of tags,
363  * add them to the id list
364  */
365  vector<int> taglist(nbrs.begin()+2,nbrs.end());
366  int highTagNbr(0);
367 
368  /** if the user did not provide a tag list, get the tag list from the API
369  * If there was an error, then continue with the next run in the runlist
370  */
371  if (taglist.empty())
372  {
373  if (!getCompleteTagList(taglist,highTagNbr,blNbr,runNbr))
374  continue;
375  }
376  /** otherwise check if the provided tags are part of the provided run */
377  else
378  {
379  vector<int> completeTagList;
380  if (!getCompleteTagList(completeTagList,highTagNbr,blNbr,runNbr))
381  continue;
382  vector<int>::const_iterator tagIter(taglist.begin());
383  vector<int>::const_iterator taglistEnd(taglist.end());
384  bool didnotfind(false);
  // stop at the first provided tag that is not in the complete list;
  // tagIter then still points to the offending tag for the error message
385  for (;tagIter != taglistEnd; ++tagIter)
386  {
387  if (find(completeTagList.begin(),completeTagList.end(),*tagIter) == completeTagList.end())
388  {
389  didnotfind = true;
390  break;
391  }
392  }
393  if (didnotfind)
394  {
395  Log::add(Log::ERROR,"SACLAOfflineInput: the provided tag '" +
396  toString(*tagIter) + "' is not part of run '" + toString(runNbr) +
397  "' at beamline '" + toString(blNbr) + "'");
398  continue;
399  }
400  }
401 
402  /** split the taglist into the user requested amount of chunks
403  * and process the split tag list in separate threads
404  */
  // NOTE(review): divides by _chunks, a value of 0 divides by zero. When the
  // tag list is shorter than _chunks, chunksize is 0 and the first _chunks-1
  // processors get empty ranges.
405  int chunksize = taglist.size() / _chunks;
406  for (int chunk(0); chunk < _chunks-1; ++chunk)
407  {
408  vector<int>::const_iterator chunkstart(taglist.begin() + (chunk*chunksize));
409  vector<int>::const_iterator chunkend(taglist.begin() + (chunk+1)*chunksize);
410  /** generate a processor for the chunk */
412  processor(new TagListProcessor(chunkstart,chunkend,runNbr,blNbr,highTagNbr));
413  /** start the processor */
414  processor->start();
415  /** put the processor in the processor container */
416  _procs.push_back(processor);
417  }
418  /** the last processor takes the last chunk and all remaining tags */
419  vector<int>::const_iterator chunkstart(taglist.begin() + ((_chunks-1)*chunksize));
421  processor(new TagListProcessor(chunkstart,taglist.end(),runNbr,blNbr,highTagNbr));
422  /** start the processor */
423  processor->start();
424  /** put the processor in the processor container */
425  _procs.push_back(processor);
426 
427  /** wait until all threads are finished and sum up the total events */
  // NOTE(review): _procs is not cleared in the visible code, so from the
  // second run on this loop also visits the processors of earlier runs and
  // adds their nEventsProcessed() to eventcounter again - verify. Those
  // earlier processors also still hold iterators into a taglist that went out
  // of scope at the end of its loop iteration.
428  proc_t::iterator processorsIt(_procs.begin());
429  proc_t::iterator processorsEnd(_procs.end());
430  for (; processorsIt != processorsEnd; ++processorsIt)
431  {
432  (*processorsIt)->wait();
433  (*processorsIt)->rethrowException();
434  eventcounter += (*processorsIt)->nEventsProcessed();
435  }
436  }
437 
438  Log::add(Log::INFO,"SACLAOfflineInput::run(): Finished with all runs.");
439  /** in case the input should not quit when everything has been processed, wait
440  * until the input thread is told to quit
441  */
442  if(!_quitWhenDone)
443  while(!shouldQuit())
444  this->sleep(1);
445  Log::add(Log::VERBOSEINFO, "SACLAOfflineInput::run(): closing the input");
446  Log::add(Log::INFO,"SACLAOfflineInput::run(): Analysed '" + toString(eventcounter) +
447  "' events.");
448 }
449 
/* Body that reports the overall progress (its signature line is not part of
 * this listing): the mean of progress() over all processors in _procs.
 *
 * NOTE(review): while _procs is empty this evaluates 0. / 0, which is NaN for
 * doubles - consider returning 0 in that case.
 */
451 {
452  double progressSum(0.);
453  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
454  progressSum += (*it)->progress();
455  return (progressSum / _procs.size());
456 }
457 
/* Body that reports the number of processed events (its signature line is not
 * part of this listing): the sum of nEventsProcessed() over all processors in
 * _procs.
 */
459 {
460  uint64_t counter(0);
461  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
462  counter += (*it)->nEventsProcessed();
463  return counter;
464 }
465 
/* Body that reports the number of skipped events (its signature line is not
 * part of this listing): the sum of nEventsSkipped() over all processors in
 * _procs.
 */
467 {
468  uint64_t counter(0);
469  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
470  counter += (*it)->nEventsSkipped();
471  return counter;
472 }
uint64_t eventcounter()
retrieve the number of processed events
uint64_t nEventsProcessed()
retrieve the number of events processed by this thread
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
sleep(unsigned long secs)
status_t _status
the internal status of the thread
void runthis()
process the tags on the list
Settings for CASS.
Definition: cass_settings.h:30
Class for reading SACLA data.
double progress()
retrieve the progress state
double progress()
retrieve the progress within the list
proc_t _procs
the processor container
void cacheParameters(std::vector< int >::const_iterator first, std::vector< int >::const_iterator last, int blNbr, int runNbr, int highTagNbr)
retrieve requested beamline parameters in one go
void runthis()
function with the main loop
Input base class.
Definition: input_base.h:31
A processor for a tag list.
STL namespace.
void loadSettings()
load the settings of the reader
uint64_t skippedeventcounter()
retrieve the number of skipped processed events
vector< int >::const_iterator _listend
iterator to the end of the list
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
void load()
load the parameters used for the input
std::string _runlistname
name of the file containing all files that we need to process
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
std::tr1::shared_ptr< TagListProcessor > shared_pointer
typedef the shared pointer of this
bool getCompleteTagList(vector< int > &taglist, int &highTagNbr, int blNbr, int runNbr)
retrieve the list of tags and the associated high tag number
vector< int >::const_iterator _liststart
iterator to the start of the list
TagListProcessor(vector< int >::const_iterator liststart, vector< int >::const_iterator listend, int runNbr, int blNbr, int highTagNbr)
constructor
int _blNbr
the beamline number for the experiment
bool shouldQuit() const
query whether this thread is told to quit
uint64_t nEventsSkipped()
retrieve the number of events skipped by this thread
tokenize to return all lines of an ascii file in a vector
Definition: cass.h:111
vector< int >::const_iterator _iter
iterator to the current item being processed
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
SACLA Offline Input for cass.
value(const QString &key, const QVariant &defaultValue=QVariant()
int _chunks
number of chunks that the list should be split into
int _highTagNbr
the first part of the tag (that doesn't change)
int _runNbr
the run number for the experiment
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
Example of how to use the sacla online input
Definition: SACLA-online.ini:2
uint64_t _skippedeventscounter
a counter to count how many events (tags) have been skipped
file contains declaration of sacla offline input
contains a logger for cass
bool _quitWhenDone
flag to tell the thread to quit when its done with all files
contains class to convert sacla data to cassevent
check if there is some light in the chamber based upon the GMD value
beginGroup(const QString &prefix)
uint64_t _counter
a counter to count how many events (tags) have been processed
A QThread that has the ability to be paused and resumed.