Cannot send/recv on labeled IDDP sockets

Sam Daniel sam at FirstMode.com
Mon Dec 28 22:40:53 CET 2020


I am writing a pub/sub tool based on IDDP. It is intended to be used
as an IPC tool, but right now I am doing all of my testing
within one process (sending and receiving in the same thread).

What I have below is not working as expected…

When a channel is published for the first time, an IDDP socket
is created with the channel name as its label:

int create_socket_for_channel(const string& channel) {
    int socket  = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_IDDP);
    if (socket < 0) {
        close(socket);
                return -1;
    }
    rtipc_port_label plabel;
    strncpy(plabel.label, channel.c_str(), channel.length() + 1);
    int ret = setsockopt(socket, SOL_IDDP, IDDP_LABEL,
                                                 &plabel, sizeof(plabel));
    if (ret < 0) {
        close(socket);
                return -1;
    }
    size_t poolsz = PER_CHANNEL_POOLSZ;
    ret = setsockopt(socket, SOL_IDDP, IDDP_POOLSZ,
                                     &poolsz, sizeof(poolsz));
    if (ret < 0) {
        close(socket);
                return -1;
    }

    return socket;
}

The pub socket is bound and stored in a map for caching:

int pub_socket_find_or_create(const string& channel) {
    // if we've already created and bound this socket
    // find it and return it
    auto it = pub_sockets.find(channel);
    if (it != pub_sockets.end()) {
        return it->second; /* return socket */
    }
    int socket = create_socket_for_channel(channel);
    if (socket < 0) {
        return -1;
    }
    sockaddr_ipc saddr;
    memset(&saddr, 0, sizeof(sockaddr_ipc));
    saddr.sipc_family = AF_RTIPC;
    saddr.sipc_port = -1;
    int ret = bind(socket, (sockaddr*)&saddr, sizeof(sockaddr_ipc));
    if (ret) {
        close(socket);
        return -1;
    }
    pub_sockets.emplace(channel, socket);
    return socket;
}

This function actually publishes the message:

/*
 * Publish one message on msg.channel.
 *
 * Rejects channel names longer than CHANNEL_MAXLEN and payloads larger than
 * MTU, looks up (or creates and binds) the channel's publisher socket, then
 * sends msg.len bytes from msg.buf without blocking (MSG_DONTWAIT).
 *
 * Returns the value reported by send() (byte count), or -1 on failure.
 */
int sendmsg(my_msg_t msg) {
    string channel = msg.channel;
    if (channel.size() > CHANNEL_MAXLEN) {
        return -1;
    }
    if (msg.len > MTU) {
        return -1;
    }

    const int fd = pub_socket_find_or_create(channel);
    if (fd < 0) {
        return -1;
    }

    // Peer port for the debug trace only. getpeername() can fail (e.g. on a
    // socket that was never connected), in which case saddr would be left
    // uninitialized; report -1 instead of garbage.
    sockaddr_ipc saddr;
    memset(&saddr, 0, sizeof(saddr));
    socklen_t saddr_len = sizeof(saddr);
    if (getpeername(fd, reinterpret_cast<sockaddr*>(&saddr), &saddr_len) < 0) {
        saddr.sipc_port = -1;
    }

    const ssize_t ret = send(fd, msg.buf, msg.len, MSG_DONTWAIT);
    // errno is only meaningful when send() failed; a successful call leaves
    // whatever stale value an earlier call set. Capture it right away, before
    // fprintf() can overwrite it.
    const int err = (ret < 0) ? errno : 0;
    fprintf(stdout,
            "send(channel=%s, socket=%i, port=%i, len=%lu) -> %d - %s\n",
            channel.c_str(), fd, saddr.sipc_port,
            static_cast<unsigned long>(msg.len), static_cast<int>(ret),
            strerror(err));
    return static_cast<int>(ret);
}

send() appears to succeed: it returns a value equal to msg.len
(although strerror(errno) sometimes returns a string that does not
look like success…).

Surprisingly, when I use -1 as the sipc_port in saddr,
send returns -1. Why is that? I would expect send to find the
right port on its own.

This function creates a socket for the subscription side
of the channel when a subscription is created:

/*
 * Return the subscriber socket for `channel`, creating it on first use.
 *
 * A new socket gets the channel label (create_socket_for_channel) and is
 * connect()ed with sipc_port = -1, i.e. the peer is looked up by label.
 * The descriptor is cached in sub_sockets (channel -> descriptor).
 *
 * Returns the descriptor, or -1 on failure (no descriptor is leaked).
 *
 * NOTE(review): in the Xenomai IDDP label demo, the side that connect()s by
 * label is the sender and the side bound under the label is the receiver.
 * Here the connected socket is later used for recv(), so confirm that data
 * actually flows toward this socket.
 */
int sub_socket_find_or_create(const string& channel) {
    // If this channel's socket was already created and connected, reuse it.
    auto it = sub_sockets.find(channel);
    if (it != sub_sockets.end()) {
        // The cached value is the descriptor itself (see the emplace below
        // and recvmsg()), not a pair.
        return it->second;
    }
    const int fd = create_socket_for_channel(channel);
    if (fd < 0) {
        return -1;
    }

    // Set a timeout so connect() does not wait forever for the other end to
    // call bind().
    // TODO: make this way less than 1
    struct timeval tv;
    tv.tv_sec = 1;   /* 1s timeout */
    tv.tv_usec = 0;  // must be set too, otherwise the timeout is indeterminate
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
        close(fd);
        return -1;
    }

    sockaddr_ipc saddr;
    memset(&saddr, 0, sizeof(saddr));  // same zeroing as the publisher side
    saddr.sipc_family = AF_RTIPC;
    saddr.sipc_port = -1;  // search for the peer by this socket's label
    if (connect(fd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)) != 0) {
        close(fd);
        return -1;
    }
    sub_sockets.emplace(channel, fd);
    return fd;
}

This function receives a message for a subscription:

/*
 * Poll every subscribed channel once, without blocking, and deliver the
 * first message found.
 *
 * On success fills *msg (utime, channel, len, buf) and returns the payload
 * length. Returns -1 if msg is null or no subscribed socket had data.
 *
 * Lifetime of the pointers stored in *msg:
 *  - msg->buf points to a per-thread buffer that is overwritten by the next
 *    recvmsg() call on the same thread;
 *  - msg->channel points at the sub_sockets key, valid while that entry
 *    stays in the map.
 *
 * NOTE(review): these sockets only connect()ed by label; per the Xenomai
 * IDDP label demo it is the *bound* side that receives. Confirm publishers
 * actually send to these sockets, otherwise recv() keeps returning EAGAIN.
 */
int recvmsg(my_msg_t* msg) {
    if (msg == nullptr) {
        return -1;
    }
    // Must outlive this call because msg->buf points into it; a local
    // array would leave msg->buf dangling as soon as recvmsg() returned.
    static thread_local uint8_t payload[MTU];

    for (auto it = sub_sockets.begin(); it != sub_sockets.end(); ++it) {
        // Reference, not a copy: msg->channel keeps a pointer to these chars.
        const string& channel = it->first;
        /* socket created by sub_socket_find_or_create */
        const int fd = it->second;

        // The label and peer port are for the debug trace only; failing to
        // fetch them must not prevent receiving from this socket.
        rtipc_port_label plabel;
        socklen_t plabel_len = sizeof(plabel);
        const char* label = channel.c_str();
        if (getsockopt(fd, SOL_IDDP, IDDP_LABEL, &plabel, &plabel_len) == 0) {
            label = plabel.label;
        }

        sockaddr_ipc saddr;
        socklen_t saddr_len = sizeof(saddr);
        int peer_port = -1;
        if (getpeername(fd, reinterpret_cast<sockaddr*>(&saddr), &saddr_len) == 0) {
            peer_port = saddr.sipc_port;
        }

        const ssize_t ret = recv(fd, payload, sizeof(payload), MSG_DONTWAIT);
        // errno is only meaningful on failure; capture it before fprintf().
        const int err = (ret < 0) ? errno : 0;
        fprintf(stdout,
                "recv(channel=%s, socket=%i, port=%i) -> %d - %s\n",
                label, fd, peer_port, static_cast<int>(ret), strerror(err));
        if (ret <= 0) {
            continue;
        }
        msg->utime = rt_now_usec();
        msg->channel = channel.c_str();
        msg->len = ret;
        msg->buf = payload;
        return static_cast<int>(ret);
    }
    // Nothing received: do not fall back on whatever msg->len held before.
    return -1;
}

The recv() call seems to fail: it returns -1 and strerror prints
"Resource temporarily unavailable".

Here’s an example of my output:
send(channel=EXAMPLE0, socket=3, port=0, len=134)
        -> 134 - Success
send(channel=EXAMPLE1, socket=4, port=1, len=134)
        -> 134 - Success
send(channel=EXAMPLE2, socket=5, port=2, len=134)
        -> 134 - Success
recv(channel=EXAMPLE2, socket=9, port=2)
        -> -1 - Resource temporarily unavailable
recv(channel=EXAMPLE0, socket=7, port=0)
        -> -1 - Resource temporarily unavailable
recv(channel=EXAMPLE1, socket=8, port=1)
        -> -1 - Resource temporarily unavailable
send(channel=EXAMPLE0, socket=3, port=0, len=134)
        -> 134 - Resource temporarily unavailable
send(channel=EXAMPLE1, socket=4, port=1, len=134)
        -> 134 - Resource temporarily unavailable
send(channel=EXAMPLE2, socket=5, port=2, len=134)
        -> 134 - Resource temporarily unavailable
recv(channel=EXAMPLE2, socket=9, port=2)
        -> -1 - Resource temporarily unavailable
recv(channel=EXAMPLE0, socket=7, port=0)
        -> -1 - Resource temporarily unavailable
recv(channel=EXAMPLE1, socket=8, port=1)
       -> -1 - Resource temporarily unavailable

Is there something glaringly obvious that I'm doing wrong?
Am I getting the semantics of IDDP wrong?

FWIW I plan on reproducing this pub/sub implementation
for RT/NRT XDDP as well.

Thanks for the help!

________
SAM DANIEL
Software Engineer
+1.570.317.5253 | sam at firstmode.com<mailto:sam at firstmode.com>

FIRST MODE | Building the Barely Possible
2220 Western Avenue, Seattle, WA
www.firstmode.com<http://www.firstmode.com>

________________________________
CONFIDENTIALITY CAUTION: This email (including any attachments) is intended only for the use of the individual or entity to whom it is addressed and may contain confidential information or trade secrets and be protected by legal privilege. Please notify us immediately by return email if you have received this email in error and delete this email and any attachments. If you are not the intended recipient, dissemination, disclosure and use of any information herein is prohibited.


More information about the Xenomai mailing list