16 #include <QtCore/QFileInfo>
18 #include <DataAccessUserAPI.h>
54 vector<int>::const_iterator listend,
55 int runNbr,
int blNbr,
int highTagNbr)
56 : _liststart(liststart),
61 _highTagNbr(highTagNbr),
63 _skippedeventscounter(0)
72 convert.
cacheParameters(_liststart,_listend,_blNbr,_runNbr,_highTagNbr);
75 string output(
"TagListProcessor: The following tags will be processed by '" +
77 "' (size is '" +
toString(distance(_liststart,_listend)) +
79 for (; _iter != _listend; ++_iter)
80 output +=
" '" +
toString(*_iter) +
"',";
81 Log::add(Log::VERBOSEINFO,output);
86 InputBase::shared_pointer::element_type&
input(InputBase::reference());
92 for(;(!input.shouldQuit()) && (_iter != _listend); ++_iter)
98 if (rbItem == input.ringbuffer().end())
102 uint64_t datasize = convert(_highTagNbr,*_iter,*rbItem->element);
109 Log::add(Log::WARNING,
"TagListProcessor: Event with id '"+
110 toString(rbItem->element->id()) +
"' is bad: skipping Event");
111 ++_skippedeventscounter;
119 input.newEventAdded(datasize);
120 input.ringbuffer().doneFilling(rbItem, datasize);
130 const double currsize(distance(_liststart,_iter));
131 const double fullsize(distance(_liststart,_listend));
132 return currsize/fullsize;
187 int funcstatus,startTagNbr,endTagNbr = 0;
188 funcstatus = sy_read_start_tagnumber(&highTagNbr,&startTagNbr,blNbr,runNbr);
191 Log::add(Log::ERROR,
"getCompleteTagList: could not retrieve start tag of run '" +
193 "' Errorcode is '" +
toString(funcstatus) +
"'");
196 funcstatus = sy_read_end_tagnumber(&highTagNbr,&endTagNbr,blNbr,runNbr);
199 Log::add(Log::ERROR,
"getCompleteTagList: could not retrieve end tag of run '" +
201 "' Errorcode is '" +
toString(funcstatus) +
"'");
211 Log::add(Log::VERBOSEINFO,
"getCompleteTagList: get Taglist for tags between '" +
213 "' with highTag '" +
toString(highTagNbr)+
"' for run '" +
215 struct da_int_array *tagListBuffer=NULL;
216 funcstatus = da_alloc_int_array(&tagListBuffer,0,NULL);
219 Log::add(Log::ERROR,
string(
"getCompleteTagList: could not allocate the ") +
220 "sacla int array struct. Errorcode is '" +
222 da_destroy_int_array(&tagListBuffer);
225 funcstatus = sy_read_taglist(tagListBuffer,blNbr,highTagNbr,startTagNbr,endTagNbr);
228 Log::add(Log::ERROR,
"getCompleteTagList: could not retrieve taglist of run '" +
230 "'. Errorcode is '" +
toString(funcstatus) +
"'");
231 da_destroy_int_array(&tagListBuffer);
235 int tagListSize = -1;
236 funcstatus = da_getsize_int_array(&tagListSize, tagListBuffer);
239 Log::add(Log::ERROR,
string(
"getCompleteTagList: could not get the size of ") +
240 "sacla int array struct. Errorcode is '" +
242 da_destroy_int_array(&tagListBuffer);
247 for (
size_t i(0); i < static_cast<size_t>(tagListSize); ++i)
250 da_getint_int_array(&buffer,tagListBuffer,i);
251 taglist.push_back(buffer);
254 funcstatus = da_destroy_int_array(&tagListBuffer);
257 Log::add(Log::ERROR,
string(
"getCompleteTagList: error destroying the ") +
258 "sacla int array struct. Errorcode is '" +
toString(funcstatus) +
267 void SACLAOfflineInput::instance(
string runlistname,
274 throw logic_error(
"SACLAOfflineInput::instance(): The instance of the base class is already initialized");
278 SACLAOfflineInput::SACLAOfflineInput(
string runlistname,
283 :
InputBase(ringbuffer,ratemeter,loadmeter,parent),
284 _quitWhenDone(quitWhenDone),
285 _runlistname(runlistname)
307 if (!runlistfile.is_open())
308 throw invalid_argument(
"SACLAOfflineInput::run(): filelist '" +
_runlistname +
309 "' could not be opened");
310 vector<string> runlist(tokenize(runlistfile));
317 vector<string>::const_iterator runlistIt(runlist.begin());
318 vector<string>::const_iterator runlistEnd(runlist.end());
319 while ((!
shouldQuit()) && (runlistIt != runlistEnd))
322 string runname(*runlistIt++);
323 stringstream ss(runname);
326 while(getline(ss,str,
','))
328 stringstream ssvalue(str);
331 nbrs.push_back(value);
336 runname +
"' into a beamline number and runname");
346 funcstatus = sy_read_runstatus(&runstatus,blNbr,runNbr);
350 "run status of run '" +
toString(runNbr) +
"' at beamline '" +
358 "' at beamline '" +
toString(blNbr) +
"' has not finised yet");
365 vector<int> taglist(nbrs.begin()+2,nbrs.end());
379 vector<int> completeTagList;
382 vector<int>::const_iterator tagIter(taglist.begin());
383 vector<int>::const_iterator taglistEnd(taglist.end());
384 bool didnotfind(
false);
385 for (;tagIter != taglistEnd; ++tagIter)
387 if (find(completeTagList.begin(),completeTagList.end(),*tagIter) == completeTagList.end())
397 "' at beamline '" +
toString(blNbr) +
"'");
405 int chunksize = taglist.size() /
_chunks;
406 for (
int chunk(0); chunk <
_chunks-1; ++chunk)
408 vector<int>::const_iterator chunkstart(taglist.begin() + (chunk*chunksize));
409 vector<int>::const_iterator chunkend(taglist.begin() + (chunk+1)*chunksize);
412 processor(
new TagListProcessor(chunkstart,chunkend,runNbr,blNbr,highTagNbr));
416 _procs.push_back(processor);
419 vector<int>::const_iterator chunkstart(taglist.begin() + ((_chunks-1)*chunksize));
421 processor(
new TagListProcessor(chunkstart,taglist.end(),runNbr,blNbr,highTagNbr));
425 _procs.push_back(processor);
428 proc_t::iterator processorsIt(
_procs.begin());
429 proc_t::iterator processorsEnd(
_procs.end());
430 for (; processorsIt != processorsEnd; ++processorsIt)
432 (*processorsIt)->wait();
433 (*processorsIt)->rethrowException();
434 eventcounter += (*processorsIt)->nEventsProcessed();
452 double progressSum(0.);
453 for (proc_t::const_iterator it(
_procs.begin()); it !=
_procs.end(); ++it)
454 progressSum += (*it)->progress();
455 return (progressSum /
_procs.size());
461 for (proc_t::const_iterator it(
_procs.begin()); it !=
_procs.end(); ++it)
462 counter += (*it)->nEventsProcessed();
469 for (proc_t::const_iterator it(
_procs.begin()); it !=
_procs.end(); ++it)
470 counter += (*it)->nEventsSkipped();
uint64_t nEventsProcessed()
retrieve the number of events processed by this thread
class calculating a rate in Hz.
sleep(unsigned long secs)
status_t _status
the internal status of the thread
void runthis()
process the tags on the list
Class for reading SACLA data.
double progress()
retrieve the progress within the list
void cacheParameters(std::vector< int >::const_iterator first, std::vector< int >::const_iterator last, int blNbr, int runNbr, int highTagNbr)
retrieve requested beamline parameters in one go
A processor for a tag list.
void loadSettings()
load the settings of the reader
vector< int >::const_iterator _listend
iterator to the end of the list
static void add(Level level, const std::string &line)
add a string to the log
A Ringbuffer, handles communication between Input and Worker Threads.
std::tr1::shared_ptr< TagListProcessor > shared_pointer
typedef the shared pointer of this
bool getCompleteTagList(vector< int > &taglist, int &highTagNbr, int blNbr, int runNbr)
retrieve the list of tags and the associated high tag number
vector< int >::const_iterator _liststart
iterator to the start of the list
TagListProcessor(vector< int >::const_iterator liststart, vector< int >::const_iterator listend, int runNbr, int blNbr, int highTagNbr)
constructor
int _blNbr
the beamline number for the experiment
bool shouldQuit() const
query whether this thread is told to quit
uint64_t nEventsSkipped()
retrieve the number of events skipped by this thread
tokenize to return all lines of an ascii file in a vector
vector< int >::const_iterator _iter
iterator to the current item being processed
std::string toString(const Type &t)
convert any type to a string
value(const QString &key, const QVariant &defaultValue=QVariant()
int _highTagNbr
the first part of the tag (that doesn't change)
int _runNbr
the run number for the experiment
file contains specialized classes that do the settings for cass
Example of how to use the sacla online input
uint64_t _skippedeventscounter
a counter to count how many events (tags) have been skipped
contains a logger for cass
contains class to convert sacla data to cassevent
check if there is some light in the chamber based upon the GMD value
beginGroup(const QString &prefix)
uint64_t _counter
a counter to count how many events (tags) have been processed
A QThread that has the ability to be paused and resumed.