28#include <unordered_map>
31#include <gtest/gtest.h>
33#include "iceberg/arrow/arrow_io_util.h"
40#include "iceberg/snapshot.h"
42#include "iceberg/table_scan.h"
43#include "iceberg/test/matchers.h"
55 void SetUp()
override {
58 file_io_ = arrow::MakeMockFileIO();
59 schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
64 ICEBERG_UNWRAP_OR_FAIL(
73 return std::format(
"manifest-{}-{}.avro", manifest_counter_++,
74 std::chrono::system_clock::now().time_since_epoch().count());
79 return std::format(
"manifest-list-{}-{}.avro", manifest_list_counter_++,
80 std::chrono::system_clock::now().time_since_epoch().count());
85 int64_t sequence_number, std::shared_ptr<DataFile> file) {
88 .snapshot_id = snapshot_id,
89 .sequence_number = sequence_number,
90 .file_sequence_number = sequence_number,
91 .data_file = std::move(file),
97 int8_t format_version, int64_t snapshot_id, std::vector<ManifestEntry> entries,
101 format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
102 ManifestContent::kData,
103 format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);
105 EXPECT_THAT(writer_result, IsOk());
106 auto writer = std::move(writer_result.value());
108 for (
const auto& entry : entries) {
109 EXPECT_THAT(writer->WriteEntry(entry), IsOk());
112 EXPECT_THAT(writer->Close(), IsOk());
113 auto manifest_result = writer->ToManifestFile();
114 EXPECT_THAT(manifest_result, IsOk());
115 return std::move(manifest_result.value());
120 std::vector<ManifestEntry> entries,
121 std::shared_ptr<PartitionSpec> spec) {
125 spec, schema_, ManifestContent::kDeletes);
127 EXPECT_THAT(writer_result, IsOk());
128 auto writer = std::move(writer_result.value());
130 for (
const auto& entry : entries) {
131 EXPECT_THAT(writer->WriteEntry(entry), IsOk());
134 EXPECT_THAT(writer->Close(), IsOk());
135 auto manifest_result = writer->ToManifestFile();
136 EXPECT_THAT(manifest_result, IsOk());
137 return std::move(manifest_result.value());
142 int64_t parent_snapshot_id, int64_t sequence_number,
143 const std::vector<ManifestFile>& manifests) {
147 format_version, snapshot_id, parent_snapshot_id, manifest_list_path, file_io_,
148 format_version >= 2 ? std::optional(sequence_number)
150 format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);
152 EXPECT_THAT(writer_result, IsOk());
153 auto writer = std::move(writer_result.value());
154 EXPECT_THAT(writer->AddAll(manifests), IsOk());
155 EXPECT_THAT(writer->Close(), IsOk());
157 return manifest_list_path;
162 const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
163 return tasks | std::views::transform([](
const auto& task) {
164 return task->data_file()->file_path;
166 std::ranges::to<std::vector<std::string>>();
171 const std::vector<std::shared_ptr<Snapshot>>& snapshots,
172 int64_t current_snapshot_id,
173 const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& refs = {},
174 std::shared_ptr<PartitionSpec> default_spec =
nullptr) {
175 TimePointMs timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
176 int64_t last_seq = snapshots.empty() ? 0L : snapshots.back()->sequence_number;
177 auto effective_spec = default_spec ? default_spec : unpartitioned_spec_;
181 .table_uuid =
"test-table-uuid",
182 .location =
"/tmp/table",
183 .last_sequence_number = last_seq,
184 .last_updated_ms = timestamp_ms,
186 .schemas = {schema_},
187 .current_schema_id = schema_->schema_id(),
188 .partition_specs = {partitioned_spec_, unpartitioned_spec_},
189 .default_spec_id = effective_spec->spec_id(),
190 .last_partition_id = 1000,
191 .current_snapshot_id = current_snapshot_id,
192 .snapshots = snapshots,
194 .default_sort_order_id = 0,
201 const std::string& path,
203 std::shared_ptr<PartitionSpec> spec =
nullptr, int64_t record_count = 1) {
204 auto effective_spec = spec ? spec : unpartitioned_spec_;
205 return std::make_shared<DataFile>(
DataFile{
207 .file_format = FileFormatType::kParquet,
208 .partition = std::move(partition),
209 .record_count = record_count,
210 .file_size_in_bytes = 10,
212 .partition_spec_id = effective_spec->spec_id(),
218 int8_t format_version, int64_t snapshot_id,
219 std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
220 const std::vector<std::string>& added_files,
221 std::shared_ptr<PartitionSpec> spec =
nullptr) {
222 std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
223 for (
const auto& path : added_files) {
224 files_with_partitions.emplace_back(path, kEmptyPartition);
227 parent_snapshot_id, sequence_number,
228 files_with_partitions, spec);
233 int8_t format_version, int64_t snapshot_id,
234 std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
235 const std::vector<std::pair<std::string, PartitionValues>>& added_files,
236 std::shared_ptr<PartitionSpec> spec =
nullptr) {
237 auto effective_spec = spec ? spec : unpartitioned_spec_;
238 std::vector<ManifestEntry> entries;
239 entries.reserve(added_files.size());
240 for (
const auto& [path, partition] : added_files) {
241 auto file =
MakeDataFile(path, partition, effective_spec);
243 MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number, file));
246 auto manifest =
WriteDataManifest(format_version, snapshot_id, std::move(entries),
248 int64_t parent_id = parent_snapshot_id.value_or(0L);
250 sequence_number, {manifest});
251 TimePointMs timestamp_ms =
252 TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000);
253 return std::make_shared<Snapshot>(
Snapshot{
255 .parent_snapshot_id = parent_snapshot_id,
256 .sequence_number = sequence_number,
257 .timestamp_ms = timestamp_ms,
258 .manifest_list = manifest_list,
259 .summary = {{
"operation",
"append"}},
260 .schema_id = schema_->schema_id(),
266 int8_t format_version, int64_t snapshot_id,
267 std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
268 const std::vector<std::string>& deleted_files) {
269 std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
270 for (
const auto& path : deleted_files) {
271 files_with_partitions.emplace_back(path,
PartitionValues(std::vector<Literal>{}));
274 sequence_number, files_with_partitions);
279 int8_t format_version, int64_t snapshot_id,
280 std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
281 const std::vector<std::pair<std::string, PartitionValues>>& deleted_files) {
282 std::vector<ManifestEntry> entries;
283 entries.reserve(deleted_files.size());
284 for (
const auto& [path, partition] : deleted_files) {
287 MakeEntry(ManifestStatus::kDeleted, snapshot_id, sequence_number, file));
290 auto manifest =
WriteDataManifest(format_version, snapshot_id, std::move(entries));
291 int64_t parent_id = parent_snapshot_id.value_or(0L);
293 sequence_number, {manifest});
294 TimePointMs timestamp_ms =
295 TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000);
296 return std::make_shared<Snapshot>(
Snapshot{
298 .parent_snapshot_id = parent_snapshot_id,
299 .sequence_number = sequence_number,
300 .timestamp_ms = timestamp_ms,
301 .manifest_list = manifest_list,
302 .summary = {{
"operation",
"delete"}},
303 .schema_id = schema_->schema_id(),
309 int8_t format_version, int64_t snapshot_id,
310 std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
311 const std::vector<std::string>& added_file_paths,
312 const std::vector<std::string>& deleted_file_paths) {
313 std::vector<ManifestEntry> entries;
314 entries.reserve(added_file_paths.size() + deleted_file_paths.size());
316 for (
const auto& path : added_file_paths) {
319 MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number, file));
322 for (
const auto& path : deleted_file_paths) {
325 MakeEntry(ManifestStatus::kDeleted, snapshot_id, sequence_number, file));
328 auto manifest =
WriteDataManifest(format_version, snapshot_id, std::move(entries));
329 int64_t parent_id = parent_snapshot_id.value_or(0L);
331 sequence_number, {manifest});
332 TimePointMs timestamp_ms =
333 TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000);
334 return std::make_shared<Snapshot>(
Snapshot{
336 .parent_snapshot_id = parent_snapshot_id,
337 .sequence_number = sequence_number,
338 .timestamp_ms = timestamp_ms,
339 .manifest_list = manifest_list,
340 .summary = {{
"operation",
"overwrite"}},
341 .schema_id = schema_->schema_id(),
345 std::shared_ptr<FileIO> file_io_;
346 std::shared_ptr<Schema> schema_;
347 std::shared_ptr<PartitionSpec> partitioned_spec_;
348 std::shared_ptr<PartitionSpec> unpartitioned_spec_;
351 int manifest_counter_ = 0;
352 int manifest_list_counter_ = 0;
Provide functions to register Avro implementations.
static Result< std::unique_ptr< ManifestListWriter > > MakeWriter(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, std::string_view manifest_list_location, std::shared_ptr< FileIO > file_io, std::optional< int64_t > sequence_number=std::nullopt, std::optional< int64_t > first_row_id=std::nullopt)
Factory function to create a writer for the manifest list based on format version.
Definition manifest_writer.cc:355
static Result< std::unique_ptr< ManifestWriter > > MakeWriter(int8_t format_version, std::optional< int64_t > snapshot_id, std::string_view manifest_location, std::shared_ptr< FileIO > file_io, std::shared_ptr< PartitionSpec > partition_spec, std::shared_ptr< Schema > current_schema, ManifestContent content=ManifestContent::kData, std::optional< int64_t > first_row_id=std::nullopt)
Factory function to create a writer for a manifest file based on format version.
Definition manifest_writer.cc:274
A partition field: a source field together with its transform.
Definition partition_field.h:37
static const std::shared_ptr< PartitionSpec > & Unpartitioned()
Get the unpartitioned partition spec singleton.
Definition partition_spec.cc:59
static Result< std::unique_ptr< PartitionSpec > > Make(const Schema &schema, int32_t spec_id, std::vector< PartitionField > fields, bool allow_missing_fields, std::optional< int32_t > last_assigned_field_id=std::nullopt)
Create a PartitionSpec binding to a schema.
Definition partition_spec.cc:260
StructLike wrapper for a vector of literals that represent partition values.
Definition partition_values.h:36
Base class for scan-related tests providing common test utilities.
Definition scan_test_base.h:53
std::shared_ptr< TableMetadata > MakeTableMetadata(const std::vector< std::shared_ptr< Snapshot > > &snapshots, int64_t current_snapshot_id, const std::unordered_map< std::string, std::shared_ptr< SnapshotRef > > &refs={}, std::shared_ptr< PartitionSpec > default_spec=nullptr)
Create table metadata with the given snapshots.
Definition scan_test_base.h:170
std::shared_ptr< Snapshot > MakeDeleteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::pair< std::string, PartitionValues > > &deleted_files)
Create a delete snapshot with partition values for each file.
Definition scan_test_base.h:278
std::string MakeManifestPath()
Generate a unique manifest file path.
Definition scan_test_base.h:72
std::string MakeManifestListPath()
Generate a unique manifest list file path.
Definition scan_test_base.h:78
ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id, int64_t sequence_number, std::shared_ptr< DataFile > file)
Create a manifest entry.
Definition scan_test_base.h:84
ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id, std::vector< ManifestEntry > entries, std::shared_ptr< PartitionSpec > spec)
Write a delete manifest file.
Definition scan_test_base.h:119
std::shared_ptr< Snapshot > MakeAppendSnapshotWithPartitionValues(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::pair< std::string, PartitionValues > > &added_files, std::shared_ptr< PartitionSpec > spec=nullptr)
Create an append snapshot with the given files (with partition values).
Definition scan_test_base.h:232
std::shared_ptr< Snapshot > MakeAppendSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &added_files, std::shared_ptr< PartitionSpec > spec=nullptr)
Create an append snapshot with the given files (string paths).
Definition scan_test_base.h:217
std::shared_ptr< Snapshot > MakeOverwriteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &added_file_paths, const std::vector< std::string > &deleted_file_paths)
Create an overwrite snapshot with added and deleted files.
Definition scan_test_base.h:308
static std::vector< std::string > GetPaths(const std::vector< std::shared_ptr< FileScanTask > > &tasks)
Extract file paths from scan tasks.
Definition scan_test_base.h:161
ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id, std::vector< ManifestEntry > entries, std::shared_ptr< PartitionSpec > spec=PartitionSpec::Unpartitioned())
Write a data manifest file.
Definition scan_test_base.h:96
std::shared_ptr< DataFile > MakeDataFile(const std::string &path, PartitionValues partition=PartitionValues(std::vector< Literal >{}), std::shared_ptr< PartitionSpec > spec=nullptr, int64_t record_count=1)
Create a data file with optional partition values.
Definition scan_test_base.h:200
std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, int64_t sequence_number, const std::vector< ManifestFile > &manifests)
Write a manifest list file.
Definition scan_test_base.h:141
std::shared_ptr< Snapshot > MakeDeleteSnapshot(int8_t format_version, int64_t snapshot_id, std::optional< int64_t > parent_snapshot_id, int64_t sequence_number, const std::vector< std::string > &deleted_files)
Create a delete snapshot with the given files.
Definition scan_test_base.h:265
static SchemaField MakeRequired(int32_t field_id, std::string_view name, std::shared_ptr< Type > type, std::string_view doc={})
Construct a required (non-null) field.
Definition schema_field.cc:43
ICEBERG_EXPORT const std::shared_ptr< IntType > & int32()
Return an IntType instance.
DataFile carries data file path, partition tuple, metrics, ...
Definition manifest_entry.h:62
std::string file_path
Definition manifest_entry.h:76
A manifest is an immutable Avro file that lists data files or delete files, along with each file's partition data tuple, metrics, and tracking information.
Definition manifest_entry.h:307
ManifestStatus status
Definition manifest_entry.h:311
Entry in a manifest list.
Definition manifest_list.h:85
A snapshot of the data in a table at a point in time.
Definition snapshot.h:389
int64_t snapshot_id
A unique long ID.
Definition snapshot.h:391