Using a full-featured RPC framework for IPC seems like overkill when the processes run on the same machine. However, if your project anyway exposes RPCs for public APIs or would benefit from a schema-based serialisation layer it makes sense to use only one tool that combines these—also for IPC.

Microservices for beginners

For the FlashCam software running on the DAQ servers we converged on the following high-level architecture:

Each bright box corresponds to a software process running on the DAQ server. The middle row are those processes that directly talk to the camera hardware via Ethernet using their hardware-specific protocols. Each of these control & monitoring processes expose a small set of functions that other processes may call to change or monitor the state. The two processes in the lower row are somewhat special: they do not connect to any hardware, but rather act as independent connectors between subsystems to run control loops. “Thermal” monitors the subsystem temperatures and adjusts cooling and ventilation parameters to stabilise the camera; “PDP-Readout-Connector” monitors pixel health, deactivates the trigger contribution of tripped or broken pixels and reactivates the trigger contribution of recovered pixels.This way the corresponding logic resides in a single, well-scoped subsystem and the Readout and PDP subsystems do not need to know anything about each other.

The “State Supervisor” orchestrates these subsystems and exposes an abstraction of the camera functionality via an RPC interface—essentially as a small set of states to hide the details of the hardware subsystems. The State Supervisor acts as the sole entry point for external control & monitoring; only a camera expert may control the subsystems individually in engineering mode.

All processes send their telemetry (log events, metrics, eventually trace events) to one single collector that dumps these to disk in a time-ordered fashion, e.g., one file per night for next-day analysis by camera experts. One aggregate telemetry stream per camera (as opposed to one telemetry stream per subsystem) is easier to manage and follow. The operators may subscribe to a subset of the telemetry, such as warnings and errors. With only a couple of hundred MByte of telemetry per night we do not expect this process to become a bottleneck.

We decided to maximise decoupling by splitting the subsystems into their own processes (as opposed to a monolith) for the following reasons:

  • subsystems may be developed independently by the corresponding experts

  • subsystems may be deployed individually, e.g., in hardware test setups or during software upgrades

  • subsystems are testable in isolation using exactly the same interface as in operation

Such a modular architecture has potential drawbacks that need to be taken into account:

  • to limit uncontrolled growth and foster subsystem homogeneity we provide a common framework for telemetry and inter-process communication and a set of tools for structuring metrics and APIs

  • to have one source of truth we keep the code of all subsystems in one monorepo, which also simplifies managing dependencies and building, testing and deploying the software

  • to ensure subsystem compatibility & consistency we require an IPC framework that checks procedure calls and arguments at compile time

Why gRPC?

To minimise dependencies we aimed at using the same library for inter-process communication as for the remote interfaces. Ideally, the RPC layer does not incur a significant latency overhead compared to traditional means of IPC and enforces compile-time consistency via schemas. Having a serialisation layer that is independent of the RPC layer and that supports schema evolution makes sure that the in-flight telemetry messages may also be stored to disk in the same format and that we can evolve subsystems while retaining backwards compatibility, e.g., for the monitoring data.

There are a few RPC frameworks that fit these needs and combine the benefits of schema-based serialisation with RPC functionality. For our projects (which are mainly based on C/C++), three popular, mature options seem to exist: Apache Thrift, Cap’n Proto, gRPC. For FlashCam we settled on gRPC because its (default) serialisation layer, protobuf, is already being used in CTA. gRPC has the added benefits of providing efficient streaming connections (for our telemetry data) and some very useful tooling, such as the excellent Buf linter and breaking-change detector.

Latency overhead of gRPC for local IPC

Our State Supervisor needs to handle state changes of ~10 subsystems in an ordered fashion; its code becomes much simpler when using a single thread and synchronous processing, but latencies of the individual calls add up and may become critical if they’re in the milliseconds-regime. Since I couldn’t find any measurements on gRPC’s unary call latency over Unix domain sockets, I chose to measure the latency distribution.

In our scenario of local IPC, some obvious tuning options exist:Compression seems to be disabled by default, otherwise this would’ve been an obvious knob to turn (off).

  • data is exchanged via a Unix domain socket (unix:// address) instead of a TCP socket

  • server and client may run on the same CPU core or separate cores; we will test both to see the difference

The test system is an AMD EPYC 7402P-based server running CentOS 8 with the “latency-performance” profile and gRPC v1.40.0; “other core” below means running the client on cores not sharing an L3 cache with the server. We compare the performance with the most simple traditional IPC method I could think of: exchanging C structs over a Unix domain socket using blocking I/O—which should have near-ideal performance for a sockets-based approach.Shared-memory IPC should be faster because it happens in userspace only. But it’s not straightforward (for me) to scale this to a server supporting multiple clients—I’d be interested in a neat example, though!

The histograms of the latency distributions of 1 million IPC calls per combination are plotted below:

The median and upper percentiles are tabulated below:

IPC technology Server/client thread distribution Unary call latency median 95th percentile 99th percentile
UDS same core 4 µs 5 µs 6 µs
other core 11 µs 12 µs 13 µs
gRPC same core 167 µs 178 µs 200 µs
other core 116 µs 129 µs 142 µs

When using Unix domain sockets and blocking I/O on the same core, the kernel seems to be able to immediately switch the context to the reading thread—hence, this represents the optimal but somewhat contrived case. It seems more realistic to compare the “other core” latencies, where gRPC is about a factor of 10 slower than blocking I/O over Unix domain sockets.

Conclusions

The ~100 µs unary-call latency overhead of gRPC for local IPC is entirely acceptable for our purposes; the overhead is outweighed by the benefits of using only a single library for IPC and RPC, and having well-defined, strongly-typed interfaces that are checked for consistency at compile-time and have clear evolution strategies.

For the connections to the telemetry collector, congestion from handling multiple clients may be a concern in situations such as a run start, where all subsystems are being configured and produce a burst of messages. Our prototype telemetry collector currently handles up to ~120k–150k telemetry messages per second for up to 3 concurrent clients and a maximum aggregate throughput of ~400k messages per second for our ~10 clients—also sufficient for our purposes.

Literature

Performance best practices with gRPC mentions that converting unary calls to bidirectional streaming could improve performance. I explicitly decided against that to keep code simple to grok.

API design is stuck in the past explains the advantages of controlled, schema-driven development for RPC interfaces.

Code appendix

The complete code for running the two microbenchmarks is shown below.

Makefile
CXX = g++
CPPFLAGS += `pkg-config --cflags protobuf grpc`
CXXFLAGS += -std=c++17 -O2 -march=native -mtune=native

SYSTEM ?= $(shell uname | cut -f 1 -d_)
LDFLAGS += `pkg-config --libs protobuf grpc++ grpc`
ifeq ($(SYSTEM),Darwin)
LDFLAGS += -lgrpc++_reflection
else
LDFLAGS += -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed
endif
LDFLAGS += -ldl

all: uds-ipc-latency grpc-ipc-latency

uds-ipc-latency: uds-ipc-latency.o
	$(CXX) $^ $(LDFLAGS) -o $@

grpc-ipc-latency: trivial.ipc.pb.o trivial.ipc.grpc.pb.o grpc-ipc-latency.o
	$(CXX) $^ $(LDFLAGS) -o $@

.PRECIOUS: %.grpc.pb.cc
%.grpc.pb.cc: %.proto
	protoc --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` $<

.PRECIOUS: %.pb.cc
%.pb.cc: %.proto
	protoc --cpp_out=. $<

clean:
	rm -f *.o *.pb.cc *.pb.h uds-ipc-latency grpc-ipc-latency

uds-ipc-latency.cpp
// uds-ipc-latency.cpp - Measure latency of unary IPC calls over a Unix domain socket using blocking I/O.
//
// Compile: g++ -std=c++17 -O2 -march=native -mtune=native -o uds-ipc-latency uds-ipc-latency.cpp
// Run on 1st and 2nd core: ./uds-ipc-latency 0x1 0x2 > uds-ipc-latencies-nsec.txt
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <vector>

#include <sys/socket.h>
#include <sys/errno.h>
#include <unistd.h>

#if __linux__
#include <sched.h>
#endif

using namespace std;

struct DummyMsg {
    int64_t i;
};

void die(const char *msg) {
    fprintf(stderr, "%s: %s (%d)\n", msg, strerror(errno), errno);
    exit(1);
}

void call(int socket, DummyMsg &msg) {
    if (int rc = write(socket, &msg, sizeof(msg)); rc != sizeof(msg))
        die("write() failed");

    if (int rc = read(socket, &msg, sizeof(msg)); rc != sizeof(msg))
        die("read() failed");
}

int main(int argc, const char *argv[])
{
#if __linux__
    cpu_set_t parent_mask, child_mask;
    CPU_ZERO(&parent_mask);
    CPU_ZERO(&child_mask);
    if (argc == 3) {
        sscanf(argv[1], "%x", &parent_mask);
        sscanf(argv[2], "%x", &child_mask);
    }
#endif

    int sockets[2];
    if (int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets); rc == -1)
        die("socketpair() failed");

    pid_t pid = fork();
    if (pid == -1)
        die("fork() failed");

    if (pid == 0) {  // child
        close(sockets[0]); // close parent's socket
        #if __linux__
        if (int rc = sched_setaffinity(0, sizeof(child_mask), &child_mask); rc == -1)
            die("sched_setaffinity() failed");
        #endif

        DummyMsg msg{.i = 0};
        for (int i = 0; i < 1000; i++)
            call(sockets[1], msg);  // warmup

        std::vector<int> latencies;
        latencies.reserve(1000000);
        for (int i = 0; i < 1000000; i++) {
            auto start = chrono::high_resolution_clock::now();
            call(sockets[1], msg);
            auto end = chrono::high_resolution_clock::now();

            latencies.push_back(chrono::duration_cast<chrono::nanoseconds>(end - start).count());
        }

        for (auto l : latencies)
            printf("%i\n", l);

        close(sockets[1]);
    } else {  // parent
        close(sockets[1]);  // close child's socket

        #if __linux__
        if (int rc = sched_setaffinity(0, sizeof(parent_mask), &parent_mask); rc == -1)
            die("sched_setaffinity() failed");
        #endif

        DummyMsg msg;
        while (1) {
            if (int rc = read(sockets[0], &msg, sizeof(msg)); rc != sizeof(msg))
                die("read() failed");

            msg.i++;

            if (int rc = write(sockets[0], &msg, sizeof(msg)); rc != sizeof(msg))
                die("write() failed");
        }

        close(sockets[0]);
    }
}

grpc-ipc-latency.cpp
// grpc-ipc-latency.cpp - Measure latency of unary IPC calls over a Unix domain socket using gRPC.
//
// Compile: g++ -std=c++17 -O2 -march=native -mtune=native -o grpc-ipc-latency grpc-ipc-latency.cpp
// Run on 1st and 2nd core: ./grpc-ipc-latency 0x1 0x2 > grpc-ipc-latencies-nsec.txt
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <thread>
#include <vector>

#include <sys/socket.h>
#include <sys/errno.h>
#include <unistd.h>

#if __linux__
#include <sched.h>
#endif

#include <grpcpp/grpcpp.h>

#include "trivial.ipc.grpc.pb.h"

using namespace std;
using namespace trivial::ipc;
using namespace grpc;
using namespace std::chrono_literals;

void die(const char *msg) {
    fprintf(stderr, "%s: %s (%d)\n", msg, strerror(errno), errno);
    exit(1);
}

class RPCServiceImpl final : public RPCService::Service {
  Status UnaryCall(ServerContext* /*context*/, const UnaryCallRequest* request,
                   UnaryCallReply* reply) override {
    reply->set_i(request->i() + 1);
    return Status::OK;
  }
};

int main(int argc, const char *argv[])
{
#if __linux__
    cpu_set_t parent_mask, child_mask;
    CPU_ZERO(&parent_mask);
    CPU_ZERO(&child_mask);
    if (argc == 3) {
        sscanf(argv[1], "%x", &parent_mask);
        sscanf(argv[2], "%x", &child_mask);
    }
#endif

    std::string address{"unix:///tmp/test.socket"};

    pid_t pid = fork();
    if (pid == -1)
        die("fork() failed");

    if (pid == 0) {  // child
        #if __linux__
        if (int rc = sched_setaffinity(0, sizeof(child_mask), &child_mask); rc == -1)
            die("sched_setaffinity() failed");
        #endif

        std::this_thread::sleep_for(100ms);  // wait for setup

        auto client = RPCService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials()));

        UnaryCallRequest req{};
        req.set_i(0);

        UnaryCallReply rep{};
        for (int i = 0; i < 1000; i++) {  // warmup
            ClientContext ctx{};
            if (grpc::Status status = client->UnaryCall(&ctx, req, &rep); !status.ok())
                die("UnaryCall failed");
        }

        std::vector<int> latencies;
        latencies.reserve(1000000);
        for (int i = 0; i < 1000000; i++) {
            auto start = chrono::high_resolution_clock::now();
            ClientContext ctx{};
            if (grpc::Status status = client->UnaryCall(&ctx, req, &rep); !status.ok())
                die("UnaryCall failed");
            auto end = chrono::high_resolution_clock::now();

            latencies.push_back(chrono::duration_cast<chrono::nanoseconds>(end - start).count());
        }

        for (auto l : latencies)
            printf("%i\n", l);
    } else {  // parent
        #if __linux__
        if (int rc = sched_setaffinity(0, sizeof(parent_mask), &parent_mask); rc == -1)
            die("sched_setaffinity() failed");
        #endif

        RPCServiceImpl service;
        ServerBuilder builder;
        builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service);

        std::unique_ptr<Server> server(builder.BuildAndStart());
        server->Wait();
    }
}

trivial.ipc.proto
syntax = "proto3";

package trivial.ipc;

service RPCService {
  rpc UnaryCall (UnaryCallRequest) returns (UnaryCallReply) {}
}

message UnaryCallRequest {
  fixed64 i = 1;
}

message UnaryCallReply {
  fixed64 i = 1;
}