Arrow Data Interchange

Namespace: dftracer::utils::utilities::common::arrow

For a usage guide and examples, see Arrow Data Infrastructure.

class ArrowExportResult

Move-only RAII container for a finished Arrow record batch.

Owns both the ArrowArray and the ArrowSchema produced by RecordBatchBuilder::finish(). Safe to move across threads/channels.

Public Functions

ArrowExportResult() = default
inline ArrowExportResult(nanoarrow::UniqueSchema schema, nanoarrow::UniqueArray array)
ArrowExportResult(ArrowExportResult&&) = default
ArrowExportResult &operator=(ArrowExportResult&&) = default
ArrowExportResult(const ArrowExportResult&) = delete
ArrowExportResult &operator=(const ArrowExportResult&) = delete
inline ArrowArray *get_array() noexcept
inline const ArrowArray *get_array() const noexcept
inline ArrowSchema *get_schema() noexcept
inline const ArrowSchema *get_schema() const noexcept
inline nanoarrow::UniqueArray release_array()
inline nanoarrow::UniqueSchema release_schema()
inline int64_t num_rows() const noexcept
inline int64_t num_columns() const noexcept
inline bool valid() const noexcept
struct ArrowFileReadResult

Result from reading a single Arrow IPC file. Batches are stored in a shared_ptr so the result can be copied through std::shared_future.

Public Functions

inline ArrowFileReadResult()

Public Members

std::string path
std::shared_ptr<std::vector<ArrowExportResult>> batches
std::int64_t total_rows = 0
std::string error
bool success = true
class BufferPool

Public Functions

explicit BufferPool(std::size_t num_slots = 4, std::size_t initial_capacity = DEFAULT_BUFFER_CAPACITY)
~BufferPool() = default
BufferPool(const BufferPool&) = delete
BufferPool &operator=(const BufferPool&) = delete
BufferPool(BufferPool&&) = default
BufferPool &operator=(BufferPool&&) = default
Slot *acquire(std::size_t min_capacity = 0)
void release(Slot *slot)
inline std::size_t size() const

Public Static Attributes

static constexpr std::size_t DEFAULT_BUFFER_CAPACITY = 4 * 1024 * 1024
struct Slot

Public Members

std::vector<uint8_t> data
std::atomic<bool> in_use = {false}
struct ColumnData

Public Members

std::string name
ColumnType type
std::vector<std::int64_t> int64_values
std::vector<std::uint64_t> uint64_values
std::vector<double> double_values
std::vector<std::int32_t> string_offsets
std::vector<char> string_data
std::vector<std::uint8_t> bool_values
std::vector<std::uint8_t> validity
std::size_t count = 0
bool has_nulls = false
std::vector<std::int32_t> dict_indices
std::deque<std::string> dict_values
std::unordered_map<std::string_view, std::int32_t> dict_map
struct ColumnSpec

Public Members

std::string name
ColumnType type
class IpcReader

RAII reader for Arrow IPC file format (.arrow).

Optimized with:

  • Memory-mapped I/O for zero-copy file access

  • Shared schema (no deep copy per batch)

  • Buffer reuse for decompression

Supports buffer-level ZSTD decompression compatible with pyarrow, polars, and this library’s IpcWriter.

Sequence: open() -> num_batches() -> read_batch(i) [or read_all()]

Move-only. Not thread-safe.

Public Functions

IpcReader() = default
~IpcReader()
IpcReader(const IpcReader&) = delete
IpcReader &operator=(const IpcReader&) = delete
IpcReader(IpcReader &&other) noexcept
IpcReader &operator=(IpcReader &&other) noexcept
int open(const std::string &path)
void close()
inline bool is_open() const noexcept
inline std::size_t num_batches() const noexcept
inline std::int64_t total_rows() const noexcept
ArrowExportResult read_batch(std::size_t index)
std::vector<ArrowExportResult> read_all()
int for_each_batch(std::function<int(ArrowExportResult&)> callback)
class IpcWriter

Async Arrow IPC file writer (.arrow).

Uses Executor::current() for async I/O, so it must be called from within an executor. Supports buffer-level compression (ZSTD) compatible with pyarrow, polars, nanoarrow, and other Arrow IPC readers.

Usage: open() -> write_batch() [1..N] -> close()

Move-only. Not thread-safe.

Public Functions

IpcWriter() = default
~IpcWriter()
IpcWriter(const IpcWriter&) = delete
IpcWriter &operator=(const IpcWriter&) = delete
IpcWriter(IpcWriter &&other) noexcept
IpcWriter &operator=(IpcWriter &&other) noexcept
coro::CoroTask<int> open(const std::string &path, IpcCompression compression = DEFAULT_ARROW_IPC_COMPRESSION, std::size_t pool_slots = 4)
coro::CoroTask<int> write_batch(ArrowExportResult &batch)
coro::CoroTask<int> write_batches(std::vector<ArrowExportResult> &batches)
coro::CoroTask<int> close()
inline bool is_open() const noexcept
struct ParallelReadResult

Result from reading multiple Arrow IPC files in parallel.

Public Members

std::vector<ArrowFileReadResult> file_results
std::int64_t total_rows = 0
std::int64_t total_batches = 0
std::size_t files_read = 0
std::size_t files_failed = 0
struct PartitionConfig

Public Types

enum class Mode

Values:

enumerator NONE
enumerator COLUMN
enumerator BUCKETED
enumerator VIEW

Public Members

Mode mode = Mode::NONE
std::vector<std::string> partition_columns
int num_buckets = 0
std::vector<std::pair<std::string, std::optional<std::string>>> views
class PartitionRouter

Routes Arrow record batches to partitioned output directories. Supports column-based, bucketed, and view-based partitioning.

Public Functions

PartitionRouter() = default
~PartitionRouter()
PartitionRouter(const PartitionRouter&) = delete
PartitionRouter &operator=(const PartitionRouter&) = delete
PartitionRouter(PartitionRouter &&other) noexcept
PartitionRouter &operator=(PartitionRouter &&other) noexcept
int open(const std::string &output_dir, const PartitionConfig &config, int64_t chunk_size_bytes, IpcCompression compression = DEFAULT_ARROW_IPC_COMPRESSION)
void register_predicate(const std::string &view_name, PredicateEvaluator evaluator)
coro::CoroTask<int> write_batch(ArrowExportResult &batch)
coro::CoroTask<RouterWriteStats> close()
inline bool is_open() const noexcept
struct PartitionWriteStats

Public Members

std::vector<std::string> files
std::vector<int64_t> row_counts
int64_t total_rows = 0
int64_t total_uncompressed_bytes = 0
class PartitionWriter

Async wrapper around IpcWriter with automatic file rotation. Writes part-NNNNN.arrow files, rotating to a new file when the size threshold is exceeded.

Public Functions

PartitionWriter() = default
~PartitionWriter()
PartitionWriter(const PartitionWriter&) = delete
PartitionWriter &operator=(const PartitionWriter&) = delete
PartitionWriter(PartitionWriter &&other) noexcept
PartitionWriter &operator=(PartitionWriter &&other) noexcept
coro::CoroTask<int> open(const std::string &output_dir, int64_t chunk_size_bytes, IpcCompression compression = DEFAULT_ARROW_IPC_COMPRESSION)
coro::CoroTask<int> write_batch(ArrowExportResult &batch)
coro::CoroTask<PartitionWriteStats> close()
inline bool is_open() const noexcept
inline int64_t current_file_bytes() const noexcept
inline int64_t total_bytes() const noexcept
inline int64_t total_rows() const noexcept
inline size_t file_count() const noexcept
class RecordBatchBuilder

Type-safe columnar builder producing Arrow record batches via nanoarrow.

Two modes:

  • Static: call declare_schema() once, then append rows. This is the fastest path.

  • Dynamic: call add_or_get_column() the first time a column is encountered; end_row() backfills nulls for any columns not touched in that row.

String columns copy and own their data — no lifetime requirements on the source strings passed to append_string().

Not thread-safe. Use one builder per worker/coroutine.

Public Functions

RecordBatchBuilder() = default
void declare_schema(std::initializer_list<ColumnSpec> specs)
void declare_schema(const std::vector<ColumnSpec> &specs)
std::size_t add_or_get_column(std::string_view name, ColumnType type)
std::optional<std::size_t> find_column(std::string_view name) const
ColumnType column_type(std::size_t col_idx) const noexcept
void append_int64(std::size_t col_idx, std::int64_t value)
void append_uint64(std::size_t col_idx, std::uint64_t value)
void append_double(std::size_t col_idx, double value)
void append_string(std::size_t col_idx, std::string_view value)
void append_dict_string(std::size_t col_idx, std::string_view value)
void append_bool(std::size_t col_idx, bool value)
void append_null(std::size_t col_idx)
void end_row()
void reserve(std::size_t num_rows)
ArrowExportResult finish()
void reset(bool keep_schema = true)
inline void lock_schema() noexcept
inline bool is_schema_locked() const noexcept
inline std::size_t num_rows() const noexcept
inline std::size_t num_columns() const noexcept
struct RouterWriteStats

Public Members

std::unordered_map<std::string, PartitionWriteStats> partitions
int64_t total_rows = 0
int64_t total_uncompressed_bytes = 0