Skip to content

Commit cb45320

Browse files
Retry connections in JDBC catalog
1 parent 1757577 commit cb45320

6 files changed

Lines changed: 281 additions & 13 deletions

File tree

core/src/main/java/org/apache/iceberg/ClientPoolImpl.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,29 @@ public abstract class ClientPoolImpl<C, E extends Exception>
3434
private final Class<? extends E> reconnectExc;
3535
private final Object signal = new Object();
3636
private final boolean retryByDefault;
37+
private final int maxRetries;
38+
3739
private volatile int currentSize;
3840
private boolean closed;
3941

42+
private int connectionRetryWaitPeriodMs = 1000;
43+
4044
/**
 * Creates a pool that allows a single reconnect-and-retry attempt after a connection failure.
 *
 * <p>Delegates to the four-argument constructor with a retry limit of {@code 1}.
 *
 * @param poolSize stored as the pool size
 * @param reconnectExc exception class stored by the pool
 * @param retryByDefault stored as the pool's default retry flag
 */
public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean retryByDefault) {
  this(poolSize, reconnectExc, retryByDefault, 1);
}
47+
48+
/**
 * Creates an empty, open pool.
 *
 * @param poolSize stored as the pool size and used as the initial capacity of the client deque
 * @param reconnectExc exception class stored by the pool
 * @param retryByDefault stored as the pool's default retry flag
 * @param maxConnectionRetries upper bound on the reconnect-and-retry attempts made after a
 *     connection failure
 */
public ClientPoolImpl(
    int poolSize,
    Class<? extends E> reconnectExc,
    boolean retryByDefault,
    int maxConnectionRetries) {
  // caller-supplied configuration
  this.poolSize = poolSize;
  this.reconnectExc = reconnectExc;
  this.retryByDefault = retryByDefault;
  this.maxRetries = maxConnectionRetries;

  // initial state: no clients created yet and the pool is open
  this.clients = new ArrayDeque<>(poolSize);
  this.currentSize = 0;
  this.closed = false;
}
4861

4962
@Override
@@ -56,26 +69,38 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
5669
C client = get();
5770
try {
5871
return action.run(client);
59-
6072
} catch (Exception exc) {
61-
if (retry && isConnectionException(exc)) {
62-
try {
63-
client = reconnect(client);
64-
} catch (Exception ignored) {
65-
// if reconnection throws any exception, rethrow the original failure
66-
throw reconnectExc.cast(exc);
67-
}
68-
69-
return action.run(client);
73+
if (!retry || !isConnectionException(exc)) {
74+
throw exc;
7075
}
7176

72-
throw exc;
77+
return retryAction(action, exc, client);
7378

7479
} finally {
7580
release(client);
7681
}
7782
}
7883

84+
private <R> R retryAction(Action<R, C, E> action, Exception originalFailure, C client)
85+
throws E, InterruptedException {
86+
int retryAttempts = 0;
87+
while (retryAttempts < maxRetries) {
88+
try {
89+
C reconnectedClient = reconnect(client);
90+
return action.run(reconnectedClient);
91+
} catch (Exception exc) {
92+
if (isConnectionException(exc)) {
93+
retryAttempts++;
94+
Thread.sleep(connectionRetryWaitPeriodMs);
95+
} else {
96+
throw reconnectExc.cast(originalFailure);
97+
}
98+
}
99+
}
100+
101+
throw reconnectExc.cast(originalFailure);
102+
}
103+
79104
protected abstract C newClient();
80105

81106
protected abstract C reconnect(C client);

core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.iceberg.hadoop.Configurable;
5656
import org.apache.iceberg.io.CloseableGroup;
5757
import org.apache.iceberg.io.FileIO;
58+
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
5859
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
5960
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
6061
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -689,6 +690,11 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
689690
}
690691
}
691692

693+
@VisibleForTesting
694+
JdbcClientPool connectionPool() {
695+
return connections;
696+
}
697+
692698
private int execute(String sql, String... args) {
693699
return execute(err -> {}, sql, args);
694700
}

core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,39 @@
2121
import java.sql.Connection;
2222
import java.sql.DriverManager;
2323
import java.sql.SQLException;
24-
import java.sql.SQLNonTransientConnectionException;
24+
import java.sql.SQLTransientException;
25+
import java.util.Arrays;
2526
import java.util.Map;
2627
import java.util.Properties;
28+
import java.util.Set;
29+
import java.util.stream.Collectors;
2730
import org.apache.iceberg.CatalogProperties;
2831
import org.apache.iceberg.ClientPoolImpl;
32+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
33+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
2934

3035
public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
3136

37+
/**
38+
* The following are common retryable SQLSTATE error codes which are generic across vendors.
39+
*
40+
* <ul>
41+
* <li>08000: Generic Connection Exception
42+
* <li>08003: Connection does not exist
43+
* <li>08006: Connection failure
44+
* <li>08007: Transaction resolution unknown
45+
* </ul>
46+
*
47+
* See https://en.wikipedia.org/wiki/SQLSTATE for more details.
48+
*/
49+
static final Set<String> COMMON_RETRYABLE_CONNECTION_SQL_STATES =
50+
ImmutableSet.of("08000", "08003", "08006", "08007");
51+
3252
private final String dbUrl;
3353
private final Map<String, String> properties;
3454

55+
private Set<String> retryableStatusCodes;
56+
3557
public JdbcClientPool(String dbUrl, Map<String, String> props) {
3658
this(
3759
Integer.parseInt(
@@ -43,8 +65,18 @@ public JdbcClientPool(String dbUrl, Map<String, String> props) {
4365
}
4466

4567
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
46-
super(poolSize, SQLNonTransientConnectionException.class, true);
68+
super(poolSize, SQLTransientException.class, true);
4769
properties = props;
70+
retryableStatusCodes = Sets.newHashSet();
71+
retryableStatusCodes.addAll(COMMON_RETRYABLE_CONNECTION_SQL_STATES);
72+
String configuredRetryableStatuses = props.get(JdbcUtil.RETRYABLE_STATUS_CODES);
73+
if (configuredRetryableStatuses != null) {
74+
retryableStatusCodes.addAll(
75+
Arrays.stream(configuredRetryableStatuses.split(","))
76+
.map(status -> status.replaceAll("\\s+", ""))
77+
.collect(Collectors.toSet()));
78+
}
79+
4880
this.dbUrl = dbUrl;
4981
}
5082

@@ -72,4 +104,11 @@ protected void close(Connection client) {
72104
throw new UncheckedSQLException(e, "Failed to close connection");
73105
}
74106
}
107+
108+
@Override
109+
protected boolean isConnectionException(Exception e) {
110+
return super.isConnectionException(e)
111+
|| (e instanceof SQLException
112+
&& retryableStatusCodes.contains(((SQLException) e).getSQLState()));
113+
}
75114
}

core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ final class JdbcUtil {
4343
static final String INIT_CATALOG_TABLES_PROPERTY =
4444
JdbcCatalog.PROPERTY_PREFIX + "init-catalog-tables";
4545

46+
static final String RETRYABLE_STATUS_CODES = "retryable_status_codes";
47+
4648
enum SchemaVersion {
4749
V0,
4850
V1
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
public class TestClientPoolImpl {
27+
28+
@Test
29+
public void testRetrySucceedsWithinMaxAttempts() throws Exception {
30+
int maxRetries = 5;
31+
int succeedAfterAttempts = 3;
32+
try (MockClientPoolImpl mockClientPool =
33+
new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) {
34+
int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts));
35+
assertThat(actions)
36+
.as("There should be exactly one successful action invocation")
37+
.isEqualTo(1);
38+
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1);
39+
}
40+
}
41+
42+
@Test
43+
public void testRetriesExhaustedAndSurfacesFailure() {
44+
int maxRetries = 3;
45+
int succeedAfterAttempts = 5;
46+
try (MockClientPoolImpl mockClientPool =
47+
new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) {
48+
assertThatThrownBy(
49+
() -> mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts)))
50+
.isInstanceOf(RetryableException.class);
51+
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(maxRetries);
52+
}
53+
}
54+
55+
@Test
56+
public void testNoRetryingNonRetryableException() {
57+
try (MockClientPoolImpl mockClientPool =
58+
new MockClientPoolImpl(2, RetryableException.class, true, 3)) {
59+
assertThatThrownBy(() -> mockClientPool.run(MockClient::failWithNonRetryable, true))
60+
.isInstanceOf(NonRetryableException.class);
61+
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
62+
}
63+
}
64+
65+
@Test
66+
public void testNoRetryingWhenDisabled() {
67+
try (MockClientPoolImpl mockClientPool =
68+
new MockClientPoolImpl(2, RetryableException.class, false, 3)) {
69+
assertThatThrownBy(() -> mockClientPool.run(client -> client.succeedAfter(3)))
70+
.isInstanceOf(RetryableException.class);
71+
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
72+
}
73+
}
74+
75+
static class RetryableException extends RuntimeException {}
76+
77+
static class NonRetryableException extends RuntimeException {}
78+
79+
static class MockClient {
80+
boolean closed = false;
81+
82+
int actions = 0;
83+
84+
int retryableFailures = 0;
85+
86+
public void close() {
87+
closed = true;
88+
}
89+
90+
public int successfulAction() {
91+
actions++;
92+
return actions;
93+
}
94+
95+
int succeedAfter(int succeedAfterAttempts) {
96+
if (retryableFailures == succeedAfterAttempts - 1) {
97+
return successfulAction();
98+
}
99+
100+
retryableFailures++;
101+
throw new RetryableException();
102+
}
103+
104+
int failWithNonRetryable() {
105+
throw new NonRetryableException();
106+
}
107+
}
108+
109+
static class MockClientPoolImpl extends ClientPoolImpl<MockClient, Exception> {
110+
111+
private int reconnectionAttempts;
112+
113+
MockClientPoolImpl(
114+
int poolSize,
115+
Class<? extends Exception> reconnectExc,
116+
boolean retryByDefault,
117+
int numRetries) {
118+
super(poolSize, reconnectExc, retryByDefault, numRetries);
119+
}
120+
121+
@Override
122+
protected MockClient newClient() {
123+
return new MockClient();
124+
}
125+
126+
@Override
127+
protected MockClient reconnect(MockClient client) {
128+
reconnectionAttempts++;
129+
return client;
130+
}
131+
132+
@Override
133+
protected void close(MockClient client) {
134+
client.close();
135+
}
136+
137+
int reconnectionAttempts() {
138+
return reconnectionAttempts;
139+
}
140+
}
141+
}

core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.sql.DatabaseMetaData;
3535
import java.sql.ResultSet;
3636
import java.sql.SQLException;
37+
import java.sql.SQLNonTransientConnectionException;
3738
import java.util.List;
3839
import java.util.Map;
3940
import java.util.Set;
@@ -208,6 +209,60 @@ public void testEnableInitCatalogTablesOverridesDefault() throws Exception {
208209
assertThat(catalogTablesExist(jdbcUrl)).isTrue();
209210
}
210211

212+
@Test
213+
public void testRetryingErrorCodesProperty() {
214+
Map<String, String> properties = Maps.newHashMap();
215+
properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.tableDir.toAbsolutePath().toString());
216+
properties.put(CatalogProperties.URI, "jdbc:sqlite:file::memory:?icebergDB");
217+
properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
218+
JdbcCatalog jdbcCatalog = new JdbcCatalog();
219+
jdbcCatalog.setConf(conf);
220+
jdbcCatalog.initialize("test_catalog_with_retryable_status_codes", properties);
221+
JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
222+
List<SQLException> expectedRetryableExceptions =
223+
Lists.newArrayList(
224+
new SQLException("operator_intervention", "57000"),
225+
new SQLException("cannot_connect_now", "57P03"),
226+
new SQLException("database_dropped", "57P04"));
227+
JdbcClientPool.COMMON_RETRYABLE_CONNECTION_SQL_STATES.forEach(
228+
code -> expectedRetryableExceptions.add(new SQLException("some failure", code)));
229+
230+
expectedRetryableExceptions.forEach(
231+
exception -> {
232+
assertThat(jdbcClientPool.isConnectionException(exception))
233+
.as(String.format("%s status should be retryable", exception.getSQLState()))
234+
.isTrue();
235+
});
236+
237+
// Test the same retryable status codes but with spaces in the configuration
238+
properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000, 57P03, 57P04");
239+
jdbcCatalog.initialize("test_catalog_with_retryable_status_codes_with_spaces", properties);
240+
JdbcClientPool updatedClientPool = jdbcCatalog.connectionPool();
241+
expectedRetryableExceptions.forEach(
242+
exception -> {
243+
assertThat(updatedClientPool.isConnectionException(exception))
244+
.as(String.format("%s status should be retryable", exception.getSQLState()))
245+
.isTrue();
246+
});
247+
}
248+
249+
@Test
250+
public void testSqlNonTransientExceptionNotRetryable() {
251+
Map<String, String> properties = Maps.newHashMap();
252+
properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.tableDir.toAbsolutePath().toString());
253+
properties.put(CatalogProperties.URI, "jdbc:sqlite:file::memory:?icebergDB");
254+
properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
255+
JdbcCatalog jdbcCatalog = new JdbcCatalog();
256+
jdbcCatalog.setConf(conf);
257+
jdbcCatalog.initialize("test_catalog_with_retryable_status_codes", properties);
258+
JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
259+
Assertions.assertThat(
260+
jdbcClientPool.isConnectionException(
261+
new SQLNonTransientConnectionException("Failed to authenticate")))
262+
.as("SQL Non Transient exception is not retryable")
263+
.isFalse();
264+
}
265+
211266
@Test
212267
public void testInitSchemaV0() {
213268
Map<String, String> properties = Maps.newHashMap();

0 commit comments

Comments
 (0)