CFEL - ASG Software Suite  2.5.0
CASS
hdf5_converter.cpp
Go to the documentation of this file.
1 //Copyright (C) 2010-2015 Lutz Foucar
2 
3 /**
4  * @file hdf5_converter.cpp definition of pp1001 (hdf5_converter)
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <QtCore/QDateTime>
10 
11 #include <hdf5.h>
12 #include <stdint.h>
13 #include <sstream>
14 #include <iomanip>
15 #include <tr1/functional>
16 
17 #include "hdf5_converter.h"
18 #include "result.hpp"
19 #include "cass_settings.h"
20 #include "log.h"
21 #include "convenience_functions.h"
22 #include "hdf5_handle.hpp"
23 #include "cass_version.h"
24 
25 using namespace cass;
26 using namespace std;
27 using tr1::bind;
28 using tr1::function;
29 using tr1::placeholders::_1;
30 using tr1::placeholders::_2;
31 
32 namespace cass
33 {
34 
35 namespace hdf5
36 {
37 
38 
39 /** create group name for an event from its ID
40  *
41  * details
42  *
43  * @todo make sure that it will be always converted to the timezone in
44  * stanford otherwise people get confused. Timezones are not
45  * yet supported in QDateTime
46  *
47  * @return id pointing to the groupname created
48  * @param eventid the event id
49  * @param calibcycle the current calibcycle string
50  *
51  * @author Lutz Foucar
52  */
53 hid_t createGroupNameFromEventId(uint64_t eventid, hid_t calibcycle)
54 {
55  uint32_t timet(static_cast<uint32_t>((eventid & 0xFFFFFFFF00000000) >> 32));
56  uint32_t eventFiducial = static_cast<uint32_t>((eventid & 0x00000000FFFFFFFF) >> 8);
57  std::stringstream groupname;
59  time.setTime_t(timet);
60  if (timet)
61  groupname << time.toString(Qt::ISODate).toStdString() <<"_"<<eventFiducial;
62  else
63  groupname << "UnknownTime_"<<eventid;
64  Log::add(Log::VERBOSEINFO,"createGroupNameFromEventId(): creating group: " +
65  groupname.str());
66  return H5Gcreate1(calibcycle, groupname.str().c_str(),0);
67 }
68 
69 
70 /** write an entity to a h5 file
71  *
72  * @author Lutz Foucar
73  */
75 {
76 public:
77  /** constructor
78  *
79  * create the hdf5 file with the name and the handles to the specific data
80  * storage. Add a dataset that describes the cass version with which the file
81  * was generated
82  *
83  *
84  * @param filename the name of the h5 file
85  * @param id the id of the event to get the data for
86  */
87  WriteEntry(const string& filename, const CASSEvent::id_t id=0)
88  : _fh(filename),
89  _id(id),
90  _baseGroupname("/")
91  {
92  _fh.writeString(string("Written with cass version '" + VERSION + "'"),
93  "cass-version");
94  }
95 
96  /** set the base group name
97  *
98  * @param name the new base group name
99  */
100  void setBaseGroup(const string &name)
101  {
102  _baseGroupname = name;
103  }
104 
105  /** set the event id
106  *
107  * @param id the current event id
108  */
109  virtual void setEventID(const CASSEvent::id_t id)
110  {
111  _id = id;
112  }
113 
114  /** retrieve the current file size
115  *
116  * @return the current file size
117  */
118  size_t currentFileSize() const
119  {
120  return _fh.currentFileSize();
121  }
122 
123  /** write the result of an entry's processor to the h5 file
124  *
 * The dataset is called "<base group>/<entry group>/<entry name>". The result
 * for the currently set event id is read-locked and then written depending
 * on its dimension: 0 as scalar, 1 as array with the attributes "xLow" and
 * "xUp", 2 as matrix with the attributes "xLow", "xUp", "yLow" and "yUp".
 * The options of the entry are only passed on when a matrix is written.
 *
125  * @param entry The entry to put into the h5 file
 * @throws runtime_error when the dimension of the result is not 0, 1 or 2
126  */
127  virtual void operator()(const pp1002::entry_t& entry)
128  {
129  const uint32_t &options(entry.options);
130  const string &gname(entry.groupname);
131  const string &name(entry.name);
132  Processor &proc(*entry.proc);
133 
134  /** create the requested dataset name */
135  const string dataName(_baseGroupname + "/" + gname + "/" + name);
136 
137  /** retrieve data from pp and write it to the h5 file, the lock is held
 * until the end of the function
 */
138  const Processor::result_t &data(proc.result(_id));
139  QReadLocker lock(&data.lock);
140  switch (data.dim())
141  {
142  case 0:
143  {
 /** a 0d result is represented by its first element */
144  _fh.writeScalar(data.front(),dataName);
145  break;
146  }
147  case 1:
148  {
 /** NOTE(review): listing line 149 is absent in this copy, it presumably
 * declares xaxis from the axis of the result; confirm against the
 * repository
 */
150  _fh.writeArray(data.storage(), xaxis.nBins, dataName);
151  _fh.writeScalarAttribute(xaxis.low, "xLow", dataName);
152  _fh.writeScalarAttribute(xaxis.up, "xUp", dataName);
153  break;
154  }
155  case 2:
156  {
 /** NOTE(review): listing lines 157 and 158 are absent in this copy, they
 * presumably declare xaxis and yaxis; confirm against the repository
 */
159  _fh.writeMatrix(data.storage(), data.shape(), dataName, options);
160  _fh.writeScalarAttribute(xaxis.low, "xLow", dataName);
161  _fh.writeScalarAttribute(xaxis.up, "xUp", dataName);
162  _fh.writeScalarAttribute(yaxis.low, "yLow", dataName);
163  _fh.writeScalarAttribute(yaxis.up, "yUp", dataName);
164  break;
165  }
166  default:
 /** any other dimension cannot be written */
167  throw runtime_error("WriteEntry::operator(): data dimension '" +
168  toString(data.dim()) + "' not known");
169  break;
170  }
171  }
172 
173 protected:
174  /** the file handle of the h5 file */
176 
177  /** the eventid to look for */
179 
180  /** the base group name */
182 };
183 
184 
185 /** append an entity to a dataset in h5 file
186  *
187  * @author Lutz Foucar
188  */
189 class AppendEntry : public WriteEntry
190 {
191 public:
192  /** constructor
193  *
194  * create the hdf5 file with the name and the handles to the specific data
195  * storage. Add a dataset that describes the cass version with which the file
196  * was generated
197  *
198  * @param filename the name of the h5 file
199  * @param id the id of the event to get the data for
200  */
201  AppendEntry(const string& filename, const CASSEvent::id_t id=0)
202  : WriteEntry(filename,id)
203  {
204  _writeAttributes = std::tr1::bind(&AppendEntry::writeAttib,this,_1,_2);
205  }
206 
207  /** write an entry to h5 file using the functions defined above
208  *
209  * @param entry The entry to put into the h5 file
210  */
211  virtual void operator()(const pp1002::entry_t& entry)
212  {
213  const uint32_t &options(entry.options);
214  const string &gname(entry.groupname);
215  const string &name(entry.name);
216  Processor &proc(*entry.proc);
217 
218  /** create the requested dataset name */
219  const string dataName(_baseGroupname + "/" + gname + "/" + name);
220 
221  /** retrieve data from pp and write it to the h5 file */
222  const Processor::result_t &data(proc.result(_id));
223  QReadLocker lock(&data.lock);
224  vector<size_t> shape(data.np_shape());
225  ::hdf5::shape_t theShape(shape.begin(),shape.end());
226  _fh.appendData(data.storage(),theShape,dataName,options);
227  _writeAttributes(data,dataName);
228  }
229 
230  /** add the event id to the event id dataset
231  *
232  * in addition switch the attribute writer to nothing
233  *
234  * @param id the new eventid
235  */
236  virtual void setEventID(CASSEvent::id_t id)
237  {
238  if (id)
239  {
240  if (_id)
241  _writeAttributes = std::tr1::bind(&AppendEntry::writeNothing,this,_1,_2);
242  ::hdf5::shape_t shape(1,1);
243  vector<CASSEvent::id_t> evtid(1,_id);
244  _fh.appendData(evtid,shape,"eventIds");
245  }
246  _id = id;
247  }
248 
249 protected:
250  /** write the axis attributes to the dataset
251  *
 * Nothing is written for a 0d result. For a 1d result "xLow" and "xUp" are
 * written, for a 2d result additionally "yLow" and "yUp".
 *
252  * @param data reference to the data
253  * @param dsetName the name of the dataset that the attributes should be
254  * written to
 * @throws runtime_error when the dimension of the data is not 0, 1 or 2
255  */
256  void writeAttib(const Processor::result_t &data, const string &dsetName)
257  {
258  switch (data.dim())
259  {
260  case 0:
261  {
 /** a 0d result has no axis, thus there is nothing to write */
262  break;
263  }
264  case 1:
265  {
 /** NOTE(review): listing line 266 is absent in this copy, it presumably
 * declares xaxis from the axis of the data; confirm against the repository
 */
267  _fh.writeScalarAttribute(xaxis.low, "xLow", dsetName);
268  _fh.writeScalarAttribute(xaxis.up, "xUp", dsetName);
269  break;
270  }
271  case 2:
272  {
 /** NOTE(review): listing lines 273 and 274 are absent in this copy, they
 * presumably declare xaxis and yaxis; confirm against the repository
 */
275  _fh.writeScalarAttribute(xaxis.low, "xLow", dsetName);
276  _fh.writeScalarAttribute(xaxis.up, "xUp", dsetName);
277  _fh.writeScalarAttribute(yaxis.low, "yLow", dsetName);
278  _fh.writeScalarAttribute(yaxis.up, "yUp", dsetName);
279  break;
280  }
281  default:
 /** attributes of other dimensions are not supported */
282  throw runtime_error("AppendEntry::writeAttrib(): data dimension '" +
283  toString(data.dim()) + "' not known");
284  break;
285  }
286  }
287 
288  /** write nothing
289  *
290  * @param unused unused reference
291  * @param unused not used name
292  */
293  void writeNothing(const Processor::result_t& /* unused */,
294  const string& /* unused */)
295  {
296 
297  }
298 
299 private:
300  /** function to write the results axis attributes just once to the dataset */
301  std::tr1::function <void(const Processor::result_t&,const string&)> _writeAttributes;
302 };
303 
304 
305 }//end namespace hdf5
306 }//end namespace cass
307 
308 
309 
310 
311 
312 
313 
314 //*************** h5 output *************************
315 
316 pp1002::pp1002(const name_t &name)
317  : Processor(name)
318 {
319  loadSettings(0);
320 }
321 
323 {
 /** body of pp1002::loadSettings() (the name is taken from the exception
 * text below). NOTE(review): the signature line, listing line 322, is absent
 * in this copy.
 *
 * Reads the settings of the writer, checks that the hdf5 library provides
 * the deflate filter, collects the entries that are written per event and
 * as summary, and selects the functions that do the writing. Finally the
 * resulting configuration is logged.
 */
324  CASSSettings s;
325  s.beginGroup("Processor");
 /** NOTE(review): listing line 326 is absent in this copy, presumably it
 * enters the settings group of this processor; confirm against the repository
 */
327  setupGeneral();
328 
 /** NOTE(review): toBool() reduces the configured value to 0 or 1 before it
 * is stored in the int, so every non-zero "CompressLevel" ends up as 1.
 * toInt() looks intended; confirm before changing
 */
329  int compresslevel(s.value("CompressLevel",2).toBool());
 /** refuse to work when the deflate filter is missing or cannot both encode
 * and decode
 */
330  htri_t compavailable (H5Zfilter_avail(H5Z_FILTER_DEFLATE));
331  unsigned int filter_info;
332  H5Zget_filter_info(H5Z_FILTER_DEFLATE, &filter_info);
333  if (!compavailable ||
334  !(filter_info & H5Z_FILTER_CONFIG_ENCODE_ENABLED) ||
335  !(filter_info & H5Z_FILTER_CONFIG_DECODE_ENABLED))
336  throw logic_error("pp1002::loadSettings(): HDF5 library doesn't allow compression. Please use a hdf5 library that allows compression.");
337 
 /** collect the entries of the "Processor" array. Entries without a "Name"
 * are skipped. "ValName" defaults to the name of the processor and
 * "GroupName" to "/". The compress level is stored as option of the entry
 */
338  bool allDepsAreThere(true);
339  int size = s.beginReadArray("Processor");
340  for (int i = 0; i < size; ++i)
341  {
342  s.setArrayIndex(i);
343  string procname(s.value("Name","Unknown").toString().toStdString());
344  if (procname == "Unknown")
345  continue;
346  shared_pointer proc(setupDependency("",procname));
347  allDepsAreThere = proc && allDepsAreThere;
348  string groupname(s.value("GroupName","/").toString().toStdString());
349  string name = proc ? s.value("ValName",QString::fromStdString(proc->name())).toString().toStdString() : "";
350  _procList.push_back(entry_t(name,groupname,compresslevel,proc));
351  }
352  s.endArray();
353 
 /** do the same for the "ProcessorSummary" array, its entries end up in the
 * summary list
 */
354  size = s.beginReadArray("ProcessorSummary");
355  for (int i = 0; i < size; ++i)
356  {
357  s.setArrayIndex(i);
358  string procname(s.value("Name","Unknown").toString().toStdString());
359  if (procname == "Unknown")
360  continue;
361  shared_pointer proc(setupDependency("",procname));
362  allDepsAreThere = proc && allDepsAreThere;
363  string groupname(s.value("GroupName","/").toString().toStdString());
364  string name = proc ? s.value("ValName",QString::fromStdString(proc->name())).toString().toStdString() : "";
365  _procSummaryList.push_back(entry_t(name,groupname,compresslevel,proc));
366  }
367  s.endArray();
368 
 /** when the condition or one of the dependencies could not be set up, drop
 * everything that has been collected and stop here
 */
369  bool ret (setupCondition());
370  if (!(ret && allDepsAreThere))
371  {
372  _procList.clear();
373  _procSummaryList.clear();
374  return;
375  }
376 
 /** the current base filename serves as default when none is configured */
377  _basefilename = s.value("FileBaseName",QString::fromStdString(_basefilename)).toString().toStdString();
378 
379  /** when requested add the first subdir to the filename and make sure that the
380  * directory exists.
381  */
382  _maxFilePerSubDir = s.value("MaximumNbrFilesPerDir",-1).toInt();
383  _filecounter = 0;
384 
385  /** set up the maximum file size and convert to Bytes */
386  _maxFileSize = s.value("MaximumFileSize_GB",200).toUInt();
387  _maxFileSize *= 1024*1024*1024;
388 
389  /** set up which kind of file should be written */
390  /** set up the dir or the filename, depending on the case */
391  bool multipleevents(s.value("WriteMultipleEventsInOneFile",false).toBool());
392  bool singleDataset(s.value("WriteToSingleDatasets",false).toBool());
393  if (multipleevents)
394  {
395  if (singleDataset)
396  {
397  _writeEvent = std::tr1::bind(&pp1002::appendEventToMultipleEventsFile,this,_1);
 /** NOTE(review): listing lines 398 to 400 are absent in this copy,
 * presumably they set up the summary writer, the filename and the entry
 * writer; confirm against the repository
 */
401  }
402  else
403  {
404  _writeEvent = std::tr1::bind(&pp1002::writeEventToMultipleEventsFile,this,_1);
 /** NOTE(review): listing lines 405 to 407 are absent in this copy,
 * presumably they set up the summary writer, the filename and the entry
 * writer; confirm against the repository
 */
408  }
409  }
410  else
411  {
412  _writeEvent = std::tr1::bind(&pp1002::writeEventToSingleFile,this,_1);
413  _writeSummary = std::tr1::bind(&pp1002::writeSummaryToSingleFile,this);
414  if(_maxFilePerSubDir != -1)
 /** NOTE(review): listing line 415, the statement controlled by the if
 * above, is absent in this copy; confirm against the repository
 */
416  }
417 
 /** mark this processor as hidden and log what has been configured */
418  _hide = true;
419  string output("Processor '" + name() + "' will write histogram ");
420  for (list<entry_t>::const_iterator it(_procList.begin());
421  it != _procList.end(); ++it)
422  output += ("'" + it->proc->name() + "' to Group '" + it->groupname +
423  "' with dataname '" + it->name +"',");
424  output += (" of a hdf5 file with '" + _basefilename +
425  "' as basename. 2D File will" + (compresslevel ? "" : " NOT") +
426  " be compressed. Files will" + (_maxFilePerSubDir != -1 ? "" : " NOT") +
427  " be distributed. Events will"+ (multipleevents ? " NOT" : "") +
428  " be written to single files. In which case the data of the" +
429  " individual processors will" + (singleDataset ? "" : " NOT") +
430  " be put into a single dataset. Maximum file size is '" +
431  toString(_maxFileSize) + "' bytes. Condition is '" +
432  _condition->name() + "'");
433  Log::add(Log::INFO,output);
434 }
435 
437 {
 /** body of pp1002::result() (the name is taken from the exception text).
 * NOTE(review): the signature line, listing line 436, is absent in this copy.
 *
 * always throws a logic_error that contains the name of this processor
 */
438  throw logic_error("pp1002::result: '"+name()+"' should never be called");
439 }
440 
442 {
 /** NOTE(review): the signature line, listing line 441, is absent in this
 * copy; presumably this is the body of pp1002::aboutToQuit(), confirm
 * against the repository.
 *
 * writes the summary using the configured summary writer while holding the
 * lock of this processor
 */
443  /** check if something to be written */
444  if (_procSummaryList.empty())
445  return;
446 
 /** the lock is released when the locker goes out of scope */
447  QMutexLocker locker(&_lock);
448  _writeSummary();
449 }
450 
452 {
 /** NOTE(review): the signature line, listing line 451, is absent in this
 * copy; presumably this is the body of pp1002::writeSummaryToSingleFile(),
 * confirm against the repository.
 *
 * writes all summary entries into the file "<base filename>_Summary.h5"
 */
453  /** remove subdir from filename when they should be distributed */
454  if (_maxFilePerSubDir != -1)
 /** NOTE(review): listing line 455, the statement controlled by the if
 * above, is absent in this copy; confirm against the repository
 */
456 
457  /** create filename from base filename and write entries to file */
458  hdf5::WriteEntry writeEntry(_basefilename + "_Summary.h5");
459 
460  /** write all entries to file using the writer
461  *
462  * @note we can't use for_each here, since we need to ensure that the
463  * entries are written sequentially and for_each can potentially use
464  * omp to parallelize the execution.
465  */
466  list<entry_t>::const_iterator it(_procSummaryList.begin());
467  list<entry_t>::const_iterator last(_procSummaryList.end());
468  while(it != last)
469  writeEntry(*it++);
470 }
471 
473 {
 /** NOTE(review): the signature line, listing line 472, is absent in this
 * copy; presumably this is the body of
 * pp1002::writeSummaryToMultipleEventsFile(), confirm against the repository.
 *
 * writes all summary entries with the shared entry writer, using event id 0
 * and "Summary" as base group
 */
474  _entryWriter->setEventID(0);
475  _entryWriter->setBaseGroup("Summary");
476 
477  /** write all entries to file using the writer
478  *
479  * @note we can't use for_each here, since we need to ensure that the
480  * entries are written sequentially and for_each can potentially use
481  * omp to parallelize the execution.
482  */
483  hdf5::WriteEntry &writeEntry(*_entryWriter);
484  list<entry_t>::const_iterator it(_procSummaryList.begin());
485  list<entry_t>::const_iterator last(_procSummaryList.end());
486  while(it != last)
487  writeEntry(*it++);
488 }
489 
491 {
 /** NOTE(review): the signature line, listing line 490, is absent in this
 * copy; presumably this is the body of pp1002::processEvent() with the event
 * as parameter evt, confirm against the repository.
 *
 * writes the event using the configured event writer while holding the lock
 * of this processor
 */
492  /** check if there is something to be written or if it should be written,
 * the condition is evaluated for the id of the event
 */
493  if (_procList.empty() || !_condition->result(evt.id()).isTrue())
494  return;
495 
 /** the lock is released when the locker goes out of scope */
496  QMutexLocker locker(&_lock);
497  _writeEvent(evt);
498 }
499 
501 {
 /** NOTE(review): the signature line, listing line 500, is absent in this
 * copy; presumably this is the body of pp1002::writeEventToSingleFile() with
 * the event as parameter evt, confirm against the repository.
 *
 * writes all entries for the event into its own file called
 * "<base filename>_<event id>.h5"
 */
502  /** increment subdir in filename when they should be distributed and the
503  * counter exceeded the maximum amount of files per subdir
504  */
 /** NOTE(review): listing line 505, the condition that guards the following
 * block, is absent in this copy; confirm against the repository
 */
506  {
507  _filecounter = 0;
 /** NOTE(review): listing line 508 is absent in this copy, presumably it
 * updates the subdir of the base filename; confirm against the repository
 */
509  }
510  ++_filecounter;
511 
512  /** create entry writer with filename using basefilename and event id */
513  hdf5::WriteEntry writeEntry(_basefilename + "_" + toString(evt.id()) + ".h5",evt.id());
514 
515  /** write all entries to file using the writer
516  *
517  * @note we can't use for_each here, since we need to ensure that the
518  * entries are written sequentially and for_each can potentially use
519  * omp to parallelize the execution.
520  */
521  list<entry_t>::const_iterator it(_procList.begin());
522  list<entry_t>::const_iterator last(_procList.end());
523  while(it != last)
524  writeEntry(*it++);
525 }
526 
528 {
 /** NOTE(review): the signature line, listing line 527, is absent in this
 * copy; presumably this is the body of
 * pp1002::writeEventToMultipleEventsFile() with the event as parameter evt,
 * confirm against the repository.
 *
 * writes all entries for the event with the shared entry writer, using the
 * event id converted to a string as base group
 */
529  /** check the current file size, create a new file with increase ending,
530  * if too big
531  */
532  if (_entryWriter->currentFileSize() > _maxFileSize)
533  {
 /** NOTE(review): listing lines 534 and 535 are absent in this copy,
 * presumably they change the filename and create a new entry writer;
 * confirm against the repository
 */
536  }
537 
538  /** tell the writer which id to use and the corresponding base group */
539  _entryWriter->setEventID(evt.id());
540  _entryWriter->setBaseGroup(toString(evt.id()));
541 
542  /** write all entries to file using the writer
543  *
544  * @note we can't use for_each here, since we need to ensure that the
545  * entries are written sequentially and for_each can potentially use
546  * omp to parallelize the execution.
547  */
548  hdf5::WriteEntry &writeEntry(*_entryWriter);
549  list<entry_t>::const_iterator it(_procList.begin());
550  list<entry_t>::const_iterator last(_procList.end());
551  while(it != last)
552  writeEntry(*it++);
553 }
554 
556 {
 /** NOTE(review): the signature line, listing line 555, is absent in this
 * copy; presumably this is the body of
 * pp1002::appendEventToMultipleEventsFile() with the event as parameter evt,
 * confirm against the repository.
 *
 * hands all entries for the event to the shared entry writer. In contrast
 * to the other event writers no base group is set here
 */
557  /** check the current file size, create a new file with increase ending,
558  * if too big
559  */
560  if (_entryWriter->currentFileSize() > _maxFileSize)
561  {
 /** NOTE(review): listing lines 562 and 563 are absent in this copy,
 * presumably they change the filename and create a new entry writer;
 * confirm against the repository
 */
564  }
565 
566  /** tell the writer which id to use */
567  _entryWriter->setEventID(evt.id());
568 
569  /** write all entries to file using the writer
570  *
571  * @note we can't use for_each here, since we need to ensure that the
572  * entries are written sequentially and for_each can potentially use
573  * omp to parallelize the execution.
574  */
575  list<entry_t>::const_iterator it(_procList.begin());
576  list<entry_t>::const_iterator last(_procList.end());
577  while(it != last)
578  (*_entryWriter)(*it++);
579 }
580 
581 
size_t dim() const
what is the dimension of the result
Definition: result.hpp:503
setArrayIndex(int i)
void writeAttib(const Processor::result_t &data, const string &dsetName)
write the attributes to the dataset
std::string name
name of the value in the file
Event to store all LCLS Data.
Definition: cass_event.h:32
virtual void loadSettings(size_t)
load the settings of this pp
toString(const QString &format)
std::list< entry_t > _procSummaryList
container for all pps that should be written when program quits
WriteEntry(const string &filename, const CASSEvent::id_t id=0)
constructor
const name_t name() const
retrieve the name of this processor
Definition: processor.h:167
string _baseGroupname
the base group name
std::tr1::function< void(const Processor::result_t &, const string &)> _writeAttributes
function to write the results axis attributes just once to the dataset
virtual void operator()(const pp1002::entry_t &entry)
write an entry to h5 file using the functions defined above
check if FEL is off by checking for bykick which is eventid
pp1002(const name_t &)
constructor
std::tr1::shared_ptr< hdf5::WriteEntry > entryWriter_t
define pointer to the entry writer
Settings for CASS.
Definition: cass_settings.h:30
AppendEntry(const string &filename, const CASSEvent::id_t id=0)
constructor
A handler for h5 files.
size_t _maxFileSize
the maximum file size of the single file
void writeEventToSingleFile(const CASSEvent &evt)
function to write the events to a single file
virtual void processEvent(const CASSEvent &)
process the event
uint64_t id_t
define the id type
Definition: cass_event.h:52
bool _hide
flag to tell whether this pp should be hidden in the dropdown list
Definition: processor.h:262
size_t currentFileSize() const
retrieve the current file size
STL namespace.
an axis of a more than 0 dimensional container
Definition: result.hpp:29
static std::string increaseDirCounter(const std::string &fname)
increase the alpha counter
void writeEventToMultipleEventsFile(const CASSEvent &evt)
function to write the events to a file that contains multiple events
things written only at end of run H5Dump ProcessorSummary size
std::tr1::function< void(const CASSEvent &)> _writeEvent
write event to file
result classes
virtual void setEventID(const CASSEvent::id_t id)
set the event id
::hdf5::Handler _fh
the file handle of the h5 file
std::tr1::function< void(void)> _writeSummary
write summary to file
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
fromStdString(const std::string &str)
void setBaseGroup(const string &name)
set the base group name
beginReadArray(const QString &prefix)
base class for processors.
Definition: processor.h:39
void writeSummaryToSingleFile()
function to write the summary to a single file
shared_pointer setupDependency(const std::string &depVarName, const name_t &name="")
setup the dependecy.
Definition: processor.cpp:114
std::vector< hsize_t > shape_t
define the shape type
Definition: hdf5_handle.hpp:43
const axis_t & axis() const
read access to the axis
Definition: result.hpp:449
int _maxFilePerSubDir
the number of files in each subdir
std::string groupname
group where the data will be written to
struct bundleing info for writing an entry to file
void appendEventToMultipleEventsFile(const CASSEvent &evt)
function to write the events to a file that contains multiple events
void writeSummaryToMultipleEventsFile()
write the summary to a file that contains multiple events
std::list< entry_t > _procList
container with all pps that contain the histograms to dump to hdf5
file contains declaration of classes and functions that help other processors to do their job...
static std::string intializeFile(const std::string &fname)
initialize the filename
uint32_t options
options for writing
id_t & id()
setters
Definition: cass_event.h:64
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
auxiliary data[Processor]
value(const QString &key, const QVariant &defaultValue=QVariant()
entryWriter_t _entryWriter
the entry writer
virtual void operator()(const pp1002::entry_t &entry)
write an entry to h5 file using the functions defined above
void setupGeneral()
general setup of the processor
Definition: processor.cpp:85
static std::string removeAlphaSubdir(const std::string &fname)
remove the alpha counter subdir from filename
void writeNothing(const Processor::result_t &, const string &)
write nothing
file contains specialized class that do the settings for cass
write an entity to a h5 file
shared_pointer proc
processor holding the data to be written
hid_t createGroupNameFromEventId(uint64_t eventid, hid_t calibcycle)
create group name for an event from its ID
declaration of pp1001 (hdf5_converter)
virtual const result_t & result(const CASSEvent::id_t eventid=0)
overwrite the retrieval of an histogram
shared_pointer _condition
pointer to the processor that will contain the condition
Definition: processor.h:277
static std::string increaseFileCounter(const std::string &fname)
increase the alpha counter in the file name
QMutex _lock
a lock to make the process reentrant
bool setupCondition(bool defaultConditionType=true)
setup the condition.
Definition: processor.cpp:94
std::string name_t
define the name type
Definition: processor.h:46
contains a logger for cass
virtual void aboutToQuit()
dump all pp histograms to summary group just before quitting
std::tr1::shared_ptr< Processor > shared_pointer
a shared pointer of this
Definition: processor.h:43
int _filecounter
counter to count how many files have been written
CASSEvent::id_t _id
the eventid to look for
easier api for hdf5 file writing
append an entity to a dataset in h5 file
static std::string intializeDir(const std::string &fname)
initialize the directory
beginGroup(const QString &prefix)
setTime_t(uint seconds)
virtual const result_t & result(const CASSEvent::id_t eventid=0)
retrieve a result for a given id.
Definition: processor.cpp:54
virtual void setEventID(CASSEvent::id_t id)
add the event id to the event id dataset
std::string _basefilename
the filename that the data will be written to