iceberg-cpp
Loading...
Searching...
No Matches
retry_util.h
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#pragma once
21
22#include <chrono>
23#include <concepts>
24#include <cstdint>
25#include <functional>
26#include <initializer_list>
27#include <optional>
28#include <type_traits>
29#include <utility>
30#include <vector>
31
32#include "iceberg/iceberg_export.h"
33#include "iceberg/result.h"
34
35namespace iceberg {
36
37namespace detail {
38
39template <typename T>
40struct IsResult : std::false_type {};
41
42template <typename T>
43struct IsResult<Result<T>> : std::true_type {};
44
45template <typename T>
46concept ResultType = IsResult<std::remove_cvref_t<T>>::value;
47
48template <typename F>
49concept RetryTask = requires(F& f) {
50 { std::invoke(f) } -> ResultType;
51};
52
53template <typename F>
54using RetryTaskResult = std::remove_cvref_t<std::invoke_result_t<F&>>;
55
56} // namespace detail
57
59struct ICEBERG_EXPORT RetryConfig {
61 int32_t num_retries = 4;
63 int32_t min_wait_ms = 100;
65 int32_t max_wait_ms = 60 * 1000; // 1 minute
67 int32_t total_timeout_ms = 30 * 60 * 1000; // 30 minutes
69 double scale_factor = 2.0;
70};
71
76class ICEBERG_EXPORT RetryRunner {
77 public:
79 explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {}
80
88 RetryRunner& OnlyRetryOn(std::initializer_list<ErrorKind> error_kinds) {
89 retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn;
90 retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
91 return *this;
92 }
93
98 RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); }
99
107 RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) {
108 if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) {
109 return *this;
110 }
111
112 retry_policy_mode_ = RetryPolicyMode::kStopRetryOn;
113 retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
114 return *this;
115 }
116
121 RetryRunner& StopRetryOn(ErrorKind error_kind) { return StopRetryOn({error_kind}); }
122
129 template <typename F>
130 requires detail::RetryTask<F>
131 auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult<F> {
132 using TaskResult = detail::RetryTaskResult<F>;
133
134 const auto validation = ValidateConfig();
135 if (!validation.has_value()) {
136 return TaskResult(std::unexpected(validation.error()));
137 }
138
139 const auto deadline = ComputeDeadline();
140 int32_t attempt = 0;
141 const int32_t max_attempts = config_.num_retries + 1;
142
143 while (true) {
144 ++attempt;
145 if (attempt_counter != nullptr) {
146 *attempt_counter = attempt;
147 }
148
149 auto result = std::invoke(task);
150 if (result.has_value()) {
151 return result;
152 }
153
154 if (!CanRetry(result.error().kind, attempt, max_attempts, deadline)) {
155 return result;
156 }
157
158 if (!WaitForNextAttempt(attempt, deadline)) {
159 return result;
160 }
161 }
162 }
163
164 private:
165 enum class RetryPolicyMode {
166 // No retry policy was selected; invalid when retries are enabled.
167 kUnset,
168 // Retry only errors listed in retry_error_kinds_.
169 kOnlyRetryOn,
170 // Retry all errors except those listed in retry_error_kinds_.
171 kStopRetryOn,
172 };
173
174 using Clock = std::chrono::steady_clock;
175 using Duration = std::chrono::milliseconds;
176 using TimePoint = Clock::time_point;
177
179 Status ValidateConfig() const;
180 std::optional<TimePoint> ComputeDeadline() const;
181 bool HasTimedOut(const std::optional<TimePoint>& deadline) const;
182
184 bool ShouldRetry(ErrorKind kind) const;
185 bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
186 const std::optional<TimePoint>& deadline) const;
187 std::optional<Duration> RetryDelayWithinBudget(
188 int32_t attempt, const std::optional<TimePoint>& deadline) const;
189 bool WaitForNextAttempt(int32_t attempt,
190 const std::optional<TimePoint>& deadline) const;
192 int32_t CalculateDelay(int32_t attempt) const;
193
194 RetryConfig config_;
195 RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset;
196 std::vector<ErrorKind> retry_error_kinds_;
197};
198
200ICEBERG_EXPORT inline RetryRunner MakeCommitRetryRunner(int32_t num_retries,
201 int32_t min_wait_ms,
202 int32_t max_wait_ms,
203 int32_t total_timeout_ms) {
204 return RetryRunner(RetryConfig{.num_retries = num_retries,
205 .min_wait_ms = min_wait_ms,
206 .max_wait_ms = max_wait_ms,
207 .total_timeout_ms = total_timeout_ms})
208 .OnlyRetryOn(ErrorKind::kCommitFailed);
209}
210
211} // namespace iceberg
Utility class for running tasks with retry logic.
Definition retry_util.h:76
RetryRunner(RetryConfig config={})
Construct a RetryRunner with the given configuration.
Definition retry_util.h:79
auto Run(F &&task, int32_t *attempt_counter=nullptr) -> detail::RetryTaskResult< F >
Run a task that returns a Result<T>, retrying failed attempts according to the configured policy.
Definition retry_util.h:131
RetryRunner & StopRetryOn(std::initializer_list< ErrorKind > error_kinds)
Specify error types that should stop retries immediately.
Definition retry_util.h:107
RetryRunner & StopRetryOn(ErrorKind error_kind)
Specify a single error type that should stop retries immediately.
Definition retry_util.h:121
RetryRunner & OnlyRetryOn(std::initializer_list< ErrorKind > error_kinds)
Specify error types that should trigger a retry.
Definition retry_util.h:88
RetryRunner & OnlyRetryOn(ErrorKind error_kind)
Specify a single error type that should trigger a retry.
Definition retry_util.h:98
Definition retry_util.h:46
Definition retry_util.h:49
STL namespace.
Configuration for retry behavior.
Definition retry_util.h:59
Definition retry_util.h:40