CFEL - ASG Software Suite
2.5.0
CASS
TCP Input for receiving data. More...
#include <tcp_input.h>
Public Member Functions | |
void | runthis () |
starts the thread More... | |
void | load () |
do not load anything More... | |
virtual | ~InputBase () |
destructor More... | |
virtual double | progress () |
retrieve the fraction of how much of the input has been processed More... | |
virtual uint64_t | eventcounter () |
retrieve the number of events that have been input so far More... | |
virtual uint64_t | skippedeventcounter () |
retrieve the number of skipped events that have been input so far More... | |
void | newEventAdded (const size_t eventsize) |
increment the number of events received in the ratemeter More... | |
RingBuffer< CASSEvent > & | ringbuffer () |
retrieve a reference to the ringbuffer More... | |
rbItem_t | getNextFillable (unsigned timeout=500) |
retrieve an iterator to the next fillable event More... | |
PausableThread (control_t control=_run, QObject *parent=0) | |
constructor More... | |
virtual | ~PausableThread () |
destructor More... | |
void | run () |
run the thread More... | |
void | pause (bool block=false) |
pause the thread More... | |
void | waitUntilPaused () |
waits until thread is paused More... | |
void | resume () |
resume the thread More... | |
status_t | status () const |
return the current status of the thread More... | |
bool | shouldQuit () const |
query whether this thread is told to quit More... | |
void | rethrowException () const |
rethrow the thrown exception More... | |
exception_t | exceptionThrown () const |
QThread (QObject *parent=0) | |
currentThread () | |
HANDLE QThread::currentThreadId() | |
exec () | |
exit (int returnCode=0) | |
finished () | |
idealThreadCount () | |
isFinished () | |
isRunning () | |
msleep (unsigned long msecs) | |
priority () | |
quit () | |
run () | |
setPriority (Priority priority) | |
setStackSize (uint stackSize) | |
setTerminationEnabled (bool enabled=true) | |
sleep (unsigned long secs) | |
stackSize () | |
start (Priority priority=InheritPriority) | |
started () | |
terminate () | |
terminated () | |
usleep (unsigned long usecs) | |
wait (unsigned long time=ULONG_MAX) | |
yieldCurrentThread () | |
Static Public Member Functions | |
static void | instance (RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0) |
create an instance of this More... | |
static shared_pointer | instance () |
get the singleton instance More... | |
static shared_pointer::element_type & | reference () |
get reference to the singleton instance More... | |
Private Member Functions | |
TCPInput (RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0) | |
constructor More... | |
bool | connectToServer (QTcpSocket &socket) |
connect the socket to server More... | |
bool | dataAvailable (QTcpSocket &socket, qint64 datasize) |
wait until the data is available More... | |
Private Attributes | |
int | _timeout |
the timeout of the socket More... | |
Additional Inherited Members | |
typedef std::tr1::shared_ptr< InputBase > | shared_pointer |
shared pointer of this type More... | |
typedef RingBuffer< CASSEvent >::iter_type | rbItem_t |
define an item in the ringbuffer More... | |
enum | status_t { running, paused, notstarted } |
enum describing the internal status of the thread More... | |
enum | control_t { _run, _quit, _pause } |
enum describing the control status of the thread More... | |
enum | exception_t { NO_EXCEPTION, INVALID_ARGUMENT_EXCEPTION, RUNTIME_ERROR_EXCEPTION, OUT_OF_RANGE_EXCEPTION, LOGIC_ERROR_EXCEPTION, STANDART_EXCEPTION, UNKNOWN_EXCEPTION } |
enum describing which exception was thrown More... | |
virtual void | end () |
tell the thread to quit More... | |
QMutex | lock |
a mutex so that external program can lock access to this More... | |
InputBase (RingBuffer< CASSEvent > &ringbuffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0) | |
protected constructor since it should be a singleton More... | |
void | pausePoint () |
point where the thread will be paused More... | |
RingBuffer< CASSEvent > & | _ringbuffer |
reference to the ringbuffer More... | |
Ratemeter & | _ratemeter |
ratemeter to measure the rate More... | |
Ratemeter & | _loadmeter |
meter to measure the data load More... | |
QMutex | _pauseMutex |
mutex to wait on until thread is paused More... | |
QWaitCondition | _pauseCondition |
wait condition to wait on until thread is resumed More... | |
QWaitCondition | _waitUntilPausedCondition |
wait condition to wait until thread is paused More... | |
status_t | _status |
the internal status of the thread More... | |
control_t | _control |
the internal control status of the thread More... | |
size_t | _pausecount |
a counter of how many threads have paused this thread More... | |
exception_t | _exception_thrown |
flag to show that general exception was thrown More... | |
std::invalid_argument | _invarg_excep |
the invalid argument exception thrown More... | |
std::runtime_error | _runt_excep |
the runtime error exception thrown More... | |
std::out_of_range | _outrange_excep |
the out of range exception thrown More... | |
std::logic_error | _logic_excep |
the logic error exception thrown More... | |
static shared_pointer | _instance |
singleton instance More... | |
TCP Input for receiving data.
This class is a thread that connects to a TCP server and retrieves the data from it. It expects the size of the payload to be transmitted before the payload containing the data arrives.
TCPInput/{Server}
The name or ip address of the machine that the server is running on. Default is "localhost"
TCPInput/{Port}
The port that the TCP Server is listening for connections on. Default is "9090"
TCPInput/{DataType}
The type of data that is streamed from the tcp server. Default is "agat". Possible values are:
TCPInput/{SocketDataTimeout_ms}
Time in ms to wait until the data should be available. If the time is exceeded, it checks whether the socket's connection to the server was lost. Default is 2000 ms.
TCPInput/{SocketConnectionTimout_ms}
Time in ms to wait until the socket is connected to the server. Default is 1000 ms.
TCPInput/{WaitUntilReconnectionTry_s}
Time in s to wait until another attempt is made to reconnect the socket to the server. Default is 5.
Definition at line 55 of file tcp_input.h.
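The parameters above and their defaults can be collected in one place. The following is a minimal sketch using only the standard library; `TcpInputSettings`, `loadTcpInputSettings` and the flat `std::map` standing in for the .ini file are hypothetical names chosen for illustration and are not part of CASS, which reads its settings through QSettings. The key spelling `SocketConnectionTimout_ms` is copied as documented.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical holder for the documented TCPInput parameters.
// Default values are the ones stated in the parameter list above.
struct TcpInputSettings {
    std::string server = "localhost";
    int port = 9090;
    std::string dataType = "agat";
    int socketDataTimeoutMs = 2000;
    int socketConnectionTimeoutMs = 1000;
    int waitUntilReconnectionTryS = 5;
};

typedef std::map<std::string, std::string> IniMap;  // stand-in for the .ini file

// Return the integer stored under key, or fallback when the key is absent.
int intOr(const IniMap& ini, const std::string& key, int fallback) {
    assert(!key.empty());
    IniMap::const_iterator it = ini.find(key);
    return it == ini.end() ? fallback : std::stoi(it->second);
}

// Return the string stored under key, or fallback when the key is absent.
std::string stringOr(const IniMap& ini, const std::string& key,
                     const std::string& fallback) {
    IniMap::const_iterator it = ini.find(key);
    return it == ini.end() ? fallback : it->second;
}

// Build the settings, falling back to the documented default for missing keys.
TcpInputSettings loadTcpInputSettings(const IniMap& ini) {
    TcpInputSettings s;
    s.server = stringOr(ini, "TCPInput/Server", s.server);
    s.port = intOr(ini, "TCPInput/Port", s.port);
    s.dataType = stringOr(ini, "TCPInput/DataType", s.dataType);
    s.socketDataTimeoutMs =
        intOr(ini, "TCPInput/SocketDataTimeout_ms", s.socketDataTimeoutMs);
    s.socketConnectionTimeoutMs =
        intOr(ini, "TCPInput/SocketConnectionTimout_ms", s.socketConnectionTimeoutMs);
    s.waitUntilReconnectionTryS =
        intOr(ini, "TCPInput/WaitUntilReconnectionTry_s", s.waitUntilReconnectionTryS);
    return s;
}
```

Only the keys that differ from the defaults need to be present in the map; every other field keeps its documented default.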
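TCPInput (RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0) | private |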
constructor
Creates the thread. It connects to the TCP server and then retrieves the data streams. The data within the stream is deserialized with the help of deserialization functions; the user chooses the appropriate one via the .ini file parameters. The thread runs as long as no one calls the end() member of the base class. If a timeout occurs while waiting for a new event, it just continues and waits for the next timeout. If a timeout occurs while waiting for the data of an event, it throws a runtime error.
buffer | the ringbuffer from which we take events and fill them with the incoming information |
ratemeter | reference to the ratemeter to measure the rate of the input |
loadmeter | reference to the ratemeter to measure the load of the input |
parent | the parent of this object |
Definition at line 35 of file tcp_input.cpp.
bool connectToServer (QTcpSocket &socket) | private |
connect the socket to server
tries to connect the socket to the server as long as user did not finish the program or the socket has connected to the server
socket | the socket that one should connect to the server |
try to connect until it is either connected or the thread is told to quit
Definition at line 42 of file tcp_input.cpp.
References cass::Log::add(), QSettings::beginGroup(), QSettings::endGroup(), cass::Log::INFO, lmf::PausableThread::shouldQuit(), QThread::sleep(), cass::toString(), QSettings::value(), and cass::Log::WARNING.
Referenced by dataAvailable(), and runthis().
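The retry behaviour described for connectToServer() can be sketched without Qt. In the sketch below, `connectWithRetry` and its three callbacks are hypothetical stand-ins: `tryConnect` for one connection attempt with the connection timeout, `shouldQuit` for the quit query of the thread, and `waitBeforeRetry` for the pause configured by WaitUntilReconnectionTry_s. It illustrates the loop only and is not the CASS implementation.

```cpp
#include <cassert>
#include <functional>

// Keep attempting to connect until an attempt succeeds or a quit is requested.
// Returns true when connected, false when the loop ended because of a quit request.
bool connectWithRetry(const std::function<bool()>& tryConnect,
                      const std::function<bool()>& shouldQuit,
                      const std::function<void()>& waitBeforeRetry)
{
    assert(tryConnect && shouldQuit && waitBeforeRetry);
    while (!shouldQuit()) {
        if (tryConnect())
            return true;        // connected, stop retrying
        waitBeforeRetry();      // pause before the next attempt
    }
    return false;               // told to quit before a connection was made
}
```

Passing the waiting step as a callback keeps the loop testable without real sleeps; a caller would typically supply a function that sleeps for the configured number of seconds.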
bool dataAvailable (QTcpSocket &socket, qint64 datasize) | private |
wait until the data is available
Waits until the requested datasize is available on the socket. In case of a timeout, it checks whether the connection was lost. If this is the case, it reconnects and returns false.
socket | the socket on which to wait for data |
datasize | the amount of data to wait for. |
first check if we're connected to the server. If not, then try to connect
check if the data is available; if not, wait for a while; if it still isn't available, check the connection to the server
Definition at line 75 of file tcp_input.cpp.
References _timeout, connectToServer(), and lmf::PausableThread::shouldQuit().
Referenced by runthis().
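The waiting logic described for dataAvailable() can be sketched as follows. `SocketOps` and `waitForData` are hypothetical names; the callbacks stand in for the socket queries a real implementation would make. The sketch assumes that every timeout ends the wait with `false`, which is one reading of the description above; the actual behaviour when the connection is still intact after a timeout is not stated in this page.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical abstraction of the socket operations needed for the wait.
struct SocketOps {
    std::function<bool()> isConnected;          // is the socket connected?
    std::function<std::int64_t()> bytesAvailable; // bytes ready to be read
    std::function<bool(int)> waitForMoreData;   // false means the wait timed out
    std::function<bool()> reconnect;            // try to re-establish the connection
};

// Returns true once at least datasize bytes are available, false otherwise.
bool waitForData(const SocketOps& s, std::int64_t datasize, int timeoutMs)
{
    assert(datasize >= 0);
    // First make sure there is a connection at all.
    if (!s.isConnected() && !s.reconnect())
        return false;
    while (s.bytesAvailable() < datasize) {
        if (!s.waitForMoreData(timeoutMs)) {
            // Timed out: reconnect if the connection was lost, then give up
            // on this wait (assumption, see the lead-in).
            if (!s.isConnected())
                s.reconnect();
            return false;
        }
    }
    return true;
}
```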
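void instance (RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0) | static |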
create an instance of this
Initializes the _instance member of the base class. It checks whether the instance is already initialized and throws a logic error if so.
buffer | the ringbuffer from which we take events and fill them with the incoming information |
ratemeter | reference to the ratemeter to measure the rate of the input |
loadmeter | reference to the ratemeter to measure the load of the input |
parent | the parent of this object |
Definition at line 25 of file tcp_input.cpp.
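The create-once pattern described here can be illustrated with a small standalone class. `InputSketch` is a hypothetical stand-in with an `int` in place of the real constructor arguments, and it uses `std::shared_ptr` where the documented typedef uses `std::tr1::shared_ptr`. That the accessor also throws when nothing has been created yet is an assumption of this sketch.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>

// Hypothetical singleton following the documented create/access split.
class InputSketch {
public:
    typedef std::shared_ptr<InputSketch> shared_pointer;

    // Create the one instance; a second call is a programming error.
    static void instance(int config) {
        if (_instance)
            throw std::logic_error("InputSketch::instance(): already created");
        _instance = shared_pointer(new InputSketch(config));
    }

    // Access the instance (assumption: throws if it was never created).
    static shared_pointer instance() {
        if (!_instance)
            throw std::logic_error("InputSketch::instance(): not created yet");
        return _instance;
    }

    int config() const { return _config; }

private:
    // Private constructor so that instance(int) is the only way to create it.
    explicit InputSketch(int config) : _config(config) {}

    int _config;
    static shared_pointer _instance;
};

InputSketch::shared_pointer InputSketch::_instance;
```

Throwing on a second creation attempt, rather than silently replacing the instance, keeps references held elsewhere valid and surfaces the double initialization early.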
void load () | inline virtual |
do not load anything
void runthis () | virtual |
starts the thread
Starts the thread and the loop that waits for data. When a timeout occurs, it just restarts the loop until the quit flag is set.
connect to the server before starting the loop.
check whether the upper bit is set (indicating that data is compressed)
reset the upper bit
write received data into a temporary buffer
use stream to deserialize buffer
deserialize the buffer header
Implements cass::InputBase.
Definition at line 98 of file tcp_input.cpp.
References cass::InputBase::_ringbuffer, lmf::PausableThread::_status, _timeout, cass::Log::add(), QDataStream::atEnd(), QSettings::beginGroup(), connectToServer(), QByteArray::data(), dataAvailable(), QSettings::endGroup(), cass::Log::ERROR, cass::InputBase::getNextFillable(), cass::Log::INFO, cass::TCPStreamer::instance(), cass::InputBase::newEventAdded(), lmf::PausableThread::pausePoint(), QDataStream::readRawData(), QByteArray::resize(), lmf::PausableThread::running, QDataStream::setByteOrder(), QDataStream::setVersion(), lmf::PausableThread::shouldQuit(), and QSettings::value().
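The steps "check whether the upper bit is set" and "reset the upper bit" describe a size field whose most significant bit doubles as a compression flag. The sketch below shows that decoding under the assumption of a 32-bit unsigned size field; the actual width and byte order of the header are not stated in this page, and `PayloadHeader`, `decodeSizeHeader` and `encodeSizeHeader` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Decoded view of the size field that precedes each payload.
struct PayloadHeader {
    bool compressed;      // true when the upper bit was set
    std::uint32_t size;   // payload size with the flag bit cleared
};

// Assumption: the size field is 32 bits wide, so the flag is bit 31.
const std::uint32_t kCompressedFlag = 0x80000000u;

// Split the raw size field into the compression flag and the payload size.
PayloadHeader decodeSizeHeader(std::uint32_t raw) {
    PayloadHeader h;
    h.compressed = (raw & kCompressedFlag) != 0;  // check the upper bit
    h.size = raw & ~kCompressedFlag;              // reset the upper bit
    return h;
}

// Inverse operation, as a sender might perform it.
std::uint32_t encodeSizeHeader(std::uint32_t size, bool compressed) {
    assert((size & kCompressedFlag) == 0);  // size must fit in the lower 31 bits
    return compressed ? (size | kCompressedFlag) : size;
}
```

Reusing the top bit costs half of the representable size range but avoids an extra header field for the compression flag.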
int _timeout | private |
the timeout of the socket
Definition at line 130 of file tcp_input.h.
Referenced by dataAvailable(), and runthis().