CFEL - ASG Software Suite  2.5.0
CASS
multifile_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2011, 2013 Lutz Foucar
2 
3 /**
4  * @file multifile_input.cpp file contains definition of file input reading
5  * multiple files in parallel.
6  *
7  * @author Lutz Foucar
8  */
9 
10 #include <iostream>
11 #include <iomanip>
12 #include <fstream>
13 #include <sstream>
14 #include <stdexcept>
15 #include <tr1/functional>
16 
17 #include <QStringList>
18 #include <QtCore/QFileInfo>
19 
20 #include "multifile_input.h"
21 
22 #include "cass_event.h"
23 #include "cass_settings.h"
24 #include "file_parser.h"
25 #include "log.h"
26 
27 using namespace std;
28 using namespace cass;
29 
30 namespace cass
31 {
32 }//end namespace cass
33 
34 void MultiFileInput::instance(const string& filelistname,
35  RingBuffer<CASSEvent> &ringbuffer,
36  Ratemeter &ratemeter, Ratemeter &loadmeter,
37  bool quitWhenDone,
38  QObject *parent)
39 {
40  if(_instance)
41  throw logic_error("MultiFileInput::instance(): The instance of the base class is already initialized");
42  _instance = shared_pointer(new MultiFileInput(filelistname,ringbuffer,ratemeter,loadmeter,quitWhenDone,parent));
43 }
44 
45 MultiFileInput::MultiFileInput(const string& filelistname,
46  RingBuffer<CASSEvent> &ringbuffer,
47  Ratemeter &ratemeter, Ratemeter &loadmeter,
48  bool quitWhenDone,
49  QObject *parent)
50  :InputBase(ringbuffer,ratemeter,loadmeter,parent),
51  _quitWhenDone(quitWhenDone),
52  _filelistname(filelistname),
53  _rewind(false)
54 {
55  load();
56 }
57 
59 {}
60 
62 {
63  CASSSettings s;
64  s.beginGroup("MultiFileInput");
65  _rewind = s.value("Rewind",false).toBool();
66  _new = s.value("UseNewContainer",false).toBool()? "_new" :"";
67  _nbrDifferentSources = s.value("NbrDifferentSources",2).toUInt();
68 }
69 
70 void MultiFileInput::readEventData(event2positionreaders_t::iterator &eventIt)
71 {
72  rbItem_t rbItem(getNextFillable());
73  if (rbItem == _ringbuffer.end())
74  return;
75  rbItem->element->id() = eventIt->first;
76  bool isGood(true);
77  positionreaders_t &posreaders(eventIt->second);
78  positionreaders_t::iterator fileposread(posreaders.begin());
79  positionreaders_t::const_iterator posreadEnd(posreaders.end());
80  for (; (!shouldQuit()) && (fileposread != posreadEnd); ++fileposread)
81  {
82  FilePointer &filepointer(fileposread->second);
83  FileReader &read(*(fileposread->first));
84  ifstream &filestream(filepointer.getStream());
85  isGood = read(filestream,*rbItem->element) && isGood;
86  }
87  _ringbuffer.doneFilling(rbItem, isGood);
88  newEventAdded(rbItem->element->datagrambuffer().size());
89 }
90 
92 {
94 
95  /** create the resource and a lock for it that contains the pointers to the
96  * places in the files where one finds the data that corresponds to a given
97  * eventID. And create a container for all the file parser that we create in
98  * the next step. Create helpers to split the extension from the filename
99  * and to tokenize the list of filenames.
100  */
101  Tokenizer tokenize;
102  event2positionreaders_t event2posreaders;
104  vector<FileParser::shared_pointer> parsercontainer;
105 
106  /** open the file containing the files to process, convert the contents to a
107  * vector of filenames. Iterate through the vector of filenames and for each
108  * filename create a file parser that will parse this file. Then put the
109  * fileparser in the container. Also create a reader for the file and put it
110  * in the pair that contains the filepointer and the reader for it. Then read
111  * the header information into the file reader. Rewind the file to the
112  * beginning and
113  */
114  ifstream filelistfile(_filelistname.c_str());
115  if (!filelistfile.is_open())
116  throw invalid_argument("MultiFileInput::run(): filelist '"+_filelistname +
117  "' could not be opened");
118 
119  vector<string> filelist(tokenize(filelistfile));
120  vector<string>::const_iterator filelistIt(filelist.begin());
121  vector<string>::const_iterator filelistEnd(filelist.end());
122  while ((!shouldQuit()) && (filelistIt != filelistEnd))
123  {
124  string filename(*filelistIt++);
126  Log::add(Log::INFO,"MultiFileInput::run(): parsing file '" + filename + "'");
127  if (info.exists())
128  {
129  FilePointer fp;
130  fp._filestream =
131  FilePointer::filestream_t(new ifstream(filename.c_str(), std::ios::binary | std::ios::in));
132  fp._pos = fp._filestream->tellg();
133  filereaderpointerpair_t readerpointer
134  (make_pair(FileReader::instance(filename + _new),fp));
135  readerpointer.first->loadSettings();
136  readerpointer.first->readHeaderInfo(*fp._filestream);
137  fp._filestream->seekg(0,ios::beg);
138  fp._pos = fp._filestream->tellg();
139  FileParser::shared_pointer fileparser
140  (FileParser::instance(info.suffix().toStdString(),
141  readerpointer,event2posreaders,lock));
142  fileparser->start();
143  parsercontainer.push_back(fileparser);
144  }
145  else
146  Log::add(Log::ERROR,"MultiFileInput::run(): could not open File '" +
147  filename + "'");
148  }
149 
150  /** wait until all files are parsed */
151  vector<FileParser::shared_pointer>::iterator fileparseIt(parsercontainer.begin());
152  vector<FileParser::shared_pointer>::const_iterator fileparseEnd(parsercontainer.end());
153  for (;fileparseIt!=fileparseEnd;++fileparseIt)
154  {
155  (*fileparseIt)->wait();
156  (*fileparseIt)->rethrowException();
157  }
158 
159  /** Then iterate through the eventlist, read the contents of each file and
160  * put it into the cassvent. For each entry in the eventlist, check whether
161  * all requested infos are present.
162  * If the data for this eventid is complete, retrieve a CASSEvent from the
163  * ringbuffer and use the file readers to retrieve the data from the file
164  * and convert them into the CASSEvent.
165  */
166  event2positionreaders_t::iterator eventIt(event2posreaders.begin());
167  event2positionreaders_t::const_iterator eventEnd(event2posreaders.end());
168  while ((!shouldQuit()) && (eventIt != eventEnd))
169  {
170  pausePoint();
171  if (_rewind)
172  {
173  eventIt = event2posreaders.begin();
174  continue;
175  }
176 
177  /** check whether the event contains information from all files */
178  if (eventIt->second.size() != _nbrDifferentSources)
179  {
180  Log::add(Log::WARNING,"MultiFileInput:run(): Event '" +
181  toString(eventIt->first) + "' is incomplete; skipping event.");
182  ++eventIt;
183  continue;
184  }
185  readEventData(eventIt);
186  ++eventIt;
187  }
188 
189  Log::add(Log::INFO,"MultiFileInput::run(): Finished with all files.");
190  if(!_quitWhenDone)
191  while(_control != _quit)
192  this->sleep(1);
193  Log::add(Log::INFO,"MultiFileInput::run(): closing the input");
194 }
195 
filestream_t _filestream
the stream to the file
Definition: cass.h:93
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
sleep(unsigned long secs)
status_t _status
the internal status of the thread
file contains declaration of the CASSEvent
static shared_pointer instance(const std::string type, const filereaderpointerpair_t readerpointerpair, event2positionreaders_t &event2posreader, QReadWriteLock &lock)
create an instance of the requested type
Definition: file_parser.cpp:42
A resource that will point at a specific location within a file.
Definition: cass.h:84
Settings for CASS.
Definition: cass_settings.h:30
contains base class for all file parsers
Input base class.
Definition: input_base.h:31
STL namespace.
base class for all file readers
Definition: file_reader.h:24
std::ifstream & getStream()
Definition: cass.h:96
additional info
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
fromStdString(const std::string &str)
std::tr1::shared_ptr< FileParser > shared_pointer
typedef the shared pointer of this
Definition: file_parser.h:45
std::vector< filereaderpointerpair_t > positionreaders_t
map file name to the filepointer
Definition: cass.h:280
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
void read(const std::string &filename, std::string &headerstring, std::vector< float > &matrix, std::pair< int, int > &shape)
read the cbf data into a linearized 2d array
Definition: cbf_handle.hpp:191
~MultiFileInput()
destructor
void runthis()
function with the main loop
Multi File Input for cass.
bool shouldQuit() const
query whether this thread is told to quit
bool _rewind
flag that tells the input to rewind to the beginning of the eventlist
file contains declaration of file input reading multiple files in parallel.
std::streampos _pos
the position with the file
Definition: cass.h:90
control_t _control
the internal control status of the thread
tokenize to return all lines of an ascii file in a vector
Definition: cass.h:111
size_t _nbrDifferentSources
the number of different sources
void load()
load the parameters used for the multifile input
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
std::pair< std::tr1::shared_ptr< FileReader >, FilePointer > filereaderpointerpair_t
pair of a file pointer with the associated file reader
Definition: cass.h:276
std::tr1::shared_ptr< std::ifstream > filestream_t
defining a shared pointer to the stream
Definition: cass.h:87
std::string _new
string to identify whether one want to use the new container
value(const QString &key, const QVariant &defaultValue=QVariant()
RingBuffer< CASSEvent > & _ringbuffer
reference to the ringbuffer
Definition: input_base.h:140
bool _quitWhenDone
flag to tell the thread to quit when its done with all files
std::string _filelistname
the name of the file that contains the list of file to process
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
std::map< uint64_t, positionreaders_t > event2positionreaders_t
the list of events contained in a file with the associated position and reader
Definition: cass.h:282
QMutex lock
a mutex so that external program can lock access to this
Definition: input_base.h:106
static shared_pointer instance(const std::string &filename)
create an instance of the requested type
Definition: file_reader.cpp:29
void pausePoint()
point where the thread will be paused
rbItem_t getNextFillable(unsigned timeout=500)
retrieve an iterator to the next fillable event
Definition: input_base.cpp:48
void readEventData(event2positionreaders_t::iterator &eventIt)
read the information from the different files into the cassevent
contains a logger for cass
void newEventAdded(const size_t eventsize)
increment the numer of events received in the ratemeter
Definition: input_base.cpp:37
beginGroup(const QString &prefix)