CFEL - ASG Software Suite  2.5.0
CASS
file_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2009-2016 Lutz Foucar
2 
3 /**
4  * @file file_input.cpp file contains definition of xtcfile input
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <iostream>
10 #include <iomanip>
11 #include <fstream>
12 #include <string>
13 #include <sstream>
14 #include <stdexcept>
15 
16 #include <QtCore/QFileInfo>
17 
18 #include "file_input.h"
19 
20 #include "cass_event.h"
21 #include "cass_settings.h"
22 #include "log.h"
23 
24 using namespace std;
25 using namespace cass;
26 
27 namespace cass
28 {
29 /** process a file
30  *
31  * @author Lutz Foucar
32  */
34 {
35 public:
36  /** define the shared pointer of this */
37  typedef std::tr1::shared_ptr<FileProcessor> shared_pointer;
38 
39  /** constructor
40  *
41  * set the filename and initializes all parameters for the thread to be
42  * able to process the data
43  *
44  * @param filename The name of the file to process
45  */
46  FileProcessor(const string &filename)
47  : _filename(filename),
48  _file(filename.c_str(), ios::binary | ios::in),
49  _counter(0),
50  _skippedcounter(0)
51  {
52  /** load the right reader for the file type depending on its extension */
53  _read = FileReader::instance(_filename);
54  _read->loadSettings();
55  Log::add(Log::INFO,"FileProcessor(): processing file '" + _filename +
56  "' with file reader type '" + _read->type() + "'");
57  _file.seekg (0, ios::end);
58  _filesize = _file.tellg();
59  _file.seekg (0, ios::beg);
60  _read->readHeaderInfo(_file);
61  }
62 
63  /** process the file */
64  void runthis()
65  {
66  /** get a pointer to the calling thread */
67  InputBase::shared_pointer::element_type& input(InputBase::reference());
68 
69  /** make a container with all event ids */
70  vector<CASSEvent::id_t> ids;
71 
72  /** iterate through the file until we've reached the filesize */
73  while((!input.shouldQuit()) && (_file.tellg() < _filesize))
74  {
75  /** retrieve a new element from the ringbuffer */
76  InputBase::rbItem_t rbItem(input.getNextFillable());
77  if (rbItem == input.ringbuffer().end())
78  continue;
79 
80  /** fill the cassevent object with the contents from the file */
81  bool isGood((*_read)(_file,*rbItem->element));
82  if (!isGood)
83  {
84  ++_skippedcounter;
85  Log::add(Log::WARNING,"FileProcessor::run(): Event with id '"+
86  toString(rbItem->element->id()) + "' is bad: skipping Event");
87  }
88  else
89  {
90  /** check if id is unique, if not skip event */
91  CASSEvent::id_t id(rbItem->element->id());
92  if (find(ids.begin(), ids.end(), id) != ids.end())
93  {
94  string output("File '"+_filename+"' has duplicate id '" +
95  toString(id) + "'");
96  if (_read->type() == "xtc")
97  {
98  uint32_t seconds(static_cast<uint32_t>((id & 0xFFFFFFFF00000000) >> 32));
99  uint32_t fiducial(static_cast<uint32_t>((id & 0x00000000FFFFFFFF) >> 8));
100  output += ("(seconds '" + toString(seconds) + "', fiducial '" +
101  toString(fiducial) + "')");
102  }
103  Log::add(Log::ERROR,output);
104  isGood = false;
105  ++_skippedcounter;
106  }
107  else
108  {
109  /** @todo count the number of skipped events and make is accessible to
110  * rateplotter
111  */
112  ++_counter;
113  ids.push_back(rbItem->element->id());
114  }
115  }
116  /** give item back to the ringbuffer */
117  rbItem->element->setFilename(_filename.c_str());
118  input.newEventAdded(rbItem->element->datagrambuffer().size());
119  input.ringbuffer().doneFilling(rbItem, isGood);
120  }
121  _file.close();
122  }
123 
124  /** retrieve the progess within the file
125  *
126  * @return the current progress
127  */
128  double progress()
129  {
130  return ((_file.tellg() == -1) ? 1. :
131  (static_cast<double>(_file.tellg()) /
132  static_cast<double>(_filesize)));
133  }
134 
135  /** retrieve the number of events processed by this thread
136  *
137  * @return the number of processed events
138  */
139  uint64_t nEventsProcessed() {return _counter;}
140 
141  /** retrieve the number of skipped events by this thread
142  *
143  * @return the number of skippted events
144  */
145  uint64_t nSkippedEvents() {return _skippedcounter;}
146 
147 private:
148  /** the filename to work on */
149  string _filename;
150 
151  /** shared pointer to the actual reader */
153 
154  /** the file stream */
155  ifstream _file;
156 
157  /** the size of the file */
158  streampos _filesize;
159 
160  /** a counter for the events */
161  uint64_t _counter;
162 
163  /** a counter for the skipped events */
164  uint64_t _skippedcounter;
165 };//end class FileProcessor
166 
167 }//end namespace cass
168 
169 void FileInput::instance(string filelistname,
170  RingBuffer<CASSEvent> &ringbuffer,
171  Ratemeter &ratemeter, Ratemeter &loadmeter,
172  bool quitWhenDone,
173  QObject *parent)
174 {
175  if(_instance)
176  throw logic_error("FileInput::instance(): The instance of the base class is already initialized");
177  _instance = shared_pointer(new FileInput(filelistname,ringbuffer,ratemeter,loadmeter,quitWhenDone,parent));
178 }
179 
180 FileInput::FileInput(string filelistname,
181  RingBuffer<CASSEvent> &ringbuffer,
182  Ratemeter &ratemeter, Ratemeter &loadmeter,
183  bool quitWhenDone,
184  QObject *parent)
185  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
186  _quitWhenDone(quitWhenDone),
187  _filelistname(filelistname)
188 {
189  Log::add(Log::VERBOSEINFO, "FileInput::FileInput: constructed");
190  load();
191 }
192 
194 {
195  CASSSettings s;
196  s.beginGroup("FileInput");
197  _parallelize = s.value("Parallelize",false).toBool();
198 }
199 
/** body of the main loop of the file input
 *
 * reads the list of file names from the file list, creates a FileProcessor
 * for every listed file that exists, starts the processors, waits for all
 * of them and finally logs how many events they processed.
 *
 * throws invalid_argument when the file list can not be opened.
 *
 * NOTE(review): the signature of this function is not among the lines
 * visible here.
 */
201 {
203  Tokenizer tokenize;
204 
205  /** retrieve all files in a list from the file */
206  Log::add(Log::VERBOSEINFO,"FileInput::run(): try to open filelist '" +
207  _filelistname + "'");
208  ifstream filelistfile(_filelistname.c_str());
209  if (!filelistfile.is_open())
210  throw invalid_argument("FileInput::run(): filelist '" + _filelistname +
211  "' could not be opened");
212  vector<string> filelist(tokenize(filelistfile));
213  filelistfile.close();
214 
215  /** go through the list of files and create a processor for each file and
216  * add them to the list of processors
217  */
218  vector<string>::const_iterator filelistIt(filelist.begin());
219  vector<string>::const_iterator filelistEnd(filelist.end());
220  for (;filelistIt != filelistEnd; ++filelistIt)
221  {
222  string filename(*filelistIt);
 /** NOTE(review): `info` is not declared in the lines visible here;
  * presumably it is a QFileInfo created from `filename` - confirm against
  * the complete source.
  */
224  /** if there was such a file then we want to load it */
225  if (info.exists())
226  {
227  FileProcessor::shared_pointer fProc(new FileProcessor(filename));
228  _fProcs.push_back(fProc);
229  }
230  else
231  Log::add(Log::ERROR,"FileInput::run(): could not open '" + filename + "'");
232  }
233  /** process the files using the processors
234  *
235  * @note we don't need to check for quitting at this point and the loop below
236  * because the threads themselves will check if the input should quit
237  * and if it does they will shut down graciously, thus wait will only
238  * wait until all the threads are finished, no need to shortcut by not
239  * starting the threads. Also because using the shortcuts in these
240  * loops will prevent gathering the correct information about how many
241  * events have been analyzed so far.
242  */
 /** NOTE(review): contrary to the note above, the loop below does stop
  * starting processors once shouldQuit() is true - confirm which of the
  * two is intended.
  *
  * when _parallelize is not set each processor is waited for right after
  * it was started, so the files are processed one after the other.
  */
243  fileProcessors_t::iterator pIt(_fProcs.begin());
244  fileProcessors_t::iterator pEnd(_fProcs.end());
245  for (;(!shouldQuit()) && (pIt != pEnd); ++pIt)
246  {
247  (*pIt)->start();
248  if (!_parallelize)
249  (*pIt)->wait();
250  }
251 
252  /** wait until the processors are done and gather information about the
253  * number of events they processed.
254  */
255  uint64_t eventcounter(0);
256  pIt = _fProcs.begin();
257  for (; pIt != pEnd; ++pIt)
258  {
259  (*pIt)->wait();
 /** presumably rethrows an exception that occurred within the processor
  * thread - TODO confirm in the thread base class
  */
260  (*pIt)->rethrowException();
261  eventcounter += (*pIt)->nEventsProcessed();
262  }
263 
264  Log::add(Log::INFO,"FileInput::run(): Finished with all files.");
 /** unless the input should quit when it is done, idle in steps of
  * sleep(1) until it is told to quit
  */
265  if(!_quitWhenDone)
266  while(!shouldQuit())
267  this->sleep(1);
268  Log::add(Log::VERBOSEINFO, "FileInput::run(): closing the input");
269  Log::add(Log::INFO,"FileInput::run(): Analysed '" + toString(eventcounter) +
270  "' events.");
271 }
272 
274 {
275  double progressSum(0.);
276  for (fileProcessors_t::const_iterator it(_fProcs.begin()); it != _fProcs.end(); ++it)
277  progressSum += (*it)->progress();
278  return (progressSum / _fProcs.size());
279 }
280 
282 {
283  uint64_t counter(0);
284  for (fileProcessors_t::const_iterator it(_fProcs.begin()); it != _fProcs.end(); ++it)
285  counter += (*it)->nEventsProcessed();
286  return counter;
287 }
288 
290 {
291  uint64_t counter(0);
292  for (fileProcessors_t::const_iterator it(_fProcs.begin()); it != _fProcs.end(); ++it)
293  counter += (*it)->nSkippedEvents();
294  return counter;
295 }
bool _parallelize
flag that tells whether the input should analyze the files in parallel
Definition: file_input.h:106
uint64_t eventcounter()
retrieve the number of processed events
Definition: file_input.cpp:281
std::string _filelistname
name of the file containing all files that we need to process
Definition: file_input.h:112
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
sleep(unsigned long secs)
void runthis()
process the file
Definition: file_input.cpp:64
status_t _status
the internal status of the thread
std::tr1::shared_ptr< FileProcessor > shared_pointer
define the shared pointer of this
Definition: file_input.cpp:37
uint64_t nSkippedEvents()
retrieve the number of skipped events by this thread
Definition: file_input.cpp:145
file contains declaration of the CASSEvent
Settings for CASS.
Definition: cass_settings.h:30
uint64_t skippedeventcounter()
retrieve the number of skipped processed events
Definition: file_input.cpp:289
uint64_t id_t
define the id type
Definition: cass_event.h:52
Input base class.
Definition: input_base.h:31
bool _quitWhenDone
flag to tell the thread to quit when its done with all files
Definition: file_input.h:109
STL namespace.
uint64_t nEventsProcessed()
retrieve the number of events processed by this thread
Definition: file_input.cpp:139
double progress()
retrieve the averaged progress state of all file processors
Definition: file_input.cpp:273
additional info
streampos _filesize
the size of the file
Definition: file_input.cpp:158
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
ifstream _file
the file stream
Definition: file_input.cpp:155
fromStdString(const std::string &str)
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
FileReader::shared_pointer _read
shared pointer to the actual reader
Definition: file_input.cpp:152
bool shouldQuit() const
query whether this thread is told to quit
tokenize to return all lines of an ascii file in a vector
Definition: cass.h:111
fileProcessors_t _fProcs
the file processor container
Definition: file_input.h:118
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
uint64_t _skippedcounter
a counter for the skipped events
Definition: file_input.cpp:164
file contains declaration of xtcfile input
value(const QString &key, const QVariant &defaultValue=QVariant()
uint64_t _counter
a counter for the events
Definition: file_input.cpp:161
double progress()
retrieve the progress within the file
Definition: file_input.cpp:128
FileProcessor(const string &filename)
constructor
Definition: file_input.cpp:46
File Input for cass.
Definition: file_input.h:41
void load()
load the parameters used for the multifile input
Definition: file_input.cpp:193
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
string _filename
the filename to work on
Definition: file_input.cpp:149
Example of how to use the sacla online input
Definition: SACLA-online.ini:2
std::tr1::shared_ptr< FileReader > shared_pointer
typedef the shared pointer of this
Definition: file_reader.h:28
contains a logger for cass
void runthis()
function with the main loop
Definition: file_input.cpp:200
beginGroup(const QString &prefix)
process a file
Definition: file_input.cpp:33
A QThread that has the ability to be paused and resumed.