iceberg-cpp
Loading...
Searching...
No Matches
table_scan.h
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#pragma once
21
22#include <functional>
23#include <memory>
24#include <optional>
25#include <string>
26#include <unordered_map>
27#include <unordered_set>
28#include <vector>
29
31#include "iceberg/iceberg_export.h"
32#include "iceberg/result.h"
34#include "iceberg/type_fwd.h"
36
37namespace iceberg {
38
/// An abstract scan task.
///
/// Pure-virtual interface; the implementations visible in this header are
/// FileScanTask and ChangelogScanTask.
40class ICEBERG_EXPORT ScanTask {
41 public:
  /// Discriminator for the concrete task family.
42 enum class Kind : uint8_t {
  // Reported by FileScanTask::kind().
43 kFileScanTask,
  // Reported by ChangelogScanTask::kind().
44 kChangelogScanTask,
45 };
46
  /// The kind of scan task.
48 virtual Kind kind() const = 0;
49
  /// The number of bytes that should be read by this scan task.
51 virtual int64_t size_bytes() const = 0;
52
  /// The number of files that should be read by this scan task.
54 virtual int32_t files_count() const = 0;
55
  /// The number of rows that should be read by this scan task.
57 virtual int64_t estimated_row_count() const = 0;
58
  // Virtual destructor; only declared here (no inline definition in this header).
59 virtual ~ScanTask();
60};
61
/// Task representing a data file and its corresponding delete files.
63class ICEBERG_EXPORT FileScanTask : public ScanTask {
64 public:
  /// Construct a FileScanTask.
  ///
  /// \param data_file The data file for this task.
  /// \param delete_files Delete files for the data file; defaults to an empty list.
  /// \param filter Optional filter expression; defaults to nullptr.
  ///        NOTE(review): presumably stored as the residual filter returned by
  ///        residual_filter() -- the constructor body is not in this header; confirm.
70 explicit FileScanTask(std::shared_ptr<DataFile> data_file,
71 std::vector<std::shared_ptr<DataFile>> delete_files = {},
72 std::shared_ptr<Expression> filter = nullptr);
73
  /// The data file that should be read by this scan task.
75 const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
76
  /// Delete files that apply to this data file.
78 const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
79 return delete_files_;
80 }
81
  /// Residual filter to apply after reading.
83 const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
84
  /// The kind of scan task; always Kind::kFileScanTask for this class.
85 Kind kind() const override { return Kind::kFileScanTask; }
  // ScanTask metrics; declared here, defined outside this header.
86 int64_t size_bytes() const override;
87 int32_t files_count() const override;
88 int64_t estimated_row_count() const override;
89
  /// Produce an ArrowArrayStream (wrapped in Result) for this task.
  ///
  /// \param io FileIO handle passed by const reference.
  /// \param projected_schema Schema passed by value (shared ownership).
  /// NOTE(review): how delete files and the residual filter are applied while
  /// reading is not visible in this header -- confirm against the implementation.
97 Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
98 std::shared_ptr<Schema> projected_schema) const;
99
100 private:
  // Backing storage for data_file(), delete_files() and residual_filter().
101 std::shared_ptr<DataFile> data_file_;
102 std::vector<std::shared_ptr<DataFile>> delete_files_;
103 std::shared_ptr<Expression> residual_filter_;
104};
105
/// Operation reported by ChangelogScanTask::operation().
106enum class ChangelogOperation : uint8_t {
  // Returned by AddedRowsScanTask::operation().
107 kInsert,
  // Returned by DeletedDataFileScanTask::operation().
108 kDelete,
  // No task type in this header returns the two update values below.
109 kUpdateBefore,
110 kUpdateAfter,
111};
112
/// A scan task for reading changelog entries between snapshots.
///
/// Abstract: operation() is pure virtual. AddedRowsScanTask and
/// DeletedDataFileScanTask inherit this class's constructor and expose the
/// protected data_file_ / delete_files_ members through their own accessors.
114class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
115 public:
  /// Construct a ChangelogScanTask.
  ///
  /// \param change_ordinal The position of this change in the changelog order (0-based).
  /// \param commit_snapshot_id The snapshot ID that committed this change.
  /// \param data_file Data file for this task; moved into the task.
  /// \param delete_files Delete files for this task; defaults to empty, moved into the task.
  /// \param residual_filter Residual filter to apply after reading; defaults to nullptr.
123 ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
124 std::shared_ptr<DataFile> data_file,
125 std::vector<std::shared_ptr<DataFile>> delete_files = {},
126 std::shared_ptr<Expression> residual_filter = nullptr)
127 : change_ordinal_(change_ordinal),
128 commit_snapshot_id_(commit_snapshot_id),
129 data_file_(std::move(data_file)),
130 delete_files_(std::move(delete_files)),
131 residual_filter_(std::move(residual_filter)) {}
132
  /// The kind of scan task; always Kind::kChangelogScanTask for this hierarchy.
133 Kind kind() const override { return Kind::kChangelogScanTask; }
134
  // ScanTask metrics; declared here, defined outside this header.
135 int64_t size_bytes() const override;
136 int32_t files_count() const override;
137 int64_t estimated_row_count() const override;
138
  /// The changelog operation represented by the concrete task type.
139 virtual ChangelogOperation operation() const = 0;
140
  /// The position of this change in the changelog order (0-based).
142 int32_t change_ordinal() const { return change_ordinal_; }
143
  /// The snapshot ID that committed this change.
145 int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
146
  /// Residual filter to apply after reading.
148 const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
149
150 protected:
  // Protected (not private) so derived tasks can return these from their accessors.
151 int32_t change_ordinal_;
152 int64_t commit_snapshot_id_;
153 std::shared_ptr<DataFile> data_file_;
154 std::vector<std::shared_ptr<DataFile>> delete_files_;
155 std::shared_ptr<Expression> residual_filter_;
156};
157
/// A scan task for inserts generated by adding a data file to the table.
176class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {
177 public:
  // Reuse the ChangelogScanTask constructor unchanged.
178 using ChangelogScanTask::ChangelogScanTask;
179
  /// Always ChangelogOperation::kInsert.
180 ChangelogOperation operation() const override { return ChangelogOperation::kInsert; }
181
  /// The data file containing the added rows.
183 const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
184
  /// A list of delete files to apply when reading the data file in this task.
188 const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
189 return delete_files_;
190 }
191};
192
/// A scan task for deletes generated by removing a data file from the table.
206class ICEBERG_EXPORT DeletedDataFileScanTask : public ChangelogScanTask {
207 public:
  // Reuse the ChangelogScanTask constructor unchanged.
208 using ChangelogScanTask::ChangelogScanTask;
209
  /// Always ChangelogOperation::kDelete.
210 ChangelogOperation operation() const override { return ChangelogOperation::kDelete; }
211
  /// The data file that was deleted.
213 const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
214
  /// A list of previously added delete files to apply when reading the data file
  /// in this task. Backed by the inherited delete_files_ member.
219 const std::vector<std::shared_ptr<DataFile>>& existing_deletes() const {
220 return delete_files_;
221 }
222};
223
224namespace internal {
225
226// Internal table scan context used by different scan implementations.
// NOTE(review): the line that opens this aggregate (listing line 227) is not
// present in this view. TableScan::context() refers to the type as
// internal::TableScanContext -- confirm the declaration against the original header.
  // Optional snapshot id; empty by default.
228 std::optional<int64_t> snapshot_id;
  // Filter expression; null by default.
229 std::shared_ptr<Expression> filter;
  // Boolean switches with their in-class defaults.
230 bool ignore_residuals{false};
231 bool case_sensitive{true};
232 bool return_column_stats{false};
  // NOTE(review): presumably field ids of the columns whose stats are kept -- confirm.
233 std::unordered_set<int32_t> columns_to_keep_stats;
  // Column names and schema used for projection; both empty/null by default.
234 std::vector<std::string> selected_columns;
235 std::shared_ptr<Schema> projected_schema;
  // Free-form string key/value options.
236 std::unordered_map<std::string, std::string> options;
  // Snapshot-range fields. NOTE(review): presumably consumed by the incremental
  // scans declared later in this header -- confirm.
237 bool from_snapshot_id_inclusive{false};
238 std::optional<int64_t> from_snapshot_id;
239 std::optional<int64_t> to_snapshot_id;
  // Branch name; empty string by default.
240 std::string branch{};
  // Optional row-count value; empty by default.
241 std::optional<int64_t> min_rows_requested;
242
243 // Validate the context parameters to see if they have conflicts.
  // The returned Status is [[nodiscard]]: callers must inspect it.
244 [[nodiscard]] Status Validate() const;
245};
246
247} // namespace internal
248
249// Concept to check if a type is an incremental scan
// Satisfied when T is, or derives from, IncrementalScan<FileScanTask> or
// IncrementalScan<ChangelogScanTask>. Of the scan classes in this header,
// IncrementalAppendScan and IncrementalChangelogScan have such a base;
// DataTableScan derives from TableScan directly and does not.
// NOTE(review): IncrementalScan is used here before its definition below, so a
// prior forward declaration is required (presumably in iceberg/type_fwd.h) -- confirm.
250template <typename T>
251concept IsIncrementalScan = std::is_base_of_v<IncrementalScan<FileScanTask>, T> ||
252 std::is_base_of_v<IncrementalScan<ChangelogScanTask>, T>;
253
/// Builder class for creating TableScan instances.
///
/// \tparam ScanType The scan type produced by Build(); defaults to DataTableScan.
///
/// Derives from ErrorCollector. The constructor is protected, so instances are
/// obtained through Make(). Every configuration method returns a
/// TableScanBuilder reference (NOTE(review): presumably *this, for call
/// chaining -- the definitions are not in this header; confirm).
255template <typename ScanType = DataTableScan>
256class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector {
257 public:
  /// Factory: create a builder from table metadata and a FileIO.
  /// Returns the builder in a std::unique_ptr wrapped in Result.
261 static Result<std::unique_ptr<TableScanBuilder<ScanType>>> Make(
262 std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io);
263
  /// Supply a string key/value option.
268 TableScanBuilder& Option(std::string key, std::string value);
269
  /// Supply a schema for projection.
272 TableScanBuilder& Project(std::shared_ptr<Schema> schema);
273
  /// Set the case-sensitivity flag.
277 TableScanBuilder& CaseSensitive(bool case_sensitive);
278
  /// Request column stats (no column list).
282 TableScanBuilder& IncludeColumnStats();
283
  /// Request column stats for the named columns.
290 TableScanBuilder& IncludeColumnStats(const std::vector<std::string>& requested_columns);
291
  /// Select columns by name.
298 TableScanBuilder& Select(const std::vector<std::string>& column_names);
299
  /// Supply a filter expression.
302 TableScanBuilder& Filter(std::shared_ptr<Expression> filter);
303
  /// Takes no arguments. NOTE(review): presumably sets the context's
  /// ignore_residuals flag -- confirm.
305 TableScanBuilder& IgnoreResiduals();
306
  /// Supply a requested row count.
314 TableScanBuilder& MinRowsRequested(int64_t num_rows);
315
  /// Choose the snapshot by id.
319 TableScanBuilder& UseSnapshot(int64_t snapshot_id);
320
  /// Choose the snapshot by reference name.
325 TableScanBuilder& UseRef(const std::string& ref);
326
  /// Choose the snapshot by a timestamp given in milliseconds.
332 TableScanBuilder& AsOfTime(int64_t timestamp_millis);
333
  // NOTE(review): the five declarations below have no terminating ';' in this
  // listing, and the listing line following each one (345, 358, 369, 380, 387)
  // is absent from this view -- possibly a trailing constraint. Confirm against
  // the original header before relying on these signatures.
  /// Start of a snapshot range, by snapshot id; `inclusive` defaults to false.
344 TableScanBuilder& FromSnapshot(int64_t from_snapshot_id, bool inclusive = false)
346
  /// Start of a snapshot range, by reference name; `inclusive` defaults to false.
357 TableScanBuilder& FromSnapshot(const std::string& ref, bool inclusive = false)
359
  /// End of a snapshot range, by snapshot id.
368 TableScanBuilder& ToSnapshot(int64_t to_snapshot_id)
370
  /// End of a snapshot range, by reference name.
379 TableScanBuilder& ToSnapshot(const std::string& ref)
381
  /// Choose a branch by name.
386 TableScanBuilder& UseBranch(const std::string& branch)
388
  /// Create the configured scan: a std::unique_ptr<ScanType> wrapped in Result.
391 Result<std::unique_ptr<ScanType>> Build();
392
393 protected:
  // Protected constructor; public construction goes through Make().
394 TableScanBuilder(std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io);
395
396 // Return the schema bound to the specified snapshot.
  // The Result holds a reference_wrapper to a const shared_ptr<Schema>, i.e. a
  // reference rather than a copy of the pointer.
  // NOTE(review): presumably refers to snapshot_schema_ below -- confirm.
397 Result<std::reference_wrapper<const std::shared_ptr<Schema>>> ResolveSnapshotSchema();
398
399 std::shared_ptr<TableMetadata> metadata_;
400 std::shared_ptr<FileIO> io_;
402 std::shared_ptr<Schema> snapshot_schema_;
403};
404
/// Represents a configured scan operation on a table.
///
/// Base class for DataTableScan and the IncrementalScan hierarchy. The
/// constructor is protected, so this class is only constructed via subclasses.
406class ICEBERG_EXPORT TableScan {
407 public:
408 virtual ~TableScan();
409
  /// Table metadata accessor.
411 const std::shared_ptr<TableMetadata>& metadata() const;
412
  /// Snapshot accessor; the value is wrapped in Result.
414 Result<std::shared_ptr<Snapshot>> snapshot() const;
415
  /// Schema accessor; the value is wrapped in Result.
417 Result<std::shared_ptr<Schema>> schema() const;
418
  /// Scan context accessor.
420 const internal::TableScanContext& context() const;
421
  /// FileIO accessor.
423 const std::shared_ptr<FileIO>& io() const;
424
  /// Filter expression accessor.
426 const std::shared_ptr<Expression>& filter() const;
427
  /// Case-sensitivity flag accessor.
429 bool is_case_sensitive() const;
430
431 protected:
  // Takes all four arguments by value.
432 TableScan(std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
433 std::shared_ptr<FileIO> io, internal::TableScanContext context);
434
  // Const member that returns a reference (via reference_wrapper) to a const
  // shared_ptr<Schema>. NOTE(review): presumably caches into the mutable
  // projected_schema_ member below -- confirm against the implementation.
435 Result<std::reference_wrapper<const std::shared_ptr<Schema>>> ResolveProjectedSchema()
436 const;
437
  // Virtual hook returning column names; subclasses may override it.
438 virtual const std::vector<std::string>& ScanColumns() const;
439
  // Const members: fixed for the lifetime of the scan.
440 const std::shared_ptr<TableMetadata> metadata_;
441 const std::shared_ptr<Schema> schema_;
442 const std::shared_ptr<FileIO> io_;
443 const internal::TableScanContext context_;
  // Mutable so that const member functions can assign it.
444 mutable std::shared_ptr<Schema> projected_schema_;
445};
446
/// A scan that reads data files and applies delete files to filter rows.
448class ICEBERG_EXPORT DataTableScan : public TableScan {
449 public:
450 ~DataTableScan() override = default;
451
  /// Factory: takes the same four arguments as the TableScan constructor and
  /// returns the scan in a std::unique_ptr wrapped in Result.
453 static Result<std::unique_ptr<DataTableScan>> Make(
454 std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
455 std::shared_ptr<FileIO> io, internal::TableScanContext context);
456
  /// Plan the scan: returns a list of FileScanTask pointers wrapped in Result.
459 Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const;
460
461 protected:
  // Inherit TableScan's constructors.
462 using TableScan::TableScan;
463};
464
/// A base template class for incremental scans that read changes between snapshots.
///
/// \tparam ScanTaskType Task type held by the vectors that PlanFiles() returns.
// NOTE(review): the class-head line (listing line 468) is not present in this
// view. The body names TableScan as a base (`using TableScan::TableScan`) and
// overrides its destructor -- confirm the declaration against the original header.
467template <typename ScanTaskType>
469 public:
470 ~IncrementalScan() override = default;
471
  /// Plan the scan: returns a list of ScanTaskType pointers wrapped in Result.
472 virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const = 0;
473
474 protected:
  // Range-based planning hook for subclasses. Per the parameter names, the
  // lower bound is optional and exclusive and the upper bound is inclusive.
475 virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
476 std::optional<int64_t> from_snapshot_id_exclusive,
477 int64_t to_snapshot_id_inclusive) const = 0;
478
  // Inherit TableScan's constructors.
479 using TableScan::TableScan;
480
481 // Allow the free function ResolvePlanFiles to access protected members.
482 template <typename T>
483 friend Result<std::vector<std::shared_ptr<T>>> ResolvePlanFiles(
484 const IncrementalScan<T>& scan);
485};
486
/// A scan that reads data files added between snapshots (incremental appends).
488class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask> {
489 public:
  /// Factory: returns the scan in a std::unique_ptr wrapped in Result.
491 static Result<std::unique_ptr<IncrementalAppendScan>> Make(
492 std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
493 std::shared_ptr<FileIO> io, internal::TableScanContext context);
494
495 ~IncrementalAppendScan() override = default;
496
  /// Plan the scan: returns a list of FileScanTask pointers wrapped in Result.
497 Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const override;
498
499 protected:
  // Override of the range-based planning hook declared in IncrementalScan.
500 Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
501 std::optional<int64_t> from_snapshot_id_exclusive,
502 int64_t to_snapshot_id_inclusive) const override;
503
  // Inherit the base-class constructors.
504 using IncrementalScan::IncrementalScan;
505};
506
/// A scan that reads changelog entries between snapshots.
508class ICEBERG_EXPORT IncrementalChangelogScan
509 : public IncrementalScan<ChangelogScanTask> {
510 public:
  /// Factory: returns the scan in a std::unique_ptr wrapped in Result.
512 static Result<std::unique_ptr<IncrementalChangelogScan>> Make(
513 std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
514 std::shared_ptr<FileIO> io, internal::TableScanContext context);
515
516 ~IncrementalChangelogScan() override = default;
517
  /// Plan the scan: returns a list of ChangelogScanTask pointers wrapped in Result.
518 Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles() const override;
519
520 protected:
  // Override of the range-based planning hook declared in IncrementalScan.
521 Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
522 std::optional<int64_t> from_snapshot_id_exclusive,
523 int64_t to_snapshot_id_inclusive) const override;
524
  // Inherit the base-class constructors.
525 using IncrementalScan::IncrementalScan;
526};
527
// Explicit instantiation declarations: they suppress implicit instantiation of
// TableScanBuilder for these three scan types in translation units that include
// this header; the matching explicit instantiation definitions must be provided
// elsewhere.
528extern template class ICEBERG_EXTERN_TEMPLATE_CLASS_EXPORT
529 TableScanBuilder<DataTableScan>;
530extern template class ICEBERG_EXTERN_TEMPLATE_CLASS_EXPORT
531 TableScanBuilder<IncrementalAppendScan>;
532extern template class ICEBERG_EXTERN_TEMPLATE_CLASS_EXPORT
533 TableScanBuilder<IncrementalChangelogScan>;
534
535} // namespace iceberg
A scan task for inserts generated by adding a data file to the table.
Definition table_scan.h:176
const std::shared_ptr< DataFile > & data_file() const
The data file containing the added rows.
Definition table_scan.h:183
const std::vector< std::shared_ptr< DataFile > > & delete_files() const
A list of delete files to apply when reading the data file in this task.
Definition table_scan.h:188
A scan task for reading changelog entries between snapshots.
Definition table_scan.h:114
int32_t change_ordinal() const
The position of this change in the changelog order (0-based).
Definition table_scan.h:142
int64_t commit_snapshot_id() const
The snapshot ID that committed this change.
Definition table_scan.h:145
ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id, std::shared_ptr< DataFile > data_file, std::vector< std::shared_ptr< DataFile > > delete_files={}, std::shared_ptr< Expression > residual_filter=nullptr)
Construct a ChangelogScanTask (this constructor is inherited by AddedRowsScanTask and DeletedDataFileScanTask).
Definition table_scan.h:123
Kind kind() const override
The kind of scan task.
Definition table_scan.h:133
const std::shared_ptr< Expression > & residual_filter() const
Residual filter to apply after reading.
Definition table_scan.h:148
A scan that reads data files and applies delete files to filter rows.
Definition table_scan.h:448
A scan task for deletes generated by removing a data file from the table.
Definition table_scan.h:206
const std::vector< std::shared_ptr< DataFile > > & existing_deletes() const
A list of previously added delete files to apply when reading the data file in this task.
Definition table_scan.h:219
const std::shared_ptr< DataFile > & data_file() const
The data file that was deleted.
Definition table_scan.h:213
Base class for collecting errors in the builder pattern.
Definition error_collector.h:93
Task representing a data file and its corresponding delete files.
Definition table_scan.h:63
Kind kind() const override
The kind of scan task.
Definition table_scan.h:85
const std::shared_ptr< DataFile > & data_file() const
The data file that should be read by this scan task.
Definition table_scan.h:75
const std::shared_ptr< Expression > & residual_filter() const
Residual filter to apply after reading.
Definition table_scan.h:83
const std::vector< std::shared_ptr< DataFile > > & delete_files() const
Delete files that apply to this data file.
Definition table_scan.h:78
A scan that reads data files added between snapshots (incremental appends).
Definition table_scan.h:488
A scan that reads changelog entries between snapshots.
Definition table_scan.h:509
A base template class for incremental scans that read changes between snapshots, and return scan task...
Definition table_scan.h:468
An abstract scan task.
Definition table_scan.h:40
virtual int32_t files_count() const =0
The number of files that should be read by this scan task.
virtual int64_t estimated_row_count() const =0
The number of rows that should be read by this scan task.
virtual int64_t size_bytes() const =0
The number of bytes that should be read by this scan task.
virtual Kind kind() const =0
The kind of scan task.
Builder class for creating TableScan instances.
Definition table_scan.h:256
Represents a configured scan operation on a table.
Definition table_scan.h:406
Definition table_scan.h:251
STL namespace.
Definition table_scan.h:227