iceberg-cpp
Loading...
Searching...
No Matches
delete_filter.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#pragma once
21
24
25#include <atomic>
26#include <cstdint>
27#include <functional>
28#include <memory>
29#include <mutex>
30#include <optional>
31#include <span>
32#include <string>
33#include <vector>
34
38#include "iceberg/iceberg_data_export.h"
39#include "iceberg/result.h"
41#include "iceberg/type_fwd.h"
42
43namespace iceberg {
44
46struct ICEBERG_DATA_EXPORT AliveRowSelection {
48 std::vector<int32_t> indices;
49
51 int64_t alive_count() const { return static_cast<int64_t>(indices.size()); }
52
53 bool empty() const { return indices.empty(); }
54};
55
57class ICEBERG_DATA_EXPORT DeleteCounter {
58 public:
59 void Increment(int64_t count = 1) {
60 count_.fetch_add(count, std::memory_order_relaxed);
61 }
62 int64_t Get() const { return count_.load(std::memory_order_relaxed); }
63
64 private:
65 std::atomic<int64_t> count_{0};
66};
67
69class ICEBERG_DATA_EXPORT DeleteFilter {
70 public:
78 SchemaField field;
79 SchemaField projection_field;
80 };
81
83 using FieldLookup = std::function<Result<std::optional<FieldLookupResult>>(int32_t)>;
84
90 static Result<FieldLookup> MakeFieldLookup(
91 std::shared_ptr<Schema> table_schema,
92 std::span<const std::shared_ptr<Schema>> schemas = {});
93
96 static Result<FieldLookup> MakeFieldLookup(
97 std::shared_ptr<TableMetadata> table_metadata);
98
109 static Result<std::unique_ptr<DeleteFilter>> Make(
110 std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
111 std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> requested_schema,
112 std::shared_ptr<FileIO> io, bool need_row_pos_col = true,
113 std::shared_ptr<DeleteCounter> counter = nullptr);
114
116 static Result<std::unique_ptr<DeleteFilter>> Make(
117 std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
118 std::shared_ptr<TableMetadata> table_metadata,
119 std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
120 bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = nullptr);
121
123 static Result<std::unique_ptr<DeleteFilter>> Make(
124 std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
125 std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> requested_schema,
126 std::shared_ptr<FileIO> io, std::span<const std::shared_ptr<Schema>> schemas,
127 bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = nullptr);
128
130 static Result<std::unique_ptr<DeleteFilter>> Make(
131 std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
132 std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
133 FieldLookup field_lookup, bool need_row_pos_col = true,
134 std::shared_ptr<DeleteCounter> counter = nullptr);
135
137
139 const std::shared_ptr<Schema>& RequiredSchema() const;
140
143 const std::shared_ptr<Schema>& ExpectedSchema() const;
144
149 void IncrementDeleteCount(int64_t count = 1);
150
157 Result<const PositionDeleteIndex*> DeletedRowPositions() const;
158
167 Result<std::function<Result<bool>(const StructLike&)>> EqDeletedRowFilter() const;
168
173 Result<std::function<Result<bool>(const StructLike&)>> FindEqualityDeleteRows() const;
174
180 Result<AliveRowSelection> ComputeAliveRows(const ArrowSchema& batch_schema,
181 const ArrowArray& batch) const;
182
183 bool HasPositionDeletes() const;
184 bool HasEqualityDeletes() const;
185
186 DeleteFilter(const DeleteFilter&) = delete;
187 DeleteFilter& operator=(const DeleteFilter&) = delete;
188
189 private:
190 struct EqDeleteGroup;
191
192 DeleteFilter(std::string file_path, std::shared_ptr<Schema> requested_schema,
193 std::shared_ptr<FileIO> io, FieldLookup field_lookup,
194 bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter);
195
196 Status Init(std::span<const std::shared_ptr<DataFile>> delete_files);
197 Result<std::shared_ptr<Schema>> ComputeRequiredSchema() const;
198 Status EnsurePositionDeletesLoaded() const;
199 Status EnsureEqualityDeletesLoaded() const;
200
201 const std::string file_path_;
202 std::vector<std::shared_ptr<DataFile>> pos_deletes_;
203 std::vector<std::shared_ptr<DataFile>> eq_deletes_;
204
205 std::shared_ptr<Schema> requested_schema_;
206 std::shared_ptr<Schema> required_schema_;
207 FieldLookup field_lookup_;
208
209 const bool need_row_pos_col_;
210 // Position of `_pos` in required_schema_ when existent
211 std::optional<size_t> pos_field_position_;
212 std::shared_ptr<DeleteCounter> counter_;
213
214 // TODO(gangwu): expose a factory hook (e.g. a std::function<DeleteLoader()> or a
215 // virtual newDeleteLoader()) so callers can inject a caching DeleteLoader (analogous to
216 // SparkDeleteFilter.CachingDeleteLoader in Java).
217 DeleteLoader delete_loader_;
218
219 mutable std::mutex pos_mutex_;
220 mutable bool pos_loaded_ = false;
221 mutable PositionDeleteIndex pos_index_;
222
223 mutable std::mutex eq_mutex_;
224 mutable bool eq_loaded_ = false;
225 mutable std::vector<std::unique_ptr<EqDeleteGroup>> eq_groups_;
226 mutable std::function<Result<bool>(const StructLike&)> eq_deleted_row_filter_cache_;
227};
228
229} // namespace iceberg
Counts rows removed by delete filters.
Definition delete_filter.h:57
Concrete batch-oriented delete filter for merge-on-read data batches.
Definition delete_filter.h:69
std::function<Result<std::optional<FieldLookupResult>>(int32_t)> FieldLookup
Lookup a field by ID, including fields from table schema fallbacks.
Definition delete_filter.h:83
Loads delete files and constructs in-memory delete indexes.
Definition delete_loader.h:36
Tracks deleted row positions using a bitmap.
Definition position_delete_index.h:38
A type combined with a name.
Definition schema_field.h:39
An immutable struct-like wrapper.
Definition struct_like.h:62
Definition arrow_c_data.h:57
Definition arrow_c_data.h:41
Result of ComputeAliveRows: indices of rows not matched by any delete.
Definition delete_filter.h:46
int64_t alive_count() const
Number of alive rows (convenience accessor to avoid size_t casts).
Definition delete_filter.h:51
std::vector<int32_t> indices
Zero-based row indices within the batch that are alive (not deleted).
Definition delete_filter.h:48
Field lookup output for current or fallback equality-delete fields.
Definition delete_filter.h:77