iceberg-cpp
Loading...
Searching...
No Matches
scan_test_base.h
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#pragma once
21
22#include <chrono>
23#include <format>
24#include <memory>
25#include <optional>
26#include <ranges>
27#include <string>
28#include <unordered_map>
29#include <vector>
30
31#include <gtest/gtest.h>
32
33#include "iceberg/arrow/arrow_io_util.h"
39#include "iceberg/schema.h"
40#include "iceberg/snapshot.h"
42#include "iceberg/table_scan.h"
43#include "iceberg/test/matchers.h"
44#include "iceberg/transform.h"
45#include "iceberg/type.h"
46
47namespace iceberg {
48
/// \brief Base class for scan-related tests providing common test utilities.
///
/// The test parameter (GetParam()) is used by MakeTableMetadata as the table
/// format version. The helpers below write manifest and manifest-list files
/// through the FileIO returned by arrow::MakeMockFileIO() and build Snapshot /
/// TableMetadata objects that reference them.
class ScanTestBase : public testing::TestWithParam<int8_t> {
 protected:
  void SetUp() override {
    avro::RegisterAll();

    file_io_ = arrow::MakeMockFileIO();
    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
    unpartitioned_spec_ = PartitionSpec::Unpartitioned();

    // Spec id 1: bucket the "data" column (source field id 2) into 16 buckets.
    ICEBERG_UNWRAP_OR_FAIL(
        partitioned_spec_,
        PartitionSpec::Make(
            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
                                           "data_bucket_16_2", Transform::Bucket(16))}));
  }

  /// \brief Generate a unique manifest file path.
  ///
  /// Uniqueness comes from a per-fixture counter; the clock value is appended
  /// as an extra disambiguator.
  std::string MakeManifestPath() {
    return std::format("manifest-{}-{}.avro", manifest_counter_++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Generate a unique manifest list file path.
  std::string MakeManifestListPath() {
    return std::format("manifest-list-{}-{}.avro", manifest_list_counter_++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Create a manifest entry.
  ///
  /// Both the data sequence number and the file sequence number are set to
  /// `sequence_number`.
  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
                          int64_t sequence_number, std::shared_ptr<DataFile> file) {
    return ManifestEntry{
        .status = status,
        .snapshot_id = snapshot_id,
        .sequence_number = sequence_number,
        .file_sequence_number = sequence_number,
        .data_file = std::move(file),
    };
  }

  /// \brief Write a data manifest file.
  ///
  /// For format version 3 and above the manifest is written with a first row
  /// id of 0.
  /// \return The ManifestFile describing the written manifest.
  ManifestFile WriteDataManifest(
      int8_t format_version, int64_t snapshot_id, std::vector<ManifestEntry> entries,
      std::shared_ptr<PartitionSpec> spec = PartitionSpec::Unpartitioned()) {
    return WriteManifestImpl(
        format_version, snapshot_id, entries, std::move(spec), ManifestContent::kData,
        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);
  }

  /// \brief Write a delete manifest file.
  ///
  /// No first row id is supplied for delete manifests.
  /// \return The ManifestFile describing the written manifest.
  ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
                                   std::vector<ManifestEntry> entries,
                                   std::shared_ptr<PartitionSpec> spec) {
    return WriteManifestImpl(format_version, snapshot_id, entries, std::move(spec),
                             ManifestContent::kDeletes, /*first_row_id=*/std::nullopt);
  }

  /// \brief Write a manifest list file.
  ///
  /// \param parent_snapshot_id Parent snapshot id; pass std::nullopt for a root
  ///        snapshot so that no phantom parent is recorded. Plain int64_t
  ///        arguments still convert implicitly.
  /// \param sequence_number Recorded only for format version 2 and above.
  /// \return The path of the written manifest list.
  std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
                                std::optional<int64_t> parent_snapshot_id,
                                int64_t sequence_number,
                                const std::vector<ManifestFile>& manifests) {
    const std::string manifest_list_path = MakeManifestListPath();

    auto writer_result = ManifestListWriter::MakeWriter(
        format_version, snapshot_id, parent_snapshot_id, manifest_list_path, file_io_,
        /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number)
                                                : std::nullopt,
        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());
    EXPECT_THAT(writer->AddAll(manifests), IsOk());
    EXPECT_THAT(writer->Close(), IsOk());

    return manifest_list_path;
  }

  /// \brief Extract file paths from scan tasks, preserving task order.
  static std::vector<std::string> GetPaths(
      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
    return tasks | std::views::transform([](const auto& task) {
             return task->data_file()->file_path;
           }) |
           std::ranges::to<std::vector<std::string>>();
  }

  /// \brief Create table metadata with the given snapshots.
  ///
  /// The format version is taken from the test parameter. Both specs are
  /// registered; `default_spec` (or the unpartitioned spec when null) selects
  /// the default spec id.
  std::shared_ptr<TableMetadata> MakeTableMetadata(
      const std::vector<std::shared_ptr<Snapshot>>& snapshots,
      int64_t current_snapshot_id,
      const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& refs = {},
      std::shared_ptr<PartitionSpec> default_spec = nullptr) {
    // Use the highest sequence number so the result does not depend on the
    // order in which `snapshots` was assembled.
    int64_t last_seq = 0L;
    for (const auto& snapshot : snapshots) {
      if (snapshot->sequence_number > last_seq) {
        last_seq = snapshot->sequence_number;
      }
    }
    auto effective_spec = default_spec ? default_spec : unpartitioned_spec_;

    return std::make_shared<TableMetadata>(TableMetadata{
        .format_version = GetParam(),
        .table_uuid = "test-table-uuid",
        .location = "/tmp/table",
        .last_sequence_number = last_seq,
        .last_updated_ms = TimePointMsFromUnixMs(kBaseTimestampMs),
        .last_column_id = 2,
        .schemas = {schema_},
        .current_schema_id = schema_->schema_id(),
        .partition_specs = {partitioned_spec_, unpartitioned_spec_},
        .default_spec_id = effective_spec->spec_id(),
        .last_partition_id = 1000,
        .current_snapshot_id = current_snapshot_id,
        .snapshots = snapshots,
        .snapshot_log = {},
        .default_sort_order_id = 0,
        .refs = refs,
    });
  }

  /// \brief Create a Parquet data file with optional partition values.
  ///
  /// `spec` (or the unpartitioned spec when null) only determines the recorded
  /// partition spec id; `partition` is stored as given.
  std::shared_ptr<DataFile> MakeDataFile(
      const std::string& path,
      PartitionValues partition = PartitionValues(std::vector<Literal>{}),
      std::shared_ptr<PartitionSpec> spec = nullptr, int64_t record_count = 1) {
    auto effective_spec = spec ? spec : unpartitioned_spec_;
    return std::make_shared<DataFile>(DataFile{
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = std::move(partition),
        .record_count = record_count,
        .file_size_in_bytes = 10,
        .sort_order_id = 0,
        .partition_spec_id = effective_spec->spec_id(),
    });
  }

  /// \brief Create an append snapshot with the given files (string paths).
  ///
  /// Every file gets an empty partition tuple.
  std::shared_ptr<Snapshot> MakeAppendSnapshot(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::string>& added_files,
      std::shared_ptr<PartitionSpec> spec = nullptr) {
    std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
    files_with_partitions.reserve(added_files.size());
    for (const auto& path : added_files) {
      files_with_partitions.emplace_back(path, kEmptyPartition);
    }
    return MakeAppendSnapshotWithPartitionValues(format_version, snapshot_id,
                                                 parent_snapshot_id, sequence_number,
                                                 files_with_partitions, spec);
  }

  /// \brief Create an append snapshot with the given files (with partition values).
  ///
  /// All files are written as kAdded entries into one data manifest using
  /// `spec` (or the unpartitioned spec when null).
  std::shared_ptr<Snapshot> MakeAppendSnapshotWithPartitionValues(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::pair<std::string, PartitionValues>>& added_files,
      std::shared_ptr<PartitionSpec> spec = nullptr) {
    auto effective_spec = spec ? spec : unpartitioned_spec_;
    std::vector<ManifestEntry> entries;
    entries.reserve(added_files.size());
    for (const auto& [path, partition] : added_files) {
      entries.push_back(MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number,
                                  MakeDataFile(path, partition, effective_spec)));
    }
    return MakeSnapshotFromEntries(format_version, snapshot_id, parent_snapshot_id,
                                   sequence_number, std::move(entries),
                                   std::move(effective_spec), "append");
  }

  /// \brief Create a delete snapshot with the given files.
  ///
  /// Every file gets an empty partition tuple.
  std::shared_ptr<Snapshot> MakeDeleteSnapshot(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::string>& deleted_files) {
    std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
    files_with_partitions.reserve(deleted_files.size());
    for (const auto& path : deleted_files) {
      files_with_partitions.emplace_back(path, kEmptyPartition);
    }
    return MakeDeleteSnapshot(format_version, snapshot_id, parent_snapshot_id,
                              sequence_number, files_with_partitions);
  }

  /// \brief Create a delete snapshot with partition values for each file.
  ///
  /// All files are written as kDeleted entries into one data manifest using
  /// `spec` (or the unpartitioned spec when null), mirroring
  /// MakeAppendSnapshotWithPartitionValues.
  std::shared_ptr<Snapshot> MakeDeleteSnapshot(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::pair<std::string, PartitionValues>>& deleted_files,
      std::shared_ptr<PartitionSpec> spec = nullptr) {
    auto effective_spec = spec ? spec : unpartitioned_spec_;
    std::vector<ManifestEntry> entries;
    entries.reserve(deleted_files.size());
    for (const auto& [path, partition] : deleted_files) {
      entries.push_back(MakeEntry(ManifestStatus::kDeleted, snapshot_id, sequence_number,
                                  MakeDataFile(path, partition, effective_spec)));
    }
    return MakeSnapshotFromEntries(format_version, snapshot_id, parent_snapshot_id,
                                   sequence_number, std::move(entries),
                                   std::move(effective_spec), "delete");
  }

  /// \brief Create an overwrite snapshot with added and deleted files.
  ///
  /// Added files become kAdded entries and deleted files kDeleted entries in a
  /// single unpartitioned data manifest.
  std::shared_ptr<Snapshot> MakeOverwriteSnapshot(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::string>& added_file_paths,
      const std::vector<std::string>& deleted_file_paths) {
    std::vector<ManifestEntry> entries;
    entries.reserve(added_file_paths.size() + deleted_file_paths.size());
    for (const auto& path : added_file_paths) {
      entries.push_back(MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number,
                                  MakeDataFile(path)));
    }
    for (const auto& path : deleted_file_paths) {
      entries.push_back(MakeEntry(ManifestStatus::kDeleted, snapshot_id, sequence_number,
                                  MakeDataFile(path)));
    }
    return MakeSnapshotFromEntries(format_version, snapshot_id, parent_snapshot_id,
                                   sequence_number, std::move(entries),
                                   unpartitioned_spec_, "overwrite");
  }

  std::shared_ptr<FileIO> file_io_;
  std::shared_ptr<Schema> schema_;
  std::shared_ptr<PartitionSpec> partitioned_spec_;
  std::shared_ptr<PartitionSpec> unpartitioned_spec_;

 private:
  /// Base timestamp (2021-01-01T00:00:00Z, in Unix milliseconds) for all fixtures.
  static constexpr int64_t kBaseTimestampMs = 1609459200000L;

  /// Deterministic snapshot timestamp: one second per sequence number after the base.
  static TimePointMs SnapshotTimestampMs(int64_t sequence_number) {
    return TimePointMsFromUnixMs(kBaseTimestampMs + sequence_number * 1000);
  }

  /// Shared writer logic for data and delete manifests.
  ManifestFile WriteManifestImpl(int8_t format_version, int64_t snapshot_id,
                                 const std::vector<ManifestEntry>& entries,
                                 std::shared_ptr<PartitionSpec> spec,
                                 ManifestContent content,
                                 std::optional<int64_t> first_row_id) {
    // Keep the path alive in a named local for the duration of the writer call.
    const std::string manifest_path = MakeManifestPath();
    auto writer_result =
        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
                                   std::move(spec), schema_, content, first_row_id);

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    for (const auto& entry : entries) {
      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
    }

    EXPECT_THAT(writer->Close(), IsOk());
    auto manifest_result = writer->ToManifestFile();
    EXPECT_THAT(manifest_result, IsOk());
    return std::move(manifest_result.value());
  }

  /// Writes `entries` into one data manifest, wraps it in a manifest list and
  /// returns a snapshot with summary {"operation": operation}.
  std::shared_ptr<Snapshot> MakeSnapshotFromEntries(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      std::vector<ManifestEntry> entries, std::shared_ptr<PartitionSpec> spec,
      std::string operation) {
    auto manifest =
        WriteDataManifest(format_version, snapshot_id, std::move(entries), std::move(spec));
    // Pass the optional parent through so root snapshots record no parent.
    auto manifest_list = WriteManifestList(format_version, snapshot_id,
                                           parent_snapshot_id, sequence_number, {manifest});
    return std::make_shared<Snapshot>(Snapshot{
        .snapshot_id = snapshot_id,
        .parent_snapshot_id = parent_snapshot_id,
        .sequence_number = sequence_number,
        .timestamp_ms = SnapshotTimestampMs(sequence_number),
        .manifest_list = manifest_list,
        .summary = {{"operation", std::move(operation)}},
        .schema_id = schema_->schema_id(),
    });
  }

  int manifest_counter_ = 0;
  int manifest_list_counter_ = 0;
  constexpr static PartitionValues kEmptyPartition{};
};
355
356} // namespace iceberg
Provide functions to register Avro implementations.
static Result< std::unique_ptr< ManifestListWriter > > MakeWriter(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, std::string_view manifest_list_location, std::shared_ptr< FileIO > file_io, std::optional< int64_t > sequence_number=std::nullopt, std::optional< int64_t > first_row_id=std::nullopt)
Factory function to create a writer for the manifest list based on format version.
Definition manifest_writer.cc:355
static Result< std::unique_ptr< ManifestWriter > > MakeWriter(int8_t format_version, std::optional< int64_t > snapshot_id, std::string_view manifest_location, std::shared_ptr< FileIO > file_io, std::shared_ptr< PartitionSpec > partition_spec, std::shared_ptr< Schema > current_schema, ManifestContent content=ManifestContent::kData, std::optional< int64_t > first_row_id=std::nullopt)
Factory function to create a writer for a manifest file based on format version.
Definition manifest_writer.cc:274
A partition field: a source field paired with the transform applied to it.
Definition partition_field.h:37
static const std::shared_ptr< PartitionSpec > & Unpartitioned()
Get the unpartitioned partition spec singleton.
Definition partition_spec.cc:59
static Result< std::unique_ptr< PartitionSpec > > Make(const Schema &schema, int32_t spec_id, std::vector< PartitionField > fields, bool allow_missing_fields, std::optional< int32_t > last_assigned_field_id=std::nullopt)
Create a PartitionSpec binding to a schema.
Definition partition_spec.cc:260
StructLike wrapper for a vector of literals that represent partition values.
Definition partition_values.h:36
Base class for scan-related tests providing common test utilities.
Definition scan_test_base.h:53
std::shared_ptr< TableMetadata > MakeTableMetadata(const std::vector< std::shared_ptr< Snapshot > > &snapshots, int64_t current_snapshot_id, const std::unordered_map< std::string, std::shared_ptr< SnapshotRef > > &refs={}, std::shared_ptr< PartitionSpec > default_spec=nullptr)
Create table metadata with the given snapshots.
Definition scan_test_base.h:170
std::shared_ptr< Snapshot > MakeDeleteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::pair< std::string, PartitionValues > > &deleted_files)
Create a delete snapshot with partition values for each file.
Definition scan_test_base.h:278
std::string MakeManifestPath()
Generate a unique manifest file path.
Definition scan_test_base.h:72
std::string MakeManifestListPath()
Generate a unique manifest list file path.
Definition scan_test_base.h:78
ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id, int64_t sequence_number, std::shared_ptr< DataFile > file)
Create a manifest entry.
Definition scan_test_base.h:84
ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id, std::vector< ManifestEntry > entries, std::shared_ptr< PartitionSpec > spec)
Write a delete manifest file.
Definition scan_test_base.h:119
std::shared_ptr< Snapshot > MakeAppendSnapshotWithPartitionValues(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::pair< std::string, PartitionValues > > &added_files, std::shared_ptr< PartitionSpec > spec=nullptr)
Create an append snapshot with the given files (with partition values).
Definition scan_test_base.h:232
std::shared_ptr< Snapshot > MakeAppendSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &added_files, std::shared_ptr< PartitionSpec > spec=nullptr)
Create an append snapshot with the given files (string paths).
Definition scan_test_base.h:217
std::shared_ptr< Snapshot > MakeOverwriteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &added_file_paths, const std::vector< std::string > &deleted_file_paths)
Create an overwrite snapshot with added and deleted files.
Definition scan_test_base.h:308
static std::vector< std::string > GetPaths(const std::vector< std::shared_ptr< FileScanTask > > &tasks)
Extract file paths from scan tasks.
Definition scan_test_base.h:161
ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id, std::vector< ManifestEntry > entries, std::shared_ptr< PartitionSpec > spec=PartitionSpec::Unpartitioned())
Write a data manifest file.
Definition scan_test_base.h:96
std::shared_ptr< DataFile > MakeDataFile(const std::string &path, PartitionValues partition=PartitionValues(std::vector< Literal >{}), std::shared_ptr< PartitionSpec > spec=nullptr, int64_t record_count=1)
Create a data file with optional partition values.
Definition scan_test_base.h:200
std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, int64_t sequence_number, const std::vector< ManifestFile > &manifests)
Write a manifest list file.
Definition scan_test_base.h:141
std::shared_ptr< Snapshot > MakeDeleteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &deleted_files)
Create a delete snapshot with the given files.
Definition scan_test_base.h:265
static SchemaField MakeRequired(int32_t field_id, std::string_view name, std::shared_ptr< Type > type, std::string_view doc={})
Construct a required (non-null) field.
Definition schema_field.cc:43
static std::shared_ptr< Transform > Bucket(int32_t num_buckets)
Creates a shared instance of the Bucket transform.
Definition transform.cc:81
ICEBERG_EXPORT const std::shared_ptr< IntType > & int32()
Return an IntType instance.
DataFile carries data file path, partition tuple, metrics, ...
Definition manifest_entry.h:62
std::string file_path
Definition manifest_entry.h:76
A manifest is an immutable Avro file that lists data files or delete files, along with each file's partition data tuple, metrics, and tracking information.
Definition manifest_entry.h:307
ManifestStatus status
Definition manifest_entry.h:311
Entry in a manifest list.
Definition manifest_list.h:85
A snapshot of the data in a table at a point in time.
Definition snapshot.h:389
int64_t snapshot_id
A unique long ID.
Definition snapshot.h:391
Represents the metadata for an Iceberg table.
Definition table_metadata.h:72
int8_t format_version
An integer version number for the format.
Definition table_metadata.h:83