I have been exploring flexibility and performance as part of the rjsoncons package for flexible JSON parsing. The package provides an R interface to the jsoncons library.

I want to support input from many different source – files, compressed files, URLs, etc. R provides support for these via it’s ‘connections’ interface, ?connection. On the other hand, jsoncons like many C++ programs interfaces with iostreams.

The C interface to connections is not part of R’s public API. My approach (1) takes inspiration from the readr package to invoke the R-level readBin() from C++ using the cpp11 R package; and (2) implements a streambuf subclass as outlined in a very helpful StackOverflow comment. These actually come together quite nicely.

R script

At the R level, we’ll open a connection for binary input, then invoke a C++ function that operates on the connection (the 2^22 will be used to read binary data from the connection in chunks of about 4 Mb), and finally close the connection.

con <- gzfile("some_file.json.gz", "rb")
result <- cpp_fun(con, 2^22)
close(con)

C++ class

For the C++ code, start by including relevant header files

#include <streambuf>
#include <cpp11.hpp>

Create a readbinbuf subclass of std::streambuf

class readbinbuf : public std::streambuf {

Create a static inline C++ variable that represents the call to R’s base::readBin(). We’ll pass an S-expression representing the connection constructed in R to our function, and store a reference to this connection in the class. We’ll also create a buffer of characters to store the result of readBin(), and a variable to store the number of bytes read.

    inline static auto read_bin = cpp11::package("base")["readBin"];
    const cpp11::sexp& con_;
    char *buf_;
    int n_bytes_;

That’s the end of the private details of the class. For the public interface, create a constructor that initializes the connection SEXP and number of bytes to read on each call to readBin(); allocate the buffer to store results. The destructor frees the buffer.

  public:
    readbinbuf(const cpp11::sexp& con, const int n_bytes)
      : con_(con), n_bytes_(n_bytes)
        {
            buf_ = new char[n_bytes_];
        }

    ~readbinbuf() { delete buf_; }

Implementing a subclass of std::streambuf for an input stream requires just a single function: underflow(), called whenever the buffer is ‘empty’ and returning an integer that indicates either the end-of-file or the value of the first character in the buffer. The skeleton of this function is

    int underflow() {
        if (gptr() == egptr()) {
            // Populate buf_ with the results of a call to read_bin()
            // Tell the base class about the buffer
            setg(buf_, buf_, buf_ + chunk_len);
        }
        return gptr() == egptr() ?
            std::char_traits<char>::eof() :
            std::char_traits<char>::to_int_type(*gptr());
    }

The basic idea is to register (using setg()) with the base class pointers to the start eback(), current gptr(), and end (egptr() one-after-last) of the buffer maintained by readbinbuf. The buffer is empty (needs refilling) when the current pointer is equal to the end pointer, gptr() == egptr().

Our implementation task is to invoke readBin() to get another chunk of data for the buffer. This is implemented by the additional lines of code

    int underflow() {
        if (gptr() == egptr()) {
            // invoke R's readBin() function, asking for up to n_bytes_
            SEXP chunk = read_bin(con_, "raw", n_bytes_);
            // copy the result from the SEXP into buf_, so we do not have
            // to worry about R's garbage collection
            R_xlen_t chunk_len = Rf_xlength(chunk);
            std::copy(RAW(chunk), RAW(chunk) + chunk_len, buf_);
            // update the streambuf pointers to our buffer
            setg(buf_, buf_, buf_ + chunk_len);
        }
    }

setg() sets the start and current pointers to the start of our buffer, and the end-of-buffer pointer to one past the last byte we’ve read. If read_bin() returns no values, then chunk_len is 0 and setg() sets the current location gptr() to the end location egptr().

The return value of underflow() is either end-of-file (if even after update the current and end pointers are the same) or the value pointed to by the current pointer. The complete function is

    int underflow() {
        if (gptr() == egptr()) {
            SEXP chunk = read_bin(con_, "raw", n_bytes_);
            R_xlen_t chunk_len = Rf_xlength(chunk);
            std::copy(RAW(chunk), RAW(chunk) + chunk_len, buf_);
            setg(buf_, buf_, buf_ + chunk_len);
        }
        return gptr() == egptr() ?
            std::char_traits<char>::eof() :
            std::char_traits<char>::to_int_type(*gptr());
    }
};

Here’s a complete header-only definition, e.g., ‘readbinbuf.hpp’

#include <streambuf>
#include <cpp11.hpp>

class readbinbuf : public std::streambuf {
    inline static auto read_bin = cpp11::package("base")["readBin"];
    const cpp11::sexp& con_;
    char *buf_;
    int n_bytes_;

  public:

    readbinbuf(const cpp11::sexp& con, const int n_bytes)
      : con_(con), n_bytes_(n_bytes)
        {
            buf_ = new char[n_bytes_];
        }

    ~readbinbuf() { delete buf_; }

    int underflow() {
        if (gptr() == egptr()) {
            SEXP chunk = read_bin(con_, "raw", n_bytes_);
            R_xlen_t chunk_len = Rf_xlength(chunk);
            std::copy(RAW(chunk), RAW(chunk) + chunk_len, buf_);
            setg(buf_, buf_, buf_ + chunk_len);
        }
        return gptr() == egptr() ?
            std::char_traits<char>::eof() :
            std::char_traits<char>::to_int_type(*gptr());
    }
};

R / C++ interface

The remaining task is to use cpp11 to write an interface between R and C++. We’ll do this with a simple function that counts the number of lines in a connection.

Start by including relevant headers – iostream so that we can use our buffer in an input stream, cpp11/declarations.hpp to so that the C++ function definition is exposed as an R function, and our readbinbuf.hpp header file.

#include <iostream>
#include <cpp11/declarations.hpp>
#include "readbinbuf.h"

Create the interface to be exposed to R: a function taking a connection and number of bytes to read, and returning the number of lines in the connection.

[[cpp11::register]]
int cpp_fun(const cpp11::sexp& con, int n)

In the body of the function, instantiate our readbinbuf class, and initialize an input stream that uses our connection buffer as its source of input.

{
    readbinbuf cbuf(con, n);
    std::istream in(&cbuf);

To count lines, we’ll create a std::string variable to record each line, and an integer counter.

    std::string line;
    int i = 0;

The counting loop uses std::getline() to read a single line from the input stream (which in turn reads data from the buffer and read_bin()).

    while (std::getline(in, line)) {
        i += 1;
    }

The loop continues until there are no more lines available. Return the count.

    return i;
}

The full implementation is

#include <iostream>
#include <cpp11/declarations.hpp>
#include "readbinbuf.h"

[[cpp11::register]]
int cpp_fun(const cpp11::sexp& con, int n)
{
    readbinbuf cbuf(con, n);
    std::istream in(&cbuf);
    std::string line;
    int i = 0;
    while (std::getline(in, line)) {
        i += 1;
    }
    return i;
}

Use

In R, compile the function

cpp11::cpp_source("cpp_fun.cpp")

And then use it, verifying that it is correct via base R methods (the buffer size, 100, is small, so that underflow() is invoked at least 4 times; a much larger buffer, e.g., 2^22, is appropriate for real-world applications).

> url = "https://bioconductor.org/config.yaml"
> length(readLines(url))
[1] 355
> con = url(url, "rb"); cpp_fun(con, 100); close(con)
[1] 355