iceberg-cpp
Loading...
Searching...
No Matches
std_io.h
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#pragma once
21
22#include <cstddef>
23#include <cstdint>
24#include <filesystem>
25#include <fstream>
26#include <ios>
27#include <limits>
28#include <memory>
29#include <optional>
30#include <span>
31#include <string>
32#include <string_view>
33#include <system_error>
34#include <utility>
35
36#include "iceberg/file_io.h"
37#include "iceberg/result.h"
38#include "iceberg/util/macros.h"
39
40namespace iceberg::test {
41
42namespace detail {
43
44inline Result<std::streamsize> ToStreamSize(size_t size) {
45 if (size > static_cast<size_t>(std::numeric_limits<std::streamsize>::max())) {
46 return InvalidArgument("Buffer size {} exceeds streamsize max", size);
47 }
48 return static_cast<std::streamsize>(size);
49}
50
51inline Result<int64_t> ToInt64FileSize(uintmax_t size, std::string_view location) {
52 if (size > static_cast<uintmax_t>(std::numeric_limits<int64_t>::max())) {
53 return Invalid("File size for {} exceeds int64_t max", location);
54 }
55 return static_cast<int64_t>(size);
56}
57
58} // namespace detail
59
61 public:
62 explicit StdSeekableInputStream(std::string location)
63 : location_(std::move(location)), file_(location_, std::ios::binary) {}
64
65 bool is_open() const { return file_.is_open(); }
66
67 Result<int64_t> Position() const override {
68 auto position = file_.tellg();
69 if (position < 0) {
70 return IOError("Failed to get read position for: {}", location_);
71 }
72 return static_cast<int64_t>(position);
73 }
74
75 Status Seek(int64_t position) override {
76 if (position < 0) {
77 return InvalidArgument("Cannot seek to negative position {}", position);
78 }
79 file_.clear();
80 file_.seekg(position);
81 if (!file_) {
82 return IOError("Failed to seek to {} in file: {}", position, location_);
83 }
84 return {};
85 }
86
87 Result<int64_t> Read(std::span<std::byte> out) override {
88 if (out.empty()) {
89 return 0;
90 }
91 ICEBERG_ASSIGN_OR_RAISE(auto read_size, detail::ToStreamSize(out.size()));
92 file_.read(reinterpret_cast<char*>(out.data()), read_size);
93 auto bytes_read = file_.gcount();
94 if (!file_) {
95 if (file_.bad() || !file_.eof()) {
96 return IOError("Failed to read from file: {}", location_);
97 }
98 file_.clear();
99 }
100 if (bytes_read < 0) {
101 return IOError("Failed to read from file: {}", location_);
102 }
103 return static_cast<int64_t>(bytes_read);
104 }
105
106 Status ReadFully(int64_t position, std::span<std::byte> out) override {
107 if (position < 0) {
108 return InvalidArgument("Cannot read from negative position {}", position);
109 }
110 if (out.empty()) {
111 return {};
112 }
113 ICEBERG_ASSIGN_OR_RAISE(auto original_position, Position());
114 auto seek_status = Seek(position);
115 if (!seek_status.has_value()) {
116 static_cast<void>(Seek(original_position));
117 return seek_status;
118 }
119
120 Status read_status = {};
121 size_t total_read = 0;
122 while (total_read < out.size()) {
123 auto read_result = Read(out.subspan(total_read));
124 if (!read_result.has_value()) {
125 read_status = std::unexpected<Error>(read_result.error());
126 break;
127 }
128 if (read_result.value() == 0) {
129 read_status =
130 IOError("Failed to read {} bytes from file: {}", out.size(), location_);
131 break;
132 }
133 total_read += static_cast<size_t>(read_result.value());
134 }
135
136 auto restore_status = Seek(original_position);
137 ICEBERG_RETURN_UNEXPECTED(read_status);
138 return restore_status;
139 }
140
141 Status Close() override {
142 if (!file_.is_open()) {
143 return {};
144 }
145 file_.close();
146 if (!file_) {
147 return IOError("Failed to close file: {}", location_);
148 }
149 return {};
150 }
151
152 private:
153 std::string location_;
154 mutable std::ifstream file_;
155};
156
158 public:
159 explicit StdPositionOutputStream(std::string location)
160 : location_(std::move(location)),
161 file_(location_, std::ios::binary | std::ios::out | std::ios::trunc) {}
162
163 bool is_open() const { return file_.is_open(); }
164
165 Result<int64_t> Position() const override {
166 auto position = file_.tellp();
167 if (position < 0) {
168 return IOError("Failed to get write position for: {}", location_);
169 }
170 return static_cast<int64_t>(position);
171 }
172
173 Status Write(std::span<const std::byte> data) override {
174 if (data.empty()) {
175 return {};
176 }
177 ICEBERG_ASSIGN_OR_RAISE(auto write_size, detail::ToStreamSize(data.size()));
178 file_.write(reinterpret_cast<const char*>(data.data()), write_size);
179 if (!file_) {
180 return IOError("Failed to write to file: {}", location_);
181 }
182 return {};
183 }
184
185 Status Flush() override {
186 file_.flush();
187 if (!file_) {
188 return IOError("Failed to flush file: {}", location_);
189 }
190 return {};
191 }
192
193 Status Close() override {
194 if (!file_.is_open()) {
195 return {};
196 }
197 file_.close();
198 if (!file_) {
199 return IOError("Failed to close file: {}", location_);
200 }
201 return {};
202 }
203
204 private:
205 std::string location_;
206 mutable std::ofstream file_;
207};
208
209class StdInputFile : public InputFile {
210 public:
211 explicit StdInputFile(std::string location,
212 std::optional<int64_t> file_size = std::nullopt)
213 : location_(std::move(location)), file_size_(file_size) {}
214
215 std::string_view location() const override { return location_; }
216
217 Result<int64_t> Size() const override {
218 if (file_size_.has_value()) {
219 return *file_size_;
220 }
221 std::error_code ec;
222 auto size = std::filesystem::file_size(location_, ec);
223 if (ec) {
224 return IOError("Failed to get file size for {}: {}", location_, ec.message());
225 }
226 return detail::ToInt64FileSize(size, location_);
227 }
228
229 Result<std::unique_ptr<SeekableInputStream>> Open() override {
230 auto stream = std::make_unique<StdSeekableInputStream>(location_);
231 if (!stream->is_open()) {
232 return IOError("Failed to open file for reading: {}", location_);
233 }
234 return stream;
235 }
236
237 private:
238 std::string location_;
239 std::optional<int64_t> file_size_;
240};
241
242class StdOutputFile : public OutputFile {
243 public:
244 explicit StdOutputFile(std::string location) : location_(std::move(location)) {}
245
246 std::string_view location() const override { return location_; }
247
248 Result<std::unique_ptr<PositionOutputStream>> Create() override {
249 return Create(/*overwrite=*/false);
250 }
251
252 Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
253 return Create(/*overwrite=*/true);
254 }
255
256 private:
257 Result<std::unique_ptr<PositionOutputStream>> Create(bool overwrite) {
258 std::filesystem::path path(location_);
259 std::error_code ec;
260 auto exists = std::filesystem::exists(path, ec);
261 if (ec) {
262 return IOError("Failed to check file existence for {}: {}", location_,
263 ec.message());
264 }
265 if (!overwrite && exists) {
266 return AlreadyExists("File already exists: {}", location_);
267 }
268 if (path.has_parent_path()) {
269 std::filesystem::create_directories(path.parent_path(), ec);
270 if (ec) {
271 return IOError("Failed to create parent directories for {}: {}", location_,
272 ec.message());
273 }
274 }
275 auto stream = std::make_unique<StdPositionOutputStream>(location_);
276 if (!stream->is_open()) {
277 return IOError("Failed to open file for writing: {}", location_);
278 }
279 return stream;
280 }
281
282 std::string location_;
283};
284
289class StdFileIO : public FileIO {
290 public:
291 Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location) override {
292 return std::make_unique<StdInputFile>(std::move(file_location));
293 }
294
295 Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
296 size_t length) override {
297 if (length > static_cast<size_t>(std::numeric_limits<int64_t>::max())) {
298 return InvalidArgument("File length {} exceeds int64_t max", length);
299 }
300 return std::make_unique<StdInputFile>(std::move(file_location),
301 static_cast<int64_t>(length));
302 }
303
304 Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location) override {
305 return std::make_unique<StdOutputFile>(std::move(file_location));
306 }
307
308 Status DeleteFile(const std::string& file_location) override {
309 std::error_code ec;
310 if (!std::filesystem::remove(file_location, ec)) {
311 if (ec) {
312 return IOError("Failed to delete file {}: {}", file_location, ec.message());
313 }
314 return IOError("File does not exist: {}", file_location);
315 }
316 return {};
317 }
318};
319
320} // namespace iceberg::test
Pluggable module for reading, writing, and deleting files.
Definition file_io.h:115
Handle for opening a readable file.
Definition file_io.h:79
Handle for creating a writable file.
Definition file_io.h:94
Positioned byte stream for writing file contents.
Definition file_io.h:61
Seekable byte stream for reading file contents.
Definition file_io.h:36
Simple local filesystem FileIO implementation for testing.
Definition std_io.h:289
Result< std::unique_ptr< InputFile > > NewInputFile(std::string file_location) override
Create an input file handle for the given location.
Definition std_io.h:291
Status DeleteFile(const std::string &file_location) override
Delete a file at the given location.
Definition std_io.h:308
Result< std::unique_ptr< OutputFile > > NewOutputFile(std::string file_location) override
Create an output file handle for the given location.
Definition std_io.h:304
Result< std::unique_ptr< InputFile > > NewInputFile(std::string file_location, size_t length) override
Create an input file handle for the given location with a known length.
Definition std_io.h:295
Definition std_io.h:209
Result< int64_t > Size() const override
Return the total file size in bytes.
Definition std_io.h:217
std::string_view location() const override
File location represented by this handle.
Definition std_io.h:215
Result< std::unique_ptr< SeekableInputStream > > Open() override
Open a new independent input stream.
Definition std_io.h:229
Definition std_io.h:242
std::string_view location() const override
File location represented by this handle.
Definition std_io.h:246
Result< std::unique_ptr< PositionOutputStream > > CreateOrOverwrite() override
Create a new output stream, replacing any existing file.
Definition std_io.h:252
Result< std::unique_ptr< PositionOutputStream > > Create() override
Create a new output stream and fail if the file already exists.
Definition std_io.h:248
Status Write(std::span< const std::byte > data) override
Write all bytes in data at the current position.
Definition std_io.h:173
Result< int64_t > Position() const override
Return the current write position.
Definition std_io.h:165
Status Flush() override
Flush buffered data to the underlying store.
Definition std_io.h:185
Status Close() override
Close the stream. Implementations should allow repeated Close calls.
Definition std_io.h:193
Status Close() override
Close the stream. Implementations should allow repeated Close calls.
Definition std_io.h:141
Status Seek(int64_t position) override
Seek to an absolute byte position.
Definition std_io.h:75
Result< int64_t > Read(std::span< std::byte > out) override
Read up to out.size() bytes from the current position.
Definition std_io.h:87
Status ReadFully(int64_t position, std::span< std::byte > out) override
Read exactly out.size() bytes from an absolute position.
Definition std_io.h:106
Result< int64_t > Position() const override
Return the current read position.
Definition std_io.h:67