CFEL - ASG Software Suite  2.5.0
Go to the documentation of this file.
1 // Copyright (C) 2014, 2015 Lutz Foucar
3 /**
4  * @file sacla_offline_input.cpp file contains definition of sacla offline input
5  *
6  * @author Lutz Foucar
7  */
9 #include <iostream>
10 #include <iomanip>
11 #include <fstream>
12 #include <string>
13 #include <sstream>
14 #include <stdexcept>
16 #include <QtCore/QFileInfo>
18 #include <DataAccessUserAPI.h>
20 #include "sacla_offline_input.h"
22 #include "cass_settings.h"
23 #include "log.h"
24 #include "sacla_converter.h"
26 using namespace std;
27 using namespace cass;
29 namespace cass
30 {
31 /** A processor for a tag list
32  *
33  * processes a list of tags. The list is given by iterators to the first and one
34  * beyond the last element that should be processed.
35  *
36  * @author Lutz Foucar
37  */
39 {
40 public:
41  /** typedef the shared pointer of this */
42  typedef std::tr1::shared_ptr<TagListProcessor> shared_pointer;
  /** constructor
   *
   * inline set the provided parameters and initialize the counters with 0
   *
   * @param liststart iterator to the start of the tag list
   * @param listend iterator to one beyond the last element of the tag list
   * @param runNbr the run number of the experiment
   * @param blNbr the beamline number of the experiment
   * @param highTagNbr the high tag number of the experiment
   */
53  TagListProcessor(vector<int>::const_iterator liststart,
54  vector<int>::const_iterator listend,
55  int runNbr, int blNbr, int highTagNbr)
56  : _liststart(liststart),
57  _listend(listend),
58  _iter(liststart),
59  _runNbr(runNbr),
60  _blNbr(blNbr),
61  _highTagNbr(highTagNbr),
62  _counter(0),
63  _skippedeventscounter(0)
64  { }
66  /** process the tags on the list */
67  void runthis()
68  {
69  /** load the right reader for the file type depending on its extension */
70  SACLAConverter convert;
71  convert.loadSettings();
72  convert.cacheParameters(_liststart,_listend,_blNbr,_runNbr,_highTagNbr);
73  /** read and convert the info for each of the tags */
74  _iter = _liststart;
75  string output("TagListProcessor: The following tags will be processed by '" +
76  toString(this) + "' for beamline '" + toString(_blNbr) +
77  "' (size is '" + toString(distance(_liststart,_listend)) +
78  "'):");
79  for (; _iter != _listend; ++_iter)
80  output += " '" + toString(*_iter) + "',";
81  Log::add(Log::VERBOSEINFO,output);
83  /** get reference to the global input, which we use to interact with the
84  * ringbuffer and the ratemeter
85  */
86  InputBase::shared_pointer::element_type& input(InputBase::reference());
88  /** reset the iterator to the start of the list and then iterate through
89  * the list of tags and check every iteration whether the input should quit
90  */
91  _iter = _liststart;
92  for(;(!input.shouldQuit()) && (_iter != _listend); ++_iter)
93  {
94  /** retrieve a new element from the ringbuffer, in case it is an iterator
95  * to the end of the buffer, continue to the next iterator of this list
96  */
97  InputBase::rbItem_t rbItem(input.getNextFillable());
98  if (rbItem == input.ringbuffer().end())
99  continue;
101  /** fill the cassevent object with the contents from the file */
102  uint64_t datasize = convert(_highTagNbr,*_iter,*rbItem->element);
104  /** in case nothing was retieved, issue a warning. Increase the counter
105  * otherwise
106  */
107  if (!datasize)
108  {
109  Log::add(Log::WARNING,"TagListProcessor: Event with id '"+
110  toString(rbItem->element->id()) + "' is bad: skipping Event");
111  ++_skippedeventscounter;
112  }
113  else
114  ++_counter;
116  /** let the ratemeter know how much we retrieved and return the event
117  * to the ringbuffer
118  */
119  input.newEventAdded(datasize);
120  input.ringbuffer().doneFilling(rbItem, datasize);
121  }
122  }
  /** retrieve the progress within the list
   *
   * @return the fraction of the list that has been iterated over so far
   */
128  double progress()
129  {
130  const double currsize(distance(_liststart,_iter));
131  const double fullsize(distance(_liststart,_listend));
132  return currsize/fullsize;
133  }
135  /** retrieve the number of events processed by this thread
136  *
137  * @return the number of processed events
138  */
139  uint64_t nEventsProcessed() {return _counter;}
141  /** retrieve the number of events skipped by this thread
142  *
143  * @return the number of skipped events
144  */
145  uint64_t nEventsSkipped() {return _skippedeventscounter;}
147 private:
148  /** iterator to the start of the list */
149  vector<int>::const_iterator _liststart;
151  /** iterator to the end of the list */
152  vector<int>::const_iterator _listend;
154  /** iterator to the current item being processed */
155  vector<int>::const_iterator _iter;
158  /** the run number for the experiment */
159  int _runNbr;
161  /** the beamline number for the experiment */
162  int _blNbr;
164  /** the first part of the tag (that doesn't change) */
167  /** a counter to count how many events (tags) have been processed */
168  uint64_t _counter;
170  /** a counter to count how many events (tags) have been skipped */
172 };
174 /** retrieve the list of tags and the associated high tag number
175  *
176  * @return false in case of an error, true otherwise
177  * @param[out] taglist the taglist for the run and beamline
178  * @param[out] highTagNbr the high tag number for the run at beamline
179  * @param[in] blNbr the beamline number where the run was taken
180  * @param[in] runNbr the run number for which the tags should be returned
181  *
182  * @author Lutz Foucar
183  */
184 bool getCompleteTagList(vector<int> &taglist, int &highTagNbr, int blNbr, int runNbr)
185 {
186  /** get the lowest and highest tag number for the run */
187  int funcstatus,startTagNbr,endTagNbr = 0;
188  funcstatus = sy_read_start_tagnumber(&highTagNbr,&startTagNbr,blNbr,runNbr);
189  if (funcstatus)
190  {
191  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve start tag of run '" +
192  toString(runNbr) + "' at beamline '" + toString(blNbr) +
193  "' Errorcode is '" + toString(funcstatus) + "'");
194  return false;
195  }
196  funcstatus = sy_read_end_tagnumber(&highTagNbr,&endTagNbr,blNbr,runNbr);
197  if (funcstatus)
198  {
199  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve end tag of run '" +
200  toString(runNbr) + "' at beamline '" + toString(blNbr) +
201  "' Errorcode is '" + toString(funcstatus) + "'");
202  return false;
203  }
205  /** get the tag list
206  *
207  * @Note one has to create a SALCA strucht that allows to retrieve arrays
208  * One has to kind of allocate and destroy these structs, which is
209  * completely not exception safe...
210  */
211  Log::add(Log::VERBOSEINFO,"getCompleteTagList: get Taglist for tags between '" +
212  toString(startTagNbr) + "' and '" + toString(endTagNbr) +
213  "' with highTag '" + toString(highTagNbr)+ "' for run '" +
214  toString(runNbr) + "' at beamline '" + toString(blNbr) + "'");
215  struct da_int_array *tagListBuffer=NULL;
216  funcstatus = da_alloc_int_array(&tagListBuffer,0,NULL);
217  if (funcstatus)
218  {
219  Log::add(Log::ERROR,string("getCompleteTagList: could not allocate the ") +
220  "sacla int array struct. Errorcode is '" +
221  toString(funcstatus) + "'");
222  da_destroy_int_array(&tagListBuffer);
223  return false;
224  }
225  funcstatus = sy_read_taglist(tagListBuffer,blNbr,highTagNbr,startTagNbr,endTagNbr);
226  if (funcstatus)
227  {
228  Log::add(Log::ERROR,"getCompleteTagList: could not retrieve taglist of run '" +
229  toString(runNbr) + "' at beamline '" + toString(blNbr) +
230  "'. Errorcode is '" + toString(funcstatus) + "'");
231  da_destroy_int_array(&tagListBuffer);
232  return false;
233  }
234  /** get size of taglist */
235  int tagListSize = -1;
236  funcstatus = da_getsize_int_array(&tagListSize, tagListBuffer);
237  if (funcstatus)
238  {
239  Log::add(Log::ERROR,string("getCompleteTagList: could not get the size of ") +
240  "sacla int array struct. Errorcode is '" +
241  toString(funcstatus) + "'");
242  da_destroy_int_array(&tagListBuffer);
243  return false;
244  }
245  /** copy the data to the vector */
246  taglist.clear();
247  for (size_t i(0); i < static_cast<size_t>(tagListSize); ++i)
248  {
249  int buffer(0);
250  da_getint_int_array(&buffer,tagListBuffer,i);
251  taglist.push_back(buffer);
252  }
253  /** dealloc the array */
254  funcstatus = da_destroy_int_array(&tagListBuffer);
255  if (funcstatus)
256  {
257  Log::add(Log::ERROR,string("getCompleteTagList: error destroying the ") +
258  "sacla int array struct. Errorcode is '" + toString(funcstatus) +
259  "'");
260  return false;
261  }
263  return true;
264 }
265 }//end namespace cass
267 void SACLAOfflineInput::instance(string runlistname,
268  RingBuffer<CASSEvent> &ringbuffer,
269  Ratemeter &ratemeter, Ratemeter &loadmeter,
270  bool quitWhenDone,
271  QObject *parent)
272 {
273  if(_instance)
274  throw logic_error("SACLAOfflineInput::instance(): The instance of the base class is already initialized");
275  _instance = shared_pointer(new SACLAOfflineInput(runlistname,ringbuffer,ratemeter,loadmeter,quitWhenDone,parent));
276 }
278 SACLAOfflineInput::SACLAOfflineInput(string runlistname,
279  RingBuffer<CASSEvent> &ringbuffer,
280  Ratemeter &ratemeter, Ratemeter &loadmeter,
281  bool quitWhenDone,
282  QObject *parent)
283  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
284  _quitWhenDone(quitWhenDone),
285  _runlistname(runlistname)
286 {
287  Log::add(Log::VERBOSEINFO, "SACLAOfflineInput::SACLAOFflineInput: constructed");
288  load();
289 }
292 {
293  CASSSettings s;
294  s.beginGroup("SACLAOfflineInput");
295  _chunks = s.value("NbrThreads",1).toInt();
296 }
299 {
301  Tokenizer tokenize;
303  /** get a list of all runs to process */
304  Log::add(Log::VERBOSEINFO,"SACLAOfflineInput::run(): try to open filelist '" +
305  _runlistname + "'");
306  ifstream runlistfile(_runlistname.c_str());
307  if (!runlistfile.is_open())
308  throw invalid_argument("SACLAOfflineInput::run(): filelist '" + _runlistname +
309  "' could not be opened");
310  vector<string> runlist(tokenize(runlistfile));
311  runlistfile.close();
313  /** add an eventcounter */
314  uint64_t eventcounter(0);
316  /** iterate through the list of runs */
317  vector<string>::const_iterator runlistIt(runlist.begin());
318  vector<string>::const_iterator runlistEnd(runlist.end());
319  while ((!shouldQuit()) && (runlistIt != runlistEnd))
320  {
321  /** split the runname into the run and beamline combination */
322  string runname(*runlistIt++);
323  stringstream ss(runname);
324  string str;
325  vector<int> nbrs;
326  while(getline(ss,str,','))
327  {
328  stringstream ssvalue(str);
329  int value;
330  ssvalue >> value;
331  nbrs.push_back(value);
332  }
333  if (nbrs.size() < 2)
334  {
335  Log::add(Log::ERROR,"SACLAOfflineInput: Could not split information '" +
336  runname + "' into a beamline number and runname");
337  continue;
338  }
339  int blNbr(nbrs[0]);
340  int runNbr(nbrs[1]);
342  /** check if the runstatus is set to 'run ended' and thus the
343  * data is available to read */
344  int runstatus(-1);
345  int funcstatus(0);
346  funcstatus = sy_read_runstatus(&runstatus,blNbr,runNbr);
347  if (funcstatus)
348  {
349  Log::add(Log::ERROR,string("SACLAOfflineInput: could not retrieve ") +
350  "run status of run '" + toString(runNbr) + "' at beamline '" +
351  toString(blNbr) + "'. Errorcode is '" + toString(funcstatus) +
352  "'");
353  continue;
354  }
355  if (runstatus != 0)
356  {
357  Log::add(Log::ERROR,"SACLAOfflineInput: run '" + toString(runNbr) +
358  "' at beamline '" + toString(blNbr) + "' has not finised yet");
359  continue;
360  }
362  /** the rest of the line could be a separated list of tags,
363  * add them to the id list
364  */
365  vector<int> taglist(nbrs.begin()+2,nbrs.end());
366  int highTagNbr(0);
368  /** if the user did not provide a tag list, get the tag list from the API
369  * If there was an error, then continue with the next run in the runlist
370  */
371  if (taglist.empty())
372  {
373  if (!getCompleteTagList(taglist,highTagNbr,blNbr,runNbr))
374  continue;
375  }
376  /** otherwise check if the provided tags are part of the provided run */
377  else
378  {
379  vector<int> completeTagList;
380  if (!getCompleteTagList(completeTagList,highTagNbr,blNbr,runNbr))
381  continue;
382  vector<int>::const_iterator tagIter(taglist.begin());
383  vector<int>::const_iterator taglistEnd(taglist.end());
384  bool didnotfind(false);
385  for (;tagIter != taglistEnd; ++tagIter)
386  {
387  if (find(completeTagList.begin(),completeTagList.end(),*tagIter) == completeTagList.end())
388  {
389  didnotfind = true;
390  break;
391  }
392  }
393  if (didnotfind)
394  {
395  Log::add(Log::ERROR,"SACLAOfflineInput: the provided tag '" +
396  toString(*tagIter) + "' is not part of run '" + toString(runNbr) +
397  "' at beamline '" + toString(blNbr) + "'");
398  continue;
399  }
400  }
402  /** split the taglist into the user requested amount of chunks
403  * and process the splittet tag list in separate threads
404  */
405  int chunksize = taglist.size() / _chunks;
406  for (int chunk(0); chunk < _chunks-1; ++chunk)
407  {
408  vector<int>::const_iterator chunkstart(taglist.begin() + (chunk*chunksize));
409  vector<int>::const_iterator chunkend(taglist.begin() + (chunk+1)*chunksize);
410  /** generate a processor for the chunk */
412  processor(new TagListProcessor(chunkstart,chunkend,runNbr,blNbr,highTagNbr));
413  /** start the processor */
414  processor->start();
415  /** put the processor in the processor container */
416  _procs.push_back(processor);
417  }
418  /** if there are tags in the list remaining, add the into the last processor */
419  vector<int>::const_iterator chunkstart(taglist.begin() + ((_chunks-1)*chunksize));
421  processor(new TagListProcessor(chunkstart,taglist.end(),runNbr,blNbr,highTagNbr));
422  /** start the processor */
423  processor->start();
424  /** put the processor in the processor container */
425  _procs.push_back(processor);
427  /** wait until all threads are finished and sum up the total events */
428  proc_t::iterator processorsIt(_procs.begin());
429  proc_t::iterator processorsEnd(_procs.end());
430  for (; processorsIt != processorsEnd; ++processorsIt)
431  {
432  (*processorsIt)->wait();
433  (*processorsIt)->rethrowException();
434  eventcounter += (*processorsIt)->nEventsProcessed();
435  }
436  }
438  Log::add(Log::INFO,"SACLAOfflineInput::run(): Finished with all runs.");
439  /** in case the input should not quit when everything has been processed, wait
440  * until the input thread is told to quit
441  */
442  if(!_quitWhenDone)
443  while(!shouldQuit())
444  this->sleep(1);
445  Log::add(Log::VERBOSEINFO, "SACLAOfflineInput::run(): closing the input");
446  Log::add(Log::INFO,"SACLAOfflineInput::run(): Analysed '" + toString(eventcounter) +
447  "' events.");
448 }
451 {
452  double progressSum(0.);
453  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
454  progressSum += (*it)->progress();
455  return (progressSum / _procs.size());
456 }
459 {
460  uint64_t counter(0);
461  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
462  counter += (*it)->nEventsProcessed();
463  return counter;
464 }
467 {
468  uint64_t counter(0);
469  for (proc_t::const_iterator it(_procs.begin()); it != _procs.end(); ++it)
470  counter += (*it)->nEventsSkipped();
471  return counter;
472 }
