CFEL - ASG Software Suite  2.5.0
CASS
sharedmemory_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2009,2010,2013 Lutz Foucar
2 
3 /**
4  * @file sharedmemory_input.cpp file contains definition of class that interfaces
5  * the LCLS shared memory
6  *
7  * @author Lutz Foucar
8  */
9 
10 #include <QtCore/QMutexLocker>
11 
12 #include <iostream>
13 #include <iomanip>
14 #include "sharedmemory_input.h"
15 #include "format_converter.h"
16 #include "pdsdata/xtc/Dgram.hh"
17 #include "log.h"
18 
19 
20 using namespace cass;
21 using namespace std;
22 
23 void SharedMemoryInput::instance(const string &partitionTag,
24  int index,
25  cass::RingBuffer<CASSEvent> &ringbuffer,
26  Ratemeter &ratemeter,
27  Ratemeter &loadmeter,
28  QObject *parent)
29 {
30  if(_instance)
31  throw logic_error("SharedMemoryInput::instance(): The instance of the base class is already initialized");
32  _instance = shared_pointer(new SharedMemoryInput(partitionTag,
33  index,
34  ringbuffer,
35  ratemeter,
36  loadmeter,
37  parent));
38 }
39 
40 SharedMemoryInput::SharedMemoryInput(const string &partitionTag,
41  int index,
42  RingBuffer<CASSEvent>& ringbuffer,
43  Ratemeter &ratemeter,
44  Ratemeter &loadmeter,
45  QObject *parent)
46  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
47  _partitionTag(partitionTag),
48  _index(index),
49  _convert(*FormatConverter::instance()),
50  _eventscounter(0),
51  _skippedeventscounter(0)
52 {
53  load();
54 }
55 
57 {
59 }
60 
62 {
64  Log::add(Log::DEBUG0,"SharedMemoryInput::run(): starting shared memory in put with partition Tag: '" +
65  _partitionTag + "' and Client Index " + toString(_index));
66  Pds::XtcMonitorClient::run(_partitionTag.c_str(),_index,_index);
67  Log::add(Log::DEBUG0,"SharedMemoryInput::run(): shared memory input is closing down");
68 }
69 
71 {
72  Log::add(Log::DEBUG0,"SharedMemoryInput::end(): got signal to close");
73  _control = _quit;
74  Log::add(Log::DEBUG0,"SharedMemoryInput::end(): wait for 5 s that shared memory shuts down");
75  if(!wait(5000))
76  {
77  Log::add(Log::DEBUG0,string("SharedMemoryInput::end(): time has elapsed. So we") +
78  " probably lost connection to the shared memory. Therefore we will" +
79  " terminate the thread");
80  terminate();
81  }
82  else
83  {
84  Log::add(Log::DEBUG0,"SharedMemoryInput::end(): Ok. Shared Memory input thread has shut down within 5 s");
85  }
86 }
87 
88 int SharedMemoryInput::processDgram(Pds::Dgram* datagram)
89 {
90  pausePoint();
91 
92  /** check if it just timed out, if so return */
93  if(!datagram)
94  return (shouldQuit());
95 
96  /** retrieve a new element from the ringbuffer */
97  rbItem_t rbItem(getNextFillable());
98  if (rbItem == _ringbuffer.end())
99  return (shouldQuit());
100 
101  /** read the datagram to the ringbuffer element */
102  CASSEvent::buffer_t& buf(rbItem->element->datagrambuffer());
103  buf.assign(reinterpret_cast<CASSEvent::buffer_t::value_type*>(datagram),
104  reinterpret_cast<CASSEvent::buffer_t::value_type*>(datagram)+(sizeof(Pds::Dgram)+datagram->xtc.sizeofPayload()));
105 
106  /** now convert the datagram to a cassevent */
107  const bool isGood = _convert(rbItem->element.get());
108 
109  /** advance the counters */
110  isGood ? ++_eventscounter : ++_skippedeventscounter;
111 
112  /** tell the buffer that we are done, but also let it know whether it is a good event */
113  _ringbuffer.doneFilling(rbItem,isGood);
114 
115  /** for ratemeter purposes send a signal that we added a new event */
116  newEventAdded(rbItem->element->datagrambuffer().size());
117 
118  /** return the quit code */
119  return shouldQuit();
120 }
121 
123 {
124  return _eventscounter;
125 }
126 
128 {
129  return _skippedeventscounter;
130 }
FormatConverter & _convert
a reference to the format converter functor
Shared Memory Input for receiving xtc datagrams.
uint64_t _skippedeventscounter
a counter for the skipped events
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
int _index
the client index of the shared memory
void load()
load the parameters of the FormatConverter
status_t _status
the internal status of the thread
file contains declaration of class that interfaces the LCLS shared memory
file contains declaration of the container for all format converters
int processDgram(Pds::Dgram *dg)
overwrite the base class function with our code
uint64_t skippedeventcounter()
retrieve the number of skipped processed events
Input base class.
Definition: input_base.h:31
STL namespace.
Format converter container.
std::vector< char > buffer_t
a buffer type
Definition: cass_event.h:49
SharedMemoryInput(const std::string &PartitionTag, int index, RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0)
constructor
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
uint64_t eventcounter()
retrieve the number of processed events
bool shouldQuit() const
query whether this thread is told to quit
control_t _control
the internal control status of the thread
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
void runthis()
starts the thread
static shared_pointer instance()
get the signelton instance
Definition: input_base.cpp:20
RingBuffer< CASSEvent > & _ringbuffer
reference to the ringbuffer
Definition: input_base.h:140
uint64_t _eventscounter
a counter for the events
wait(unsigned long time=ULONG_MAX)
std::string _partitionTag
the name of the partition tag we connect to
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
void pausePoint()
point where the thread will be paused
rbItem_t getNextFillable(unsigned timeout=500)
retrieve an iterator to the next fillable event
Definition: input_base.cpp:48
contains a logger for cass
void loadSettings(size_t what)
function to load the settings for the format converter
void newEventAdded(const size_t eventsize)
increment the numer of events received in the ratemeter
Definition: input_base.cpp:37
void end()
do all clean up when quitting